
Opening new connection after Connection Draining. Google Cloud Messaging

I am fairly new to Google Cloud Messaging. We have been working with it for a couple of months, but recently we started receiving "Connection Draining" messages. When this happens, all communication stops.

Google says: https://developer.android.com/google/gcm/ccs.html#response

When you receive a CONNECTION_DRAINING message, you should immediately begin sending messages to another CCS connection, opening a new connection if necessary. You should, however, keep the original connection open and continue receiving messages that may come over the connection (and ACKing them)—CCS will handle initiating a connection close when it is ready.
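
For reference, the control message arrives as a JSON object inside the XMPP stanza, with `message_type` set to `control` and `control_type` set to `CONNECTION_DRAINING`. A minimal sketch of recognizing it once the JSON has been parsed into a map; the class and method names are illustrative assumptions and not part of the GCM sample code:

```java
import java.util.Map;

/** Illustrative helper (hypothetical name) for classifying parsed CCS messages. */
class ControlMessages {
    /** Returns true only for a control message that announces connection draining. */
    static boolean isConnectionDraining(Map<String, Object> msg) {
        return "control".equals(msg.get("message_type"))
                && "CONNECTION_DRAINING".equals(msg.get("control_type"));
    }
}
```

Any other message type (ack, nack, or a normal upstream message with no `message_type`) makes the check return false.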

My questions are:

  1. If I open a new connection manually, how does it know what connection to use if I don't close the existing connection?
  2. If 6 messages are sent concurrently how do I stop the method from opening 6 connections? Or am I confused on this?
  3. Why does connection draining happen?

I am surprised this isn't already handled in their example code, which otherwise seems to cover pretty much everything you need. Is it already done for me in the code and I am missing it?

I don't have a main method in my code; I use servlets as triggers instead. My connection is initialized like this:

@PostConstruct
    public void init() throws Exception{
        try {
            smackCcsClient.connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
        }catch (IOException e ){
            e.printStackTrace();
        }catch(SmackException e){
            e.printStackTrace();
        }catch(XMPPException e){
            e.printStackTrace();
        }
    }

However, after this I never touch the connection again. Am I handling this wrong? Is the connection something I should be touching more frequently, or something I need to keep track of?

_______________________ADDED AFTER THE QUESTION_________________________

I added a connect call inside their example code to try to reinitialize the connection. It looks like this:

if ("CONNECTION_DRAINING".equals(controlType)) {
            connectionDraining = true;
            //Open new connection because old connection will be closing or is already closed.
            try {
                connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
            } catch (XMPPException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (SmackException e) {
                e.printStackTrace();
            }

        } else {
            // java.util.logging formats parameters with {0}, not %s
            logger.log(Level.INFO, "Unrecognized control type: {0}. This could happen if new features are " + "added to the CCS protocol.",
                    controlType);
        }
B Rad asked Nov 07 '14 03:11


2 Answers

I have written some code for handling such cases (basically diverting new downstream messages to a new connection). It is not thoroughly tested.

import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;

import javax.net.ssl.SSLSocketFactory;

import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketInterceptor;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.DefaultPacketExtension;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;

import com.fasterxml.jackson.core.type.TypeReference;


/**
 * Based on https://developer.android.com/google/gcm/ccs.html#smack
 * 
 * @author Abhinav.Dwivedi
 *
 */
public class SmackCcsClient implements CcsClient {
    private static final Logger logger = LoggerFactory.getLogger(SmackCcsClient.class);
    private static final String GCM_SERVER = "gcm.googleapis.com";
    private static final int GCM_PORT = 5235;
    private static final String GCM_ELEMENT_NAME = "gcm";
    private static final String GCM_NAMESPACE = "google:mobile:data";
    private static volatile SmackCcsClient instance;
    static {
        ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, new PacketExtensionProvider() {
            @Override
            public PacketExtension parseExtension(XmlPullParser parser) throws Exception {
                String json = parser.nextText();
                return new GcmPacketExtension(json);
            }
        });
    }
    private final Deque<Channel> channels;

    public static SmackCcsClient instance() {
        if (instance == null) {
            synchronized (SmackCcsClient.class) {
                if (instance == null) {
                    instance = new SmackCcsClient();
                }
            }
        }
        return instance;
    }

    private SmackCcsClient() {
        channels = new ConcurrentLinkedDeque<Channel>();
        channels.addFirst(connect());
    }

    private class Channel {
        private XMPPConnection connection;
        /**
         * Indicates whether the connection is in draining state, which means that it will not accept any new downstream
         * messages.
         */
        private volatile boolean connectionDraining = false;

        /**
         * Sends a packet with contents provided.
         */
        private void send(String jsonRequest) throws NotConnectedException {
            Packet request = new GcmPacketExtension(jsonRequest).toPacket();
            connection.sendPacket(request);
        }

        private void handleControlMessage(Map<String, Object> jsonObject) {
            logger.debug("handleControlMessage(): {}", jsonObject);
            String controlType = (String) jsonObject.get("control_type");
            if ("CONNECTION_DRAINING".equals(controlType)) {
                connectionDraining = true;
            } else {
                logger.info("Unrecognized control type: {}. This could happen if new features are "
                        + "added to the CCS protocol.", controlType);
            }
        }
    }

    /**
     * Sends a downstream message to GCM.
     *
     */
    @Override
    public void sendDownstreamMessage(String message) throws Exception {
        Channel channel = channels.peekFirst();
        if (channel.connectionDraining) {
            synchronized (channels) {
                channel = channels.peekFirst();
                if (channel.connectionDraining) {
                    channels.addFirst(connect());
                    channel = channels.peekFirst();
                }
            }
        }
        channel.send(message);
        logger.debug("Message Sent via CCS: ({})", message);
    }

    /**
     * Handles an upstream data message from a device application.
     *
     */
    protected void handleUpstreamMessage(Map<String, Object> jsonObject) {
        // PackageName of the application that sent this message.
        String category = (String) jsonObject.get("category");
        String from = (String) jsonObject.get("from");
        @SuppressWarnings("unchecked")
        Map<String, String> payload = (Map<String, String>) jsonObject.get("data");
        logger.info("Message received from device: category ({}), from ({}), payload: ({})", category, from,
                JsonUtil.toJson(payload));
    }

    /**
     * Handles an ACK.
     *
     * <p>
     * Logs a DEBUG message, but subclasses could override it to properly handle ACKs.
     */
    public void handleAckReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleAckReceipt() from: {}, messageId: {}", from, messageId);
    }

    /**
     * Handles a NACK.
     *
     * <p>
     * Logs a DEBUG message, but subclasses could override it to properly handle NACKs.
     */
    protected void handleNackReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleNackReceipt() from: {}, messageId: {}", from, messageId);
    }

    /**
     * Creates a JSON encoded ACK message for an upstream message received from an application.
     *
     * @param to
     *            RegistrationId of the device who sent the upstream message.
     * @param messageId
     *            messageId of the upstream message to be acknowledged to CCS.
     * @return JSON encoded ack.
     */
    protected static String createJsonAck(String to, String messageId) {
        Map<String, Object> message = new HashMap<String, Object>();
        message.put("message_type", "ack");
        message.put("to", to);
        message.put("message_id", messageId);
        return JsonUtil.toJson(message);
    }

    /**
     * Connects to the GCM Cloud Connection Server and logs in with the credentials from ExternalConfig.
     * 
     * @return the connected and authenticated channel
     */
    @Override
    public Channel connect() {
        try {
            // final so the anonymous packet listener below can capture it
            final Channel channel = new Channel();
            ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
            config.setSecurityMode(SecurityMode.enabled);
            config.setReconnectionAllowed(true);
            config.setRosterLoadedAtLogin(false);
            config.setSendPresence(false);
            config.setSocketFactory(SSLSocketFactory.getDefault());

            channel.connection = new XMPPTCPConnection(config);
            channel.connection.connect();

            channel.connection.addConnectionListener(new LoggingConnectionListener());

            // Handle incoming packets
            channel.connection.addPacketListener(new PacketListener() {
                @Override
                public void processPacket(Packet packet) {
                    logger.debug("Received: ({})", packet.toXML());
                    Message incomingMessage = (Message) packet;
                    GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
                    String json = gcmPacket.getJson();
                    try {
                        Map<String, Object> jsonObject = JacksonUtil.DEFAULT.mapper().readValue(json,
                                new TypeReference<Map<String, Object>>() {});
                        // present for ack, nack and control, null otherwise
                        Object messageType = jsonObject.get("message_type");
                        if (messageType == null) {
                            // Normal upstream data message
                            handleUpstreamMessage(jsonObject);
                            // Send ACK to CCS
                            String messageId = (String) jsonObject.get("message_id");
                            String from = (String) jsonObject.get("from");
                            String ack = createJsonAck(from, messageId);
                            channel.send(ack);
                        } else if ("ack".equals(messageType.toString())) {
                            // Process Ack
                            handleAckReceipt(jsonObject);
                        } else if ("nack".equals(messageType.toString())) {
                            // Process Nack
                            handleNackReceipt(jsonObject);
                        } else if ("control".equals(messageType.toString())) {
                            // Process control message
                            channel.handleControlMessage(jsonObject);
                        } else {
                            logger.error("Unrecognized message type ({})", messageType.toString());
                        }
                    } catch (Exception e) {
                        logger.error("Failed to process packet ({})", packet.toXML(), e);
                    }
                }
            }, new PacketTypeFilter(Message.class));

            // Log all outgoing packets
            channel.connection.addPacketInterceptor(new PacketInterceptor() {
                @Override
                public void interceptPacket(Packet packet) {
                    logger.debug("Sent: {}", packet.toXML());
                }
            }, new PacketTypeFilter(Message.class));

            channel.connection.login(ExternalConfig.gcmSenderId() + "@gcm.googleapis.com", ExternalConfig.gcmApiKey());
            return channel;
        } catch (Exception e) {
            logger.error(Logging.FATAL, "Error in creating channel for GCM communication", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * XMPP Packet Extension for GCM Cloud Connection Server.
     */
    private static final class GcmPacketExtension extends DefaultPacketExtension {

        private final String json;

        public GcmPacketExtension(String json) {
            super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
            this.json = json;
        }

        public String getJson() {
            return json;
        }

        @Override
        public String toXML() {
            return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
                    StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
        }

        public Packet toPacket() {
            Message message = new Message();
            message.addExtension(this);
            return message;
        }
    }

    private static final class LoggingConnectionListener implements ConnectionListener {

        @Override
        public void connected(XMPPConnection xmppConnection) {
            logger.info("Connected.");
        }

        @Override
        public void authenticated(XMPPConnection xmppConnection) {
            logger.info("Authenticated.");
        }

        @Override
        public void reconnectionSuccessful() {
            logger.info("Reconnection successful.");
        }

        @Override
        public void reconnectionFailed(Exception e) {
            logger.error("Reconnection failed.. ", e);
        }

        @Override
        public void reconnectingIn(int seconds) {
            logger.info("Reconnecting in {} secs", seconds);
        }

        @Override
        public void connectionClosedOnError(Exception e) {
            logger.error("Connection closed on error.", e);
        }

        @Override
        public void connectionClosed() {
            logger.info("Connection closed.");
        }
    }
}
gladiator answered Oct 16 '22 06:10


I'm also new to GCM and am facing the same problem. I solved it by creating a new SmackCcsClient() when a CONNECTION_DRAINING message arrives. The older connection should still exist and keep receiving messages, but it no longer sends because:

protected volatile boolean connectionDraining = true;

Google says that the connection will be closed by CCS:

CCS will handle initiating a connection close when it is ready.

Until the old connection is closed by CCS, you will be able to receive messages from both connections but send messages only through the new one. When the old connection is closed it should be destroyed. I'm not sure whether the garbage collector is called or not; I'm still trying to solve this issue.
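
The hand-over described here can be sketched without any XMPP library. This is a rough sketch only, assuming a hypothetical `Link` interface that stands in for one CCS connection and a `Supplier<Link>` that opens and logs in a new one; neither name comes from Smack or the GCM sample. A draining link is kept (so it can still receive and ACK) but is never used for sending, and it is forgotten once CCS closes it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for one CCS connection; not a Smack or GCM type. */
interface Link {
    void send(String json);
}

/** Sends on the newest non-draining link and keeps draining links until CCS closes them. */
class DrainAwareSender {
    private final Supplier<Link> opener;                    // assumed factory that opens a connection
    private final List<Link> draining = new ArrayList<>();  // still open for inbound messages and ACKs
    private Link active;

    DrainAwareSender(Supplier<Link> opener) {
        this.opener = opener;
        this.active = opener.get();
    }

    /** Call when a CONNECTION_DRAINING control message arrives on the given link. */
    synchronized void onConnectionDraining(Link link) {
        if (link == active) {
            draining.add(link); // keep it open; CCS initiates the close
            active = null;      // the next send opens the replacement
        }
    }

    /** Call from the connection-closed callback so the old link is no longer referenced. */
    synchronized void onClosed(Link link) {
        draining.remove(link);
        if (link == active) {
            active = null;
        }
    }

    /** The lock ensures that concurrent senders open at most one replacement. */
    synchronized void send(String json) {
        if (active == null) {
            active = opener.get();
        }
        active.send(json);
    }

    synchronized int drainingCount() {
        return draining.size();
    }
}
```

On the garbage collection point: a Java object can only be reclaimed once nothing references it, so dropping the old link in `onClosed` is what makes it eligible for collection. Holding a single lock across `send` keeps the sketch simple at the cost of serializing senders.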

P.S.: I'm not 100% sure about this answer, but maybe it will open up more space for discussion.

cheou answered Oct 16 '22 04:10