Как опубликовать из мультипроцесса в paho python mqtt

#mqtt #publish #python-multiprocessing

#mqtt #опубликовать #python-многопроцессорный

Вопрос:

Я запускаю скрипт python на raspberry pi, который после получения сообщения MQTT запускает функцию в многопроцессорном режиме. Публикация сообщения mqtt из основного скрипта работает нормально и принимается брокером. Однако функция, которая запускается в новом процессе, не может опубликовать. Нет сообщения об ошибке. Функция печатает журналы тестов, поэтому она определенно запущена.

     ### on message, run function in a new process
    def on_message(client, obj, msg):
        def threaded_message():
            print("Hello, process is running")
            ### This publish does not work!
            mqttc.publish(topicStatus, "message received")

    myProcess = multiprocessing.Process(target=threaded_message)
    myProcess.start()      
    ### MQTT setup
    mqttc = mqtt.Client()
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe

    url_str = 'm24.cloudmqtt.com'
    url_port = '16310'

    topicStatus = "Home/Status"
    topicCommands = "Home/Commands"

    mqttc.username_pw_set(myUsername, myPassword)
    mqttc.connect(url_str, url_port)
    ### This publish does work!
    mqttc.publish(topicStatus, "Online")

    mqttc.loop_forever()
  

Запуск mqttc.publish в основном скрипте успешно публикует сообщение.
Запуск мультипроцесса печатает сообщение, но не публикует сообщение.

Почему функция публикации не работает в этом сценарии?

Комментарии:

1. Вы решили свою проблему?

2. К сожалению, нет. Пришлось переписать скрипт, чтобы все потоки отправляли свои сообщения через очередь в основной скрипт, который затем считывает очередь для нового контента и публикует их.

Ответ №1:

Причина, по которой ваш MQTT не публикуется, заключается в том, что вы пытаетесь подключить несколько потоков MQTT с одним и тем же клиентом, и вы подключаете только один клиент для всех потоков.

Решение: для многопроцессорной обработки вам необходимо создать несколько клиентов mqtt, чтобы ваши несколько клиентов могли публиковать сообщения параллельно.

 from datetime import datetime, time
import paho.mqtt.client as mqtt
import time
import multiprocessing


def on_connect(client, userdata, flags, rc):
    
    if rc == 0:
        global Connected               
        Connected = True               
    
    else:
        print("Connection failed")

def client_conn():
    # MQTT connection 
    cli = "test" str(datetime.now())
    client = mqtt.Client(cli)
    client.on_connect = on_connect
    
                
    client.connect("mqttServer", "mqttPort")
    client.username_pw_set("mqttUsername", "mqttPassword")
    client.loop_start()
    while Connected != True:  
        time.sleep(0.1)
    return client

def mqtt_publish():
    # Publish on mqtt
    client = client_conn()
    client.publish("topic", "payload")
    


def mul_process():
    # Process creation
    p = multiprocessing.Process(target=mqtt_publish)
    p.start()
    p.join()
    
# Execution starts here
mul_process()