Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to check if subscriber is valid to accept the message received for a published topic on MQTT

Tags:

java

mqtt

paho

I'm newbie in MQTT.

I am implementing MQTT in java and I am using below code for publisher to publish to a particualr topic,

    public void publish()
        {
            MqttClient myClient = null;
            MqttConnectOptions connOpt;
            try {
                //  Subscription with Brokers
                connOpt = new MqttConnectOptions();
                connOpt.setAutomaticReconnect(true);
                connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

                String clientID = UUID.randomUUID().toString().replace("-", "");
                System.out.println("clientID " + clientID);

                myClient = new MqttClient("tcp://192.168.10.500:1883", clientID);
                myClient.connect(connOpt);
                String myTopic = "Device1";
                MqttTopic topic = myClient.getTopic(myTopic);

                int pubQoS = 0;
                MqttMessage message = new MqttMessage("mqttMessage".getBytes());
                message.setQos(pubQoS);
                message.setRetained(false);

                MqttDeliveryToken token = null;
                token = topic.publish(message);
                System.out.println("publish successful with the message :: " + message);
                // Wait until the message has been delivered to the broker
                token.waitForCompletion();
            } catch (MqttException me) {
            } catch (Exception e) {
            }
        }

And then I am using below code to read the published message for a particular topic as a subscriber,

public void subscribe()
{
    try {
        MqttConnectOptions connOpt;

        //  Subscription with mqttBrokerEndPoint
        connOpt = new MqttConnectOptions();
        connOpt.setAutomaticReconnect(true);
        connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

        String clientID = UUID.randomUUID().toString().replace("-", "");
        System.out.println("clientID " + clientID);

        MqttSubscriber mqttConnection = new MqttSubscriber();
        myClient = new MqttClient("tcp://192.168.10.500:1883" clientID);
        myClient.setCallback(mqttConnection);
        myClient.connect(connOpt);
        myClient.subscribe("Device1");

    } catch (MqttException e) {
    }
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    try {
        System.out.println(message);
        boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"

        //if(isValidClient) {

            if(message != null) {
                System.out.println("message" + message.toString());
            }

            myClient.unsubscribe("Device1");
            myClient.disconnect();
    //}
  }
    catch(Exception e){}
}

The above implementation is working fine as it is.

Since I am very new to mqtt I have some doubts with the above implementation.

1) Is Client id in both publisher and subscriber should be SAME for one particular flow ?

or should different in both publisher and subscriber as above : which can be randomly generated?

    String clientID = UUID.randomUUID().toString().replace("-", "");

This randomly generated clientID is working fine with both subscribing and publishing.

But, If I use same client for both publisher and subscriber and validate subscriber?

I mean use "clientID" myClient = new MqttClient(mqttBrokerEndPoint, "clientID"); in subsriber and then same "clientID" myClient = new MqttClient(mqttBrokerEndPoint, "clientID"); in publisher

I am getting the below socket error in mqtt broker console(used windows version) ,

    1549414715: Socket error on client 82, disconnecting.
    1549414715: New client connected from 192.168.10.500 as clientID (c1, k60).
    1549414715: No will message specified.
    1549414715: Sending CONNACK to 82 (0, 0)
    1549414716: New connection from 192.168.10.500 on port 1883.
    1549414716: Client 82 already connected, closing old connection.
    1549414716: Socket error on client 82, disconnecting.
    1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).
    1549414716: No will message specified.
    1549414716: Sending CONNACK to 82 (0, 0)
    1549414716: New connection from 192.168.10.500 on port 1883.
    1549414716: Client 82 already connected, closing old connection.
    1549414716: Socket error on client 82, disconnecting.
    1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).
    1549414716: No will message specified.
    1549414716: Sending CONNACK to 82 (0, 0)

and the above program is not working.

Can't we use same clientID for for both subscriber and publisher? Why is it resulting in socket error and programs not working?

2) Is username and password mandatory while implementation? Or can we establish a connection without those 2 below properties?

            connOpt.setUserName(USERNAME);
            connOpt.setPassword(PASSWORD.toCharArray());

3) Is pubQoS is mandatory to be used for publisher? I am currently using it as zero '0' ?

        MqttMessage message = new MqttMessage(mqttMessage.getBytes());
        message.setQos(0);
        message.setRetained(false);

Also is retained attribute is mandatory for publisher?

4) Is these 2 below attributes mandatory while subscribing? I am using as below in code.

        connOpt.setAutomaticReconnect(true);
        connOpt.setCleanSession(true);//if your not setting cleanSession to 

false then subscriptions shouldn't be persisted.

5) Also, Once Message is received from MQTT publisher in to MessageArrived callback as below, how to validate if this is valid subscriber and proceed with further logic?

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    try {
        System.out.println(message);
        boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"
        //if valid subscriber only i need to read message publised on the topic "Device1"
        **//if(isValidClient) {**

            if(message != null) {
                System.out.println("message" + message.toString());
            }

            myClient.unsubscribe("Device1");
            myClient.disconnect();
    //}
  }
    catch(Exception e){}

I mean which attribute MQTT API to check that this message is only sent for this subscriber and he can proceed further to use the message received in message arrived callback?

I.e, which attribute of MQTT api can be used to check if the received message is related to current PROCESS/STEP.(In SUbscriber Program as below)

IS the topic is the only common attributes between subscriber and publisher to validate subscriber in messageArrived callback ? or do we have any other common attribute to check a valid contract between subscriber and publisher?

Or should we used clientID to validate subscriber? But if i use the same clientID for subscriber and publisher I am getting the socket error which i have mentioned in point number 1.

How to proceed further on this?

like image 764
user1660325 Avatar asked Feb 06 '19 03:02

user1660325


People also ask

Which MQTT topic can be subscribed to in order to receive messages published on any topic?

In MQTT the process of sending messages is called publishing, and to receive messages an MQTT client must subscribe to an MQTT topic.

How do I receive messages from MQTT?

Messages are received by the on_message callback, and so this callback must be defined and bound in the main script. Message is an object and the payload property contains the message data which is binary data.

How does a publish and subscribe system work in MQTT?

MQTT Publish/Subscribe Pattern In the client-sever model, a client communicates directly with an endpoint. The pub/sub model decouples the client that sends a message (the publisher) from the client or clients that receive the messages (the subscribers). The publishers and subscribers never contact each other directly.

Which among these are used to retain all message for a session in MQTT connect message features?

A retained message is a normal MQTT message with the retained flag set to true. The broker stores the last retained message and the corresponding QoS for that topic.


1 Answers

1) You cannot use same client id for two mqtt client. Client ids for mqtt must be unique. You can create different topic for handling different operations. You can refer to the below answer

Two paho.mqtt clients subscribing to the same client localy

2) Username and password is optional. If your mqtt server is configured to use username and password then your client also need to pass username and password to mqtt server. Refer to the below link for understanding the code usage

How can i connect a Java mqtt client with username and password to an emqttd(EMQ) broker?

If you want advanced level go for tls/ssl security

https://dzone.com/articles/secure-communication-with-tls-and-the-mosquitto-broker

3) QOS - The Quality of Service (QoS) level is an agreement between the sender of a message and the receiver of a message that defines the guarantee of delivery for a specific message. Refer to the below link for more information on qos

https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/

4) Auto Reconnect (True) - If mqtt server is disconnected during run time then client tries to reconnect to the server.

Clean Session (True) - On reconnecting to the mqtt server all older sessions associated with same client id will be deleted.

Clean Session (False) - On reconnecting to the mqtt server all older sessions associated with same client id will be retained. Mqtt server in some cases retains messages based on QOS level.

5) Try to create multiple topics for multiple operations. In message arrived method you can check on which topic message has arrived. Then you can call different methods based on that. Below codes are one of the way. You can find out best way that suits your needs. Cheers !!!!!

@Override
public void deliveryComplete(IMqttDeliveryToken token) 
{
    try 
    {
        if(token.getTopics()[0].equals("TOPIC_A"))
        {
            //Do something
        }   
        else if(token.getTopics()[0].equals("TOPIC_B"))
        {
            //Do something
        }
    } 
    catch (Exception e) 
    {
        MQTTLogger.logSevere(e);
    }
}

Json Format for data

First message

{
  "client-name":"client1"
  "data":""
}

Second message

{
  "client-name":"client2"
  "data":""
}
like image 84
Santosh Balaji Avatar answered Oct 22 '22 11:10

Santosh Balaji