I'm setting up a python MQTT client that is supposed to receive messages of a specific topic and publish messages to two different topics. If a message was received the client should send a message with topic 1 to the MQTT broker.
Also the client shall send a message with topic 2 every 2 seconds to the broker.
I think I've to implement multi-threading, right? Here is my code so far:
#!/usr/bin/env python
import time
import paho.mqtt.client as mqtt
import socket
import json
import requests
from configparser import SafeConfigParser
from threading import Timer
def on_connect(client, userdata, flags, rc):
print("CONNECTED")
print("Connected with result code: ", str(rc))
print("subscribing to topics")
client.subscribe(mqtt_sub_topics)
def on_message(client, userdata, message):
print("Data requested")
client.publish(mqtt_pub_topic_control,json.dumps(msg))
def main():
print("WAIT for max: ",delay)
while True:
time.sleep(delay)
client.publish(mqtt_pub_topic_state,json.dumps(msg))
### INIT ###
........
### MQTT ###
client = mqtt.Client(hostname)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(mqtt_broker, mqtt_port)
client.loop_start()
### Start MAIN ###
main()
I've several questions regarding my code:
Is there an issue with the implemented time.sleep(delay) delay?
Is it better to use a timer and is it possible to receive multiple messages at the same time?
I'll answer the MQTT question as that is reasonably clear.
The MQTT client is single threaded, it will only receive and process one message at a time, if you want to process multiple messages in parallel, you will need to have your own thread pool and use the on_message function to hand off incoming messages to the pool to be processed.
here the code with thread for each the publishing and the subscribing at the same time and to handle each topic separately a callback "client.message_callback_add("topic", your_callbak)"
import threading
import time
import paho.mqtt.client as mqtt
import json
topic="data"
broker="test.mosquitto.org"
port=1883
def on_connect(client, userdata, flags, rc):
print("CONNECTED")
print("Connected with result code: ", str(rc))
client.subscribe("data")
print("subscribing to topic : "+topic)
def on_message(client, userdata, message):
print("Data requested "+str(message.payload))
def main():
print("WAIT for max: ",2)
while True:
time.sleep(1)
client.publish(topic,"dfdfd")
### MQTT ###
client = mqtt.Client()
client.connect(broker, port)
client.on_connect = on_connect
#client.on_disconnect = on_disconnect
def subscribing():
client.on_message = on_message
client.loop_forever()
sub=threading.Thread(target=subscribing)
pub=threading.Thread(target=main)
### Start MAIN ###
sub.start()
pub.start()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With