
How to store MQTT data when offline and send it when online

Tags:

android

mqtt

I have a problem: when my connection is interrupted, the MQTT messages I publish while offline are not sent on reconnect. How can I resolve this? I am following this answer, but it is not working.

What I've done:

  • Implemented an MQTT service that sends the GPS location; it works as expected when online.
  • Set QoS to 1.
  • Set a fixed client ID.
  • Set the publish QoS to 1.
  • Set clean session to false.

But after reconnecting, the client only publishes the data generated while online; it does not publish the data that was stored in persistence while offline.

Here is my source code:

package id.trustudio.android.mdm.service;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.content.pm.ApplicationInfo;
import android.content.pm.PackageManager;
import android.net.TrafficStats;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.UnsupportedEncodingException;

import id.trustudio.android.mdm.http.DetectConnection;
import id.trustudio.android.mdm.util.Cons;
import id.trustudio.android.mdm.util.Debug;
import id.trustudio.android.mdm.util.GPSTracker;
import id.trustudio.android.mdm.util.GPSTracker2;

public class MqttService extends Service implements MqttCallback {

    public static boolean isStarted = false;

    private double latitude  = 0;
    private double longitude = 0;
    private GPSTracker mGPSTracker;
    private GPSTracker2 mGPSTracker2;

    boolean isInternetPresent = false;

    private SharedPreferences mPrivatePref;
    private SharedPreferences.Editor editor;

    private DetectConnection mDetectConnection;
    String deviceID,Name;
    int totalbyte;
    String packages;
    MemoryPersistence persistence;
    String clientId;
    MqttAndroidClient client;

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onCreate() {
        super.onCreate();

        mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE);
        editor = mPrivatePref.edit();

        deviceID = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", "");
        Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", "");

        clientId = MqttClient.generateClientId();
        persistence = new MemoryPersistence();

        client =
                new MqttAndroidClient(getApplicationContext(), "tcp://broker.administrasi.id:1883",
                        clientId, persistence);

        client.setCallback(this);

        try{
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);
            client.connect(connOpts,null, new IMqttActionListener() {

                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {

                        }

                        @Override
                        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

                        }
                    });
        }catch (Exception e){
            e.printStackTrace();
        }

        mHandler.postDelayed(mUpdateTask, 1000);
    }


    public int onStartCommand(Intent intent, int flags, int startId) {

        int res = super.onStartCommand(intent, flags, startId);

        //check if your service is already started
        if (isStarted){      //yes - do nothing
            return Service.START_STICKY;
        } else {             //no
            isStarted = true;
        }

        return Service.START_STICKY;

    }

    private Handler mHandler = new Handler();
    private Runnable mUpdateTask = new Runnable() {
        public void run() {

            getLatLng();
            if (latitude == 0.0 || longitude == 0.0) getLatLngWifi();

                        Debug.e("MQTT","Connect");
                        String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID;
                        Debug.e("MQTT CLIENT", clientId);
                        int qos = 1;
                        try {
                            IMqttToken subToken = client.subscribe(topic, qos);
                            subToken.setActionCallback(new IMqttActionListener() {
                                @Override
                                public void onSuccess(IMqttToken asyncActionToken) {
                                    // The subscription succeeded; now build and publish the location message

                                    String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID;
                                    long CurrentTime = System.currentTimeMillis();

                                    String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime;

                                    byte[] encodedPayload = new byte[0];
                                    try {
                                        encodedPayload = payload.getBytes("UTF-8");
                                        MqttMessage message = new MqttMessage(encodedPayload);
                                        // Configure QoS and the retained flag before handing
                                        // the message to publish()
                                        message.setQos(1);
                                        message.setRetained(true);
                                        client.publish(topic, message);
                                        Log.d("TAG", "onSuccess");
                                    } catch (UnsupportedEncodingException | MqttException e) {
                                        e.printStackTrace();
                                    }
                                }

                                @Override
                                public void onFailure(IMqttToken asyncActionToken,
                                                      Throwable exception) {
                                    // The subscription could not be performed, maybe the user was not
                                    // authorized to subscribe on the specified topic e.g. using wildcards

                                }
                            });
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }

            mHandler.postDelayed(this, 20000);
        }
    };

    private void getLatLng() {
        mGPSTracker2        = new GPSTracker2(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (isInternetPresent == true) {
            if (mGPSTracker2.canGetLocation()) {
                latitude    = mGPSTracker2.getLatitude();
                longitude   = mGPSTracker2.getLongitude();

                if(latitude != 0.0 && longitude != 0.0) {
                    editor.putString(Cons.APP_LATITUDE, latitude+"");
                    editor.putString(Cons.APP_LONGITUDE, longitude+"");
                    editor.commit();
                }
            } else {
//              getLatLngWifi();
                Debug.i(Cons.TAG, "on gps failed, please check");

            }
        } else {
            Debug.i(Cons.TAG, "no connection");

            if(mGPSTracker2 != null)
                mGPSTracker2.stopUsingGPS();
        }
    }

    private void getLatLngWifi() {
        mGPSTracker         = new GPSTracker(this);
        isInternetPresent   = mDetectConnection.isConnectingToInternet();
        if (isInternetPresent == true) {
            if (mGPSTracker.canGetLocation()) {
                latitude    = mGPSTracker.getLatitude();
                longitude   = mGPSTracker.getLongitude();

                if(latitude != 0.0 && longitude != 0.0) {
                    editor.putString(Cons.APP_LATITUDE, latitude+"");
                    editor.putString(Cons.APP_LONGITUDE, longitude+"");
                    editor.commit();
                }

            } else {
                Debug.i(Cons.TAG, "wifi " + "on gps failed, please check");

            }
        } else {
            Debug.i(Cons.TAG, "wifi " + "no connection");

            if(mGPSTracker != null)
                mGPSTracker.stopUsingGPS();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }
}

Sorry for my bad English

Asked by Mochamad Taufik Hidayat on Aug 03 '16 at 07:08



1 Answer

As laid out in the comments.

This is something you have to code yourself; the framework has no support for storing messages that could not be sent because the client was disconnected. MQTT persistence is only used to ensure that messages with QoS 1/2 are not lost in flight if the connection to the broker goes down before the QoS handshake is complete.

If you try to publish a message while disconnected, the client.publish(topic, message) call will throw an exception. You need to catch this exception and store the content of the message until the connection is re-established. Once the connection is back up and running, iterate over the stored messages and try sending them again.
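
The catch-store-resend approach described above could be sketched roughly as follows. This is only an illustration under stated assumptions: OfflineQueue, Sender, publish, flush and the capacity limit are hypothetical names that are not part of the Paho API, and Sender stands in for the real client.publish(topic, message) call so the sketch runs without Android or a broker. The buffer lives in memory, so anything queued is lost if the service is killed; a real app would probably write the backlog to a file or database instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical store-and-forward buffer; not part of the Paho API. */
public class OfflineQueue {

    /** Stand-in for the real publish call, e.g. client.publish(topic, message). */
    public interface Sender {
        void send(String topic, byte[] payload) throws Exception;
    }

    /** One message that could not be sent yet. */
    private static final class Pending {
        final String topic;
        final byte[] payload;

        Pending(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();
    private final Sender sender;
    private final int capacity;

    public OfflineQueue(Sender sender, int capacity) {
        this.sender = sender;
        this.capacity = capacity;
    }

    /** Tries to send now; stores the message if sending fails. Returns true if sent. */
    public synchronized boolean publish(String topic, byte[] payload) {
        // Only send directly when there is no backlog, so ordering is preserved.
        if (pending.isEmpty()) {
            try {
                sender.send(topic, payload);
                return true;
            } catch (Exception e) {
                // Assumed to mean "disconnected"; fall through and store the message.
            }
        }
        if (pending.size() >= capacity) {
            pending.pollFirst(); // buffer full: drop the oldest stored message
        }
        pending.addLast(new Pending(topic, payload));
        return false;
    }

    /** Resends stored messages oldest first; stops at the first failure. Returns the number sent. */
    public synchronized int flush() {
        int sent = 0;
        while (!pending.isEmpty()) {
            Pending next = pending.peekFirst();
            try {
                sender.send(next.topic, next.payload);
            } catch (Exception e) {
                break; // still offline; keep this and the remaining messages for later
            }
            pending.pollFirst();
            sent++;
        }
        return sent;
    }

    /** Number of messages currently waiting to be resent. */
    public synchronized int size() {
        return pending.size();
    }
}
```

In the service from the question, one plausible wiring is to route every location update through publish(...) and to call flush() from the onSuccess callback of client.connect(...) each time the connection is established again. Keeping the timestamp inside the payload, as the question's code already does, lets the receiver tell late deliveries apart from fresh ones.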

Answered by hardillb on Sep 23 '22 at 00:09