#python #rabbitmq #mqtt #amqp #pika
#python #rabbitmq #mqtt #amqp #pika
Вопрос:
Я запускаю RabbitMQ 3.7.28 на узле Linux, одиночная установка, больше нет узлов кластера. Плагин сообщений MQTT включен, как TLS, так и не-TLS соединения успешны. Используя python 3.8 и pika 1.1.0, я отправил 1 000 000 сообщений брокеру через AMQP.
Во время отправки сообщений у меня было подключено два потребителя: один с использованием pika / AMQP, один с использованием paho-mqtt 1.5.1. Оба потребителя получили 1 000 000 сообщений.
Затем я попытался отправлять сообщения с помощью paho-mqtt, и после завершения этого скрипта оба клиента получили 999 983 сообщения. Повторные тесты показали, что отбрасывается разное количество сообщений, но всегда в десятках.
Чтобы выяснить, что происходит, я добавил количество сообщений к сообщению. Полученные сообщения показали, что отсутствовали только последние сообщения. Потребитель показал это сообщение как последнюю запись:
99979: dev / testtopic b’99979: 2020-10-05T12:00:00.682216′
(первый 99979 — это счетчик от потребителя, второй — счетчик от производителя)
Пытаясь улучшить ситуацию, я установил qos = 1. Теперь надежно после 20 сообщений потребители перестают получать сообщения. Производитель существует без ошибок после количества сообщений, которые я намеревался отправить.
Я делаю что-то не так? Можете ли вы сказать мне, где теряются сообщения? Или дайте мне подсказку о том, как отладить эту проблему? Результаты не зависят от использования TLS или его отключения.
Если у вас есть вопросы, пожалуйста, задавайте их!
Спасибо.
Для справки: вот (большая часть) кода, который я использовал:
Производитель MQTT
import paho.mqtt.client as mqtt
from datetime import datetime
client = mqtt.Client()
client.username_pw_set(user, password)
client.connect(server, port)
print(datetime.utcnow().isoformat())
for i in range(1000000):
client.publish("dev/testtopic", f'{i 1}: {datetime.utcnow().isoformat()}', qos=0)
print(datetime.utcnow().isoformat())
client.disconnect()
Производитель AMQP
import pika
from datetime import datetime
from urllib.parse import quote
with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
print(datetime.utcnow().isoformat())
channel = connection.channel()
routing_key = 'dev.testtopic'
for i in range(1000000):
channel.basic_publish(
exchange='amq.topic', routing_key=routing_key, body=f'{i}: {datetime.utcnow().isoformat()}')
print(datetime.utcnow().isoformat())
Потребитель MQTT
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code " str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("$SYS/#")
client.subscribe("dev/testtopic")
def on_message(client, userdata, msg):
global count
count = 1
print(f'{count}: {msg.topic} {str(msg.payload)}')
count = 0
client = mqtt.Client()
client.username_pw_set(user, password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(server, host)
client.loop_forever()
Потребитель AMQP
import pika
def callback(ch, method, properties, body):
global count
count = 1
print(f'{count}: {method.routing_key} {body}')
with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
channel = connection.channel()
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key='dev.testtopic')
print(' [*] Waiting for messages. To exit press CTRL C')
count = 0
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Комментарии:
1. Вы пытались добавить ‘on_disconnect ()’ к вашему потребителю MQTT, чтобы узнать, отключается ли он вообще? Если проблема возникает только в конце выполнения, попробуйте прокомментировать вызов ‘client.disconnect ()’ у производителя MQTT и посмотреть, изменит ли это что-нибудь …. может быть проблема со временем.
2. Большое спасибо. Потребитель не отключается. Оказывается, это проблема с синхронизацией и / или сбросом; с некоторыми изменениями в коде я могу надежно отправлять> 5 000 000 сообщений в спешке. Я выясню, вызвано ли это paho или плагином MQTT на сервере, и создам соответствующий тикет.
Ответ №1:
Некоторые предложения:
- Ваш издатель AMQP должен использовать подтверждения издателя. Без них вы можете потерять сообщения — https://www.rabbitmq.com/confirms.html#publisher-confirms
- Ваш клиент MQTT завершает работу до публикации всех сообщений. Это не ошибка RabbitMQ. Вам необходимо зарегистрироваться для
on_publish
обратного вызова и убедиться, что публикация завершена, прежде чем ваша программа завершит работу. Хакерский способ сделать это — подождать некоторое время после вашей последней публикации (30 секунд?), А Затем выйти.
ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список rabbitmq-users
рассылки и только иногда отвечает на вопросы по StackOverflow.