Python AWS SQS издевается над MOTO

#python #mocking #amazon-sqs #moto

Вопрос:

Я пытаюсь издеваться над AWS SQS с помощью moto, ниже приведен мой код

 from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():
    
    #from myClass import get_msg_from_sqs
    
    conn = boto3.client('sqs', region_name='us-east-1')
    
    queue = conn.create_queue(QueueName='Test')
    
    os.environ["SQS_URL"] = queue["QueueUrl"]
    
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    
    
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

    resp = get_msg_from_sqs(queue["QueueUrl"]) 

    assert resp is not None
 

При выполнении этого я получаю следующую ошибку

 >       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'
 

Если я попробую другой способ отправки сообщения в SQS(см. Закомментированный код #Пробовал и это)
, то во время фактического вызова SQS в моем методе get_msg_from_sqs я получаю следующую ошибку

 E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.
 

Я запускаю его на win10 с помощью PyCharm, и версия moto установлена на

 moto = "^2.2.6"
 

Мой код приведен ниже

 sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
 

Что я здесь упускаю?

Комментарии:

1. Вы используете макет здесь, чтобы проверить, не является ли сообщение, прочитанное из очереди, «Нет». Обычно существует определенный шаблон, который вы ищете, который вы устанавливаете, а затем утверждаете. Не могли бы вы также добавить код для get_msg_from_sqs ?

2. @vivekveeramani Я добавил код для get_msg_from_sqs, надеюсь, это поможет.

3. Как вы можете видеть в документах, create_queue возвращает диктант, а не очередь: boto3.amazonaws.com/v1/documentation/api/latest/reference/…

4. @gshpychka пожалуйста, взгляните на код еще раз, очередь-это просто имя переменной, которую я использовал.

5. @gshpychka можете ли вы добавить то же самое в качестве ответа, я хотел бы присудить вам награду

Ответ №1:

Ваша queue переменная-это диктант, возвращаемый create_queue:

 queue = conn.create_queue(QueueName='Test')
 

Это не очередь, и поэтому вы не можете позвонить sendMessage по ней.

Для этого вам нужно создать объект очереди:

 sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)

queue.send_message()
 

Ответ №2:

Согласно @gshpychka, вам нужно посмотреть, как create_queue это работает. В частности, он возвращает диктант этой формы:

Структура ответа (дикт) —

Очередь (строка) —

URL-адрес созданной очереди Amazon SQS.

Поэтому, используя этот api, вы делаете:

 import boto3
from time import sleep

conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
    QueueUrl=queue,
    MessageBody='string',
    ...)
 

Я согласен, что документы сбивают с толку. Путаница, вероятно, возникла из-за api ресурсов sqs, который работает по-другому:

 import boto3
from time import sleep

sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)
 

Это работает, потому что этот api возвращает Queue объект, который, вероятно, является тем, что вы ожидали.

Пожалуйста, обратите внимание, что @gshpychka уже дал ответ в комментарии; я только что написал его.