如何使用moto模拟SQS,以测试使用boto3调用Lambda函数。GetQueueUrl上引发的ClientError

Stacey丶梦菡
Stacey丶梦菡
Stacey丶梦菡
订阅者
314
文章
0
评论
测试交流1 208字数 222阅读0分44秒阅读模式
摘要我正在尝试为连接到已经创建的SQS队列的Lambda函数编写一些集成测试。我需要模拟这个SQS连接。 我不确定如果嘲笑将工作,因为我使用。。。

我正在尝试为连接到已经创建的SQS队列的Lambda函数编写一些集成测试。我需要模拟这个SQS连接。
由于我使用boto3 Lambda,我不确定模仿是否有效。调用()和SAM- sam本地启动lambda 调用pytest函数中的Lambda函数。

pytest样本代码文章源自玩技e族-https://www.playezu.com/179891.html


def connect_to_lambda_client(running_locally: bool = True):
    if running_locally:
        lambda_client = boto3.client(
            "lambda",
            region_name="us-west-2",
            endpoint_url="http://127.0.0.1:3001",
            use_ssl=False,
            verify=False,
            config=botocore.client.Config(
                signature_version=botocore.UNSIGNED,
                read_timeout=20,
                retries={"max_attempts": 0},
            ),
        )
    else:
        lambda_client = boto3.client("lambda")
    return lambda_client
@mock_sqs
def test_lambda_function():
    client = connect_to_lambda_client()
    lambda_response = client.invoke(FunctionName="ListPersonFunction")
    assert lambda_response.get("statusCode") == 200

样本lambda函数文章源自玩技e族-https://www.playezu.com/179891.html


QUEUE_NAME = os.getenv('SQSLOGQUEUENAME')
client = boto3.resource('sqs')
queue = client.get_queue_by_name(QueueName=QUEUE_NAME) #raises ClientError
queue.send_message(
            MessageBody=_record,
            MessageGroupId=self.group_id,
            MessageDeduplicationId=self.deduplication_id
        )

客户端错误的回溯文章源自玩技e族-https://www.playezu.com/179891.html

[ERROR] ClientError: An error occurred (InvalidClientTokenId) when calling the GetQueueUrl operation: The secu    
raise error_class(parsed_response, operation_name) _make_api_callarams)
文章源自玩技e族-https://www.playezu.com/179891.html文章源自玩技e族-https://www.playezu.com/179891.html
 
    • Bert Blommers
      Bert Blommers 9

      Moto只模拟在其执行的Python上下文中发出的请求。它无法知道SAM内部发生了什么。
      在这种情况下,您可以使用MotoServer。它提供与Moto相同的功能,但作为独立服务器运行。
      要启动服务器:
      pip install moto[server,all]
      moto_server -H 0.0.0.0 -p 5000

      创建/检索队列可以通过提供端点URL来完成,这正是您连接SAM的方式:
      sqs_client = boto3.client(
      "sqs",
      region_name="us-west-2",
      endpoint_url="http://127.0.0.1:5000",
      )

      由于此服务器与所有服务器分开运行,因此可以从Python程序(创建队列)和SAM(检索队列)中连接到它。

    匿名

    发表评论

    匿名网友
    确定

    拖动滑块以完成验证