Как включить параллельное получение / обработку подписанных тем / сообщений с помощью клиента HiveMQ mqtt

#parallel-processing #client #mqtt #hivemq

#параллельная обработка #клиент #mqtt #hivemq

Вопрос:

в настоящее время мы переходим с более старой версии клиента Eclipe Paho MQTT на версию 1.2 клиента HiveMQ MQTT. https://github.com/hivemq/hivemq-mqtt-client

В настоящее время играется с Aync-версией клиента, которому требуется функция-потребитель в качестве обратного вызова.

Одно из наших клиентских приложений MQTT должно обрабатывать / получать много сообщений по многим различным темам, и обработка одного сообщения не должна ждать завершения предыдущего. Мы не уверены, какой наилучший способ добиться параллельной обработки сообщений только с одним экземпляром клиента.

В документации выше есть необязательный исполнитель, который может быть определен

 client.subscribeWith()
    .topicFilter("test/topic")
    .qos(MqttQos.EXACTLY_ONCE)
    .callback(System.out::println)
    .executor(executor) // optional
    .send();
  

Как должен вести себя AsyncClient, когда исполнитель не определен?
Затем все обрабатывается последовательно блокирующим способом?
Это каким-то образом противоречит цели определения асинхронности с обратным вызовом….

В нашей старой реализации мы использовали общие подписки (что было нестандартной функцией в HiveMQ 3, а теперь является стандартной функцией MQTT 5) с несколькими экземплярами клиента, постоянно ожидающими, что одни и те же темы будут обрабатываться поочередно.

Однако, учитывая клиентский API HiveMQ (в котором, к сожалению, не хватает дополнительных объяснений или примеров), мы надеемся найти более элегантный и простой способ для достижения параллельной обработки с помощью пула потоков или чего-то еще!

Любая помощь приветствуется!

Ответ №1:

Обычно общие подписки необходимы только при масштабировании приложения на несколько компьютеров. Если ваша обработка сообщений может быть распараллелена, то не должно быть причин использовать общую подписку на одном компьютере. Если в будущем нагрузка на сообщения увеличится, вы все равно можете выбрать общие подписки, чтобы позже распространить их на несколько компьютеров.

Поскольку MQTT обеспечивает гарантии упорядочения, клиент HiveMQ MQTT последовательно выполняет один и тот же обратный вызов. Несколько обратных вызовов для разных подписок выполняются параллельно. Для одного обратного вызова только ваше приложение может выбрать разбиение заказа. Для этого вы можете просто передавать сообщения из обратного вызова параллельным рабочим.

Ответ №2:

Сейчас я работаю над той же проблемой. Из того, что я вижу, можно выполнить параллель для Qos 0, поскольку PUBACK не требуется, но … для Qos 1 и 2 проблема сложнее.
Брокер не отправит сообщение, если предыдущее не будет опубликовано. Конечно, клиент все еще может отправить PUBACK asap, даже не начиная обрабатывать сообщения. Но если используется подписка Qos 1, это используется по определенной причине — подписчик не хочет терять сообщения.
Если вы немедленно ОПУБЛИКУЕТЕ сообщение, ваш подписчик может потерпеть неудачу во время обработки и больше никогда не получит сообщение. В этом случае безопаснее получать, обрабатывать PUBACK. Здесь мы возвращаемся к последовательной обработке.

На данный момент я не нашел решения этой проблемы. RabbitMQ с предварительной выборкой очереди может справиться с этим, но я хотел бы решить это с помощью MQTT 🙂