Можете ли вы объяснить, почему RabbitMQ теряет сообщения, отправленные с использованием MQTT через paho / python?

#python #rabbitmq #mqtt #amqp #pika

#python #rabbitmq #mqtt #amqp #pika

Вопрос:

Я запускаю RabbitMQ 3.7.28 на узле Linux, одиночная установка, больше нет узлов кластера. Плагин сообщений MQTT включен, как TLS, так и не-TLS соединения успешны. Используя python 3.8 и pika 1.1.0, я отправил 1 000 000 сообщений брокеру через AMQP.

Во время отправки сообщений у меня было подключено два потребителя: один с использованием pika / AMQP, один с использованием paho-mqtt 1.5.1. Оба потребителя получили 1 000 000 сообщений.

Затем я попытался отправлять сообщения с помощью paho-mqtt, и после завершения этого скрипта оба клиента получили 999 983 сообщения. Повторные тесты показали, что отбрасывается разное количество сообщений, но всегда в десятках.

Чтобы выяснить, что происходит, я добавил количество сообщений к сообщению. Полученные сообщения показали, что отсутствовали только последние сообщения. Потребитель показал это сообщение как последнюю запись:

99979: dev / testtopic b’99979: 2020-10-05T12:00:00.682216′

(первый 99979 — это счетчик от потребителя, второй — счетчик от производителя)

Пытаясь улучшить ситуацию, я установил qos = 1. Теперь надежно после 20 сообщений потребители перестают получать сообщения. Производитель существует без ошибок после количества сообщений, которые я намеревался отправить.

Я делаю что-то не так? Можете ли вы сказать мне, где теряются сообщения? Или дайте мне подсказку о том, как отладить эту проблему? Результаты не зависят от использования TLS или его отключения.

Если у вас есть вопросы, пожалуйста, задавайте их!

Спасибо.

Для справки: вот (большая часть) кода, который я использовал:

Производитель MQTT

 import paho.mqtt.client as mqtt
from datetime import datetime

client = mqtt.Client()
client.username_pw_set(user, password)
client.connect(server, port)

print(datetime.utcnow().isoformat())
for i in range(1000000):
    client.publish("dev/testtopic", f'{i   1}: {datetime.utcnow().isoformat()}', qos=0)
print(datetime.utcnow().isoformat())

client.disconnect()
  

Производитель AMQP

 import pika
from datetime import datetime
from urllib.parse import quote


with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    print(datetime.utcnow().isoformat())
    channel = connection.channel()
    routing_key = 'dev.testtopic'
    for i in range(1000000):
        channel.basic_publish(
            exchange='amq.topic', routing_key=routing_key, body=f'{i}: {datetime.utcnow().isoformat()}')
    print(datetime.utcnow().isoformat())
  

Потребитель MQTT

 import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "   str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("$SYS/#")
    client.subscribe("dev/testtopic")


def on_message(client, userdata, msg):
    global count
    count  = 1
    print(f'{count}: {msg.topic} {str(msg.payload)}')


count = 0

client = mqtt.Client()
client.username_pw_set(user, password)
client.on_connect = on_connect
client.on_message = on_message

client.connect(server, host)
client.loop_forever()
  

Потребитель AMQP

 import pika

def callback(ch, method, properties, body):
    global count
    count  = 1
    print(f'{count}: {method.routing_key} {body}')


with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    channel = connection.channel()

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key='dev.testtopic')
    print(' [*] Waiting for messages. To exit press CTRL C')
    count = 0


    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()
  

Комментарии:

1. Вы пытались добавить ‘on_disconnect ()’ к вашему потребителю MQTT, чтобы узнать, отключается ли он вообще? Если проблема возникает только в конце выполнения, попробуйте прокомментировать вызов ‘client.disconnect ()’ у производителя MQTT и посмотреть, изменит ли это что-нибудь …. может быть проблема со временем.

2. Большое спасибо. Потребитель не отключается. Оказывается, это проблема с синхронизацией и / или сбросом; с некоторыми изменениями в коде я могу надежно отправлять> 5 000 000 сообщений в спешке. Я выясню, вызвано ли это paho или плагином MQTT на сервере, и создам соответствующий тикет.

Ответ №1:

Некоторые предложения:

  • Ваш издатель AMQP должен использовать подтверждения издателя. Без них вы можете потерять сообщения — https://www.rabbitmq.com/confirms.html#publisher-confirms
  • Ваш клиент MQTT завершает работу до публикации всех сообщений. Это не ошибка RabbitMQ. Вам необходимо зарегистрироваться для on_publish обратного вызова и убедиться, что публикация завершена, прежде чем ваша программа завершит работу. Хакерский способ сделать это — подождать некоторое время после вашей последней публикации (30 секунд?), А Затем выйти.

ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список rabbitmq-users рассылки и только иногда отвечает на вопросы по StackOverflow.