我正在尝试为连接到已经创建的SQS队列的Lambda函数编写一些集成测试。我需要模拟这个SQS连接。
由于我使用boto3 Lambda,我不确定模仿是否有效。调用()和SAM- sam本地启动lambda
调用pytest函数中的Lambda函数。
pytest样本代码文章源自玩技e族-https://www.playezu.com/179891.html
def connect_to_lambda_client(running_locally: bool = True):
if running_locally:
lambda_client = boto3.client(
"lambda",
region_name="us-west-2",
endpoint_url="http://127.0.0.1:3001",
use_ssl=False,
verify=False,
config=botocore.client.Config(
signature_version=botocore.UNSIGNED,
read_timeout=20,
retries={"max_attempts": 0},
),
)
else:
lambda_client = boto3.client("lambda")
return lambda_client
@mock_sqs
def test_lambda_function():
client = connect_to_lambda_client()
lambda_response = client.invoke(FunctionName="ListPersonFunction")
assert lambda_response.get("statusCode") == 200
样本lambda函数文章源自玩技e族-https://www.playezu.com/179891.html
QUEUE_NAME = os.getenv('SQSLOGQUEUENAME')
client = boto3.resource('sqs')
queue = client.get_queue_by_name(QueueName=QUEUE_NAME) #raises ClientError
queue.send_message(
MessageBody=_record,
MessageGroupId=self.group_id,
MessageDeduplicationId=self.deduplication_id
)
客户端错误的回溯文章源自玩技e族-https://www.playezu.com/179891.html
[ERROR] ClientError: An error occurred (InvalidClientTokenId) when calling the GetQueueUrl operation: The secu
raise error_class(parsed_response, operation_name) _make_api_callarams)
文章源自玩技e族-https://www.playezu.com/179891.html文章源自玩技e族-https://www.playezu.com/179891.html
未知地区 1F
Moto只模拟在其执行的Python上下文中发出的请求。它无法知道SAM内部发生了什么。
在这种情况下,您可以使用MotoServer。它提供与Moto相同的功能,但作为独立服务器运行。
要启动服务器:
pip install moto[server,all]
moto_server -H 0.0.0.0 -p 5000
创建/检索队列可以通过提供端点URL来完成,这正是您连接SAM的方式:
sqs_client = boto3.client(
"sqs",
region_name="us-west-2",
endpoint_url="http://127.0.0.1:5000",
)
由于此服务器与所有服务器分开运行,因此可以从Python程序(创建队列)和SAM(检索队列)中连接到它。