
How to publish from a multiprocess in paho python mqtt

I am running a Python script on a Raspberry Pi which, upon receiving an MQTT message, runs a function in a new process (via multiprocessing). Publishing an MQTT message from the main script works fine and the message is received by the broker. However, the function that runs in the new process is unable to publish, and there is no error message. The function does print its test logs, so it is definitely running.

    ### on message, run function in a new process
    def on_message(client, obj, msg):
        def threaded_message():
            print("Hello, process is running")
            ### This publish does not work!
            mqttc.publish(topicStatus, "message received")

        myProcess = multiprocessing.Process(target=threaded_message)
        myProcess.start()

    ### MQTT setup
    mqttc = mqtt.Client()
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe

    url_str = 'm24.cloudmqtt.com'
    url_port = '16310'

    topicStatus = "Home/Status"
    topicCommands = "Home/Commands"

    mqttc.username_pw_set(myUsername, myPassword)
    mqttc.connect(url_str, url_port)
    ### This publish does work!
    mqttc.publish(topicStatus, "Online")

    mqttc.loop_forever()

Calling mqttc.publish in the main script successfully publishes the message. The function started in the new process prints its log line, but its message is never published.

Why is the publish function not working in this scenario?

Tasmotanizer asked Nov 19 '25 02:11



1 Answer

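Your message is not published because the new process tries to reuse the single client that was created and connected in the main process. A child process does not share that client: it works on its own copy, while the connection and the network loop stay in the main process.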
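
The underlying point can be seen without MQTT at all. The following is only a minimal sketch, assuming a POSIX system where the "fork" start method is available (as on a Raspberry Pi running Linux); the names `state`, `child` and `run_demo` are made up for illustration. It shows that a child process changes its own copy of an object, and the parent never sees that change:

```python
import multiprocessing

# Stand-in for "the client object" that lives in the main process.
state = {"connected": False}


def child(conn):
    # This modifies the child's copy only; the parent's dict is untouched.
    state["connected"] = True
    conn.send(state["connected"])
    conn.close()


def run_demo():
    # Assumption: POSIX system, so the "fork" start method exists.
    ctx = multiprocessing.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=child, args=(child_conn,))
    proc.start()
    proc.join()
    # Read what the child saw, if it managed to report anything.
    seen_in_child = parent_conn.recv() if parent_conn.poll(1) else None
    return seen_in_child, state["connected"]
```

Calling `run_demo()` returns the value as seen inside the child followed by the value still held by the parent. The same separation applies to an MQTT client object that is handed to a new process.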

Solution: with multiprocessing, create a separate MQTT client in each process, so that every process connects and publishes on its own.

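    from datetime import datetime
    import multiprocessing
    import time

    import paho.mqtt.client as mqtt

    # Placeholders: replace with your broker's details.
    MQTT_SERVER = "mqttServer"
    MQTT_PORT = 1883  # must be an int; 1883 is the default unencrypted MQTT port
    MQTT_USERNAME = "mqttUsername"
    MQTT_PASSWORD = "mqttPassword"

    # Set to True by on_connect once the broker accepts the connection.
    Connected = False


    def on_connect(client, userdata, flags, rc):
        global Connected
        if rc == 0:
            Connected = True
        else:
            print("Connection failed, return code", rc)


    def client_conn():
        # Create and connect a new client; call this inside the process that publishes.
        # Written for the paho-mqtt 1.x API; paho-mqtt 2.0 changed the mqtt.Client()
        # constructor to take a callback API version as its first argument.
        cli = "test" + str(datetime.now())
        client = mqtt.Client(cli)
        client.on_connect = on_connect

        # Credentials must be set before connect() is called.
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        client.connect(MQTT_SERVER, MQTT_PORT)
        client.loop_start()
        while not Connected:
            time.sleep(0.1)
        return client


    def mqtt_publish():
        # Publish on MQTT using this process's own client
        client = client_conn()
        info = client.publish("topic", "payload")
        info.wait_for_publish()  # make sure the message is sent before the process exits
        client.disconnect()
        client.loop_stop()


    def mul_process():
        # Process creation
        p = multiprocessing.Process(target=mqtt_publish)
        p.start()
        p.join()


    # Execution starts here
    if __name__ == "__main__":
        mul_process()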
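
Applied to the code in the question, the same idea could look roughly like the sketch below. It is not the only way to do it: instead of building a client by hand, it uses paho's one-shot helper `paho.mqtt.publish.single`, which opens its own short-lived connection inside the child process. The host, port and topic are taken from the question; the credentials are placeholders, and the `publish_fn` parameter is a hypothetical hook added here only so the function can be tried without a broker.

```python
import multiprocessing

# Broker details from the question; the AUTH values are placeholders.
BROKER_HOST = "m24.cloudmqtt.com"
BROKER_PORT = 16310  # an int, not a string
AUTH = {"username": "myUsername", "password": "myPassword"}  # hypothetical values
TOPIC_STATUS = "Home/Status"


def publish_status(payload, publish_fn=None):
    """Runs in the child process: connect, publish one message, disconnect."""
    if publish_fn is None:
        # paho's one-shot helper creates its own client internally, so nothing
        # from the parent process's client is reused here.
        from paho.mqtt.publish import single as publish_fn
    publish_fn(TOPIC_STATUS, payload,
               hostname=BROKER_HOST, port=BROKER_PORT, auth=AUTH)


def on_message(client, userdata, msg):
    """paho callback: start a new process instead of reusing `client` in it."""
    proc = multiprocessing.Process(target=publish_status,
                                   args=("message received",))
    proc.start()
```

The main script would keep its own client for subscribing and assign `mqttc.on_message = on_message` as before; only the publish inside the new process goes through a separate connection.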
Aashish Pahumalani answered Nov 22 '25 02:11



