I have a problem, when my connection intrupted, mqtt publish not send at reconnect, how to resolve it? Im following this answer but not working
What I've done :
But the result when I reconnect still publish data when I online & not publishing stored persistence data.
Here is my source code:
package id.trustudio.android.mdm.service;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.content.pm.ApplicationInfo;
import android.content.pm.PackageManager;
import android.net.TrafficStats;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import id.trustudio.android.mdm.http.DetectConnection;
import id.trustudio.android.mdm.util.Cons;
import id.trustudio.android.mdm.util.Debug;
import id.trustudio.android.mdm.util.GPSTracker;
import id.trustudio.android.mdm.util.GPSTracker2;
public class MqttService extends Service implements MqttCallback {
public static boolean isStarted = false;
private double latitude = 0;
private double longitude = 0;
private GPSTracker mGPSTracker;
private GPSTracker2 mGPSTracker2;
boolean isInternetPresent = false;
private SharedPreferences mPrivatePref;
private SharedPreferences.Editor editor;
private DetectConnection mDetectConnection;
String deviceID,Name;
int totalbyte;
String packages;
MemoryPersistence persistence;
String clientId;
MqttAndroidClient client;
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onCreate() {
super.onCreate();
mPrivatePref = this.getSharedPreferences(Cons.PRIVATE_PREF, Context.MODE_PRIVATE);
editor = mPrivatePref.edit();
deviceID = mPrivatePref.getString(Cons.APP_PACKAGE + "deviceid", "");
Name = mPrivatePref.getString(Cons.APP_PACKAGE + "user", "");
clientId = MqttClient.generateClientId();
persistence = new MemoryPersistence();
client =
new MqttAndroidClient(getApplicationContext(), "tcp://broker.administrasi.id:1883",
clientId, persistence);
client.setCallback(this);
try{
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
client.connect(connOpts,null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}catch (Exception e){
e.printStackTrace();
}
mHandler.postDelayed(mUpdateTask, 1000);
}
public int onStartCommand(Intent intent, int flags, int startId) {
int res = super.onStartCommand(intent, flags, startId);
//check if your service is already started
if (isStarted){ //yes - do nothing
return Service.START_STICKY;
} else { //no
isStarted = true;
}
return Service.START_STICKY;
}
private Handler mHandler = new Handler();
private Runnable mUpdateTask = new Runnable() {
public void run() {
getLatLng();
if (latitude == 0.0 || longitude == 0.0) getLatLngWifi();
Debug.e("MQTT","Connect");
String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID;
Debug.e("MQTT CLIENT", clientId);
int qos = 1;
try {
IMqttToken subToken = client.subscribe(topic, qos);
subToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// The message was published
String topic = "gps/kodeupi/kodeap/kodeup/" + deviceID;
long CurrentTime = System.currentTimeMillis();
String payload = deviceID + "|" + latitude + "|" + longitude + "|" + CurrentTime;
byte[] encodedPayload = new byte[0];
try {
encodedPayload = payload.getBytes("UTF-8");
MqttMessage message = new MqttMessage(encodedPayload);
client.publish(topic, message);
message.setRetained(true);
// set quality of service
message.setQos(1);
Log.d("TAG", "onSuccess");
} catch (UnsupportedEncodingException | MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
// The subscription could not be performed, maybe the user was not
// authorized to subscribe on the specified topic e.g. using wildcards
}
});
} catch (MqttException e) {
e.printStackTrace();
}
mHandler.postDelayed(this, 20000);
}
};
private void getLatLng() {
mGPSTracker2 = new GPSTracker2(this);
isInternetPresent = mDetectConnection.isConnectingToInternet();
if (isInternetPresent == true) {
if (mGPSTracker2.canGetLocation()) {
latitude = mGPSTracker2.getLatitude();
longitude = mGPSTracker2.getLongitude();
if(latitude != 0.0 && longitude != 0.0) {
editor.putString(Cons.APP_LATITUDE, latitude+"");
editor.putString(Cons.APP_LONGITUDE, longitude+"");
editor.commit();
}
} else {
// getLatLngWifi();
Debug.i(Cons.TAG, "on gps failed, please check");
}
} else {
Debug.i(Cons.TAG, "no connection");
if(mGPSTracker2 != null)
mGPSTracker2.stopUsingGPS();
}
}
private void getLatLngWifi() {
mGPSTracker = new GPSTracker(this);
isInternetPresent = mDetectConnection.isConnectingToInternet();
if (isInternetPresent == true) {
if (mGPSTracker.canGetLocation()) {
latitude = mGPSTracker.getLatitude();
longitude = mGPSTracker.getLongitude();
if(latitude != 0.0 && longitude != 0.0) {
editor.putString(Cons.APP_LATITUDE, latitude+"");
editor.putString(Cons.APP_LONGITUDE, longitude+"");
editor.commit();
}
} else {
Debug.i(Cons.TAG, "wifi " + "on gps failed, please check");
}
} else {
Debug.i(Cons.TAG, "wifi " + "no connection");
if(mGPSTracker != null)
mGPSTracker.stopUsingGPS();
}
}
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
Sorry for my bad English
At Connection When an MQTT client creates a connection to the MQTT broker, the Keep Alive mechanism can be enabled between the communicating parties by setting the Keep Alive variable header field in the connection request protocol packet to a non-zero value.
Does MQTT require internet? Yes, to send or receive messages, the MQTT client must establish a TCP connection to the broker. However, MQTT comes with features specifically designed to cope with unstable network connections, like the broker buffering incoming messages for disconnected clients.
The job of an MQTT broker is to filter messages based on topic, and then distribute them to subscribers. There is no direct connection between a publisher and subscriber. All clients can publish (broadcast) and subscribe (receive). MQTT brokers do not normally store messages.
As laid out in the comments.
This is something you have to code yourself, there is no support for storing messages that have not been sent because the client is disconnected in the framework. The MQTT persistence is only used to ensure messages with QOS 1/2 are not lost in flight if the connection to the broker goes down before the QOS handshake is complete.
If you try to publish a message while disconnected the client.publish(topic, message)
call will throw an exception, you need to catch this exception and then make arrangements to store the content of the message for when the connection is re-established. Once a connection is back up and running then you need to iterate over the stored details and try the send again.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With