#python #mocking #amazon-sqs #moto
Вопрос:
Я пытаюсь издеваться над AWS SQS с помощью moto, ниже приведен мой код
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs
@mock_sqs
def test_get_all_msg_from_queue():
#from myClass import get_msg_from_sqs
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"])
assert resp is not None
При выполнении этого я получаю следующую ошибку
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'
Если я попробую другой способ отправки сообщения в SQS(см. Закомментированный код #Пробовал и это)
, то во время фактического вызова SQS в моем методе get_msg_from_sqs я получаю следующую ошибку
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.
Я запускаю его на win10 с помощью PyCharm, и версия moto установлена на
moto = "^2.2.6"
Мой код приведен ниже
sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
Что я здесь упускаю?
Комментарии:
1. Вы используете макет здесь, чтобы проверить, не является ли сообщение, прочитанное из очереди, «Нет». Обычно существует определенный шаблон, который вы ищете, который вы устанавливаете, а затем утверждаете. Не могли бы вы также добавить код для get_msg_from_sqs ?
2. @vivekveeramani Я добавил код для get_msg_from_sqs, надеюсь, это поможет.
3. Как вы можете видеть в документах,
create_queue
возвращает диктант, а не очередь: boto3.amazonaws.com/v1/documentation/api/latest/reference/…4. @gshpychka пожалуйста, взгляните на код еще раз, очередь-это просто имя переменной, которую я использовал.
5. @gshpychka можете ли вы добавить то же самое в качестве ответа, я хотел бы присудить вам награду
Ответ №1:
Ваша queue
переменная-это диктант, возвращаемый create_queue:
queue = conn.create_queue(QueueName='Test')
Это не очередь, и поэтому вы не можете позвонить sendMessage
по ней.
Для этого вам нужно создать объект очереди:
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)
queue.send_message()
Ответ №2:
Согласно @gshpychka, вам нужно посмотреть, как create_queue
это работает. В частности, он возвращает диктант этой формы:
Структура ответа (дикт) —
Очередь (строка) —
URL-адрес созданной очереди Amazon SQS.
Поэтому, используя этот api, вы делаете:
import boto3
from time import sleep
conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
QueueUrl=queue,
MessageBody='string',
...)
Я согласен, что документы сбивают с толку. Путаница, вероятно, возникла из-за api ресурсов sqs, который работает по-другому:
import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)
Это работает, потому что этот api возвращает Queue
объект, который, вероятно, является тем, что вы ожидали.
Пожалуйста, обратите внимание, что @gshpychka уже дал ответ в комментарии; я только что написал его.