 

How to implement multithreading for a mqtt client that can send and receive messages

I'm setting up a Python MQTT client that should receive messages on a specific topic and publish messages to two different topics. Whenever a message is received, the client should publish a message on topic 1 to the MQTT broker.

The client should also publish a message on topic 2 to the broker every 2 seconds.

I think I have to implement multithreading, right? Here is my code so far:

#!/usr/bin/env python
import time
import paho.mqtt.client as mqtt
import socket
import json
import requests
from configparser import SafeConfigParser
from threading import Timer

def on_connect(client, userdata, flags, rc):
    print("CONNECTED")
    print("Connected with result code: ", str(rc))
    print("subscribing to topics")
    client.subscribe(mqtt_sub_topics)

def on_message(client, userdata, message):
    print("Data requested")
    client.publish(mqtt_pub_topic_control,json.dumps(msg))

def main():
    print("WAIT for max: ",delay)
    while True:
        time.sleep(delay)
        client.publish(mqtt_pub_topic_state,json.dumps(msg))

### INIT ###
........

### MQTT ###
client = mqtt.Client(hostname)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(mqtt_broker, mqtt_port)
client.loop_start()

### Start MAIN ###
main()

I have several questions about my code:

Is there a problem with using time.sleep(delay) for the delay?
Is it better to use a timer, and is it possible to receive multiple messages at the same time?

MSchuett asked Oct 30 '25 21:10


2 Answers

I'll answer the MQTT question as that is reasonably clear.

The MQTT client is single-threaded: it receives and processes only one message at a time. If you want to process multiple messages in parallel, you will need your own thread pool, with the on_message function handing incoming messages off to the pool for processing.
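
A minimal sketch of that hand-off, assuming Python's standard concurrent.futures.ThreadPoolExecutor as the pool. The names make_on_message, handler and run_demo are hypothetical, the pool size of 4 is arbitrary, and the messages are stand-in objects with .topic and .payload attributes, so no broker is needed to try it:

```python
import json
import threading
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


def make_on_message(pool, handler):
    """Build a paho-style on_message callback that offloads work to `pool`."""
    def on_message(client, userdata, message):
        # Return quickly so the network loop can deliver the next message;
        # the (possibly slow) handler runs on a pool thread instead.
        pool.submit(handler, message.topic, message.payload)
    return on_message


def run_demo(count=3):
    """Feed stand-in messages through the callback and return the results."""
    results = []
    lock = threading.Lock()

    def handler(topic, payload):
        # Hypothetical handler: decode the JSON payload and record it.
        with lock:
            results.append((topic, json.loads(payload)["n"]))

    # Leaving the with-block waits for all submitted handlers to finish.
    with ThreadPoolExecutor(max_workers=4) as pool:
        on_message = make_on_message(pool, handler)
        for n in range(count):
            fake = SimpleNamespace(topic="data",
                                   payload=json.dumps({"n": n}).encode())
            on_message(None, None, fake)
    return sorted(results)


print(run_demo())
```

With a real client the callback would be attached as client.on_message = make_on_message(pool, handler). Handlers then run concurrently, so any state they share needs a lock, as in the sketch.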

hardillb answered Nov 02 '25 11:11


Here is the code with one thread for publishing and one for subscribing, so that both run at the same time. To handle each topic separately, register a per-topic callback with client.message_callback_add("topic", your_callback).

import threading
import time

import paho.mqtt.client as mqtt

topic = "data"
broker = "test.mosquitto.org"
port = 1883


def on_connect(client, userdata, flags, rc):
    print("CONNECTED")
    print("Connected with result code: ", str(rc))
    # Subscribe here so the subscription is renewed after a reconnect.
    client.subscribe(topic)
    print("subscribing to topic : " + topic)


def on_message(client, userdata, message):
    print("Data requested " + str(message.payload))


def main():
    # Publishing loop: sends one message per second.
    print("WAIT for max: ", 2)
    while True:
        time.sleep(1)
        client.publish(topic, "dfdfd")


### MQTT ###
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
#client.on_disconnect = on_disconnect
client.connect(broker, port)


def subscribing():
    # Blocking network loop: receives messages and invokes the callbacks.
    client.loop_forever()


sub = threading.Thread(target=subscribing)
pub = threading.Thread(target=main)

### Start MAIN ###
sub.start()
pub.start()
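
As a possible variation on the publishing loop above, here is a sketch of a periodic publisher that can be stopped cleanly and does not accumulate drift the way a bare time.sleep loop can. The names publish_every and run_demo are hypothetical, and the publish argument is a stand-in for a call such as client.publish(topic, payload), so the demo does not contact a broker:

```python
import threading
import time


def publish_every(interval, publish, stop_event):
    """Call publish() about every `interval` seconds until stop_event is set."""
    next_run = time.monotonic() + interval
    # Event.wait returns True as soon as the event is set, so the loop can be
    # stopped promptly; time.sleep cannot be interrupted this way.
    while not stop_event.wait(max(0.0, next_run - time.monotonic())):
        publish()
        next_run += interval  # schedule from the previous deadline to limit drift


def run_demo(interval=0.05, duration=0.28):
    """Run the publisher briefly and return how many times it fired."""
    sent = []
    stop = threading.Event()
    # Stand-in for client.publish(...): just record the time of each call.
    worker = threading.Thread(
        target=publish_every,
        args=(interval, lambda: sent.append(time.monotonic()), stop),
        daemon=True,
    )
    worker.start()
    time.sleep(duration)
    stop.set()       # ask the loop to exit
    worker.join()    # wait until it has actually stopped
    return len(sent)


print(run_demo())
```

For the 2-second requirement in the question, the interval would be 2 and publish would wrap the actual client.publish call.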

Ayoub Benayache answered Nov 02 '25 10:11