Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create simple okhttp3 websocket connection?

Can someone please show me an example on how to establish the connection to the wss:// address with specific Authorization header, using okhttp3 okhttp-ws library?

All I have is the url of WS server and Authorization string token.

Later, I must be able to send request to that connection, listen to upcoming data from WS server and than close connection. I have a difficulties with this new to me WS world, always been working only with REST (with okhttp3 too)

like image 963
Dmitry Glushonkov Avatar asked Dec 03 '22 14:12

Dmitry Glushonkov


2 Answers

So generally this sample is most of what you need

https://github.com/square/okhttp/blob/master/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java

But you will have two changes

  1. Use wss instead of ws in your URL

  2. Call request.addHeader to add your token

    request.addHeader("Authorization", "Bearer " + token)

like image 74
Yuri Schimke Avatar answered Dec 09 '22 15:12

Yuri Schimke


I know this is an old question, but when I try to use websocket with okhttp3 there are a lot of options that I want and it was not in the library. So I create a class that handle WS connection with extra functionalities. I hope it will help some body. Gist link

import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import android.util.Log;

import org.json.JSONException;
import org.json.JSONObject;

import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.ws.RealWebSocket;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.ByteString;

/**
 * Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
 *
 * @author Ali Yusuf
 * @since 3/13/17
 */

public class Socket {

    private final static String TAG = Socket.class.getSimpleName();
    private final static String CLOSE_REASON = "End of session";
    private final static int MAX_COLLISION = 7;

    public final static String EVENT_OPEN = "open";
    public final static String EVENT_RECONNECT_ATTEMPT = "reconnecting";
    public final static String EVENT_CLOSED = "closed";

    /**
     * Main socket states
     */
    public enum State {
        CLOSED, CLOSING, CONNECT_ERROR, RECONNECT_ATTEMPT, RECONNECTING, OPENING, OPEN
    }

    private static HttpLoggingInterceptor logging =
            new HttpLoggingInterceptor()
                    .setLevel(BuildConfig.DEBUG ? HttpLoggingInterceptor.Level.HEADERS : HttpLoggingInterceptor.Level.NONE);

    private static OkHttpClient.Builder httpClient =
            new OkHttpClient.Builder()
                    .addInterceptor(logging);

    public static class Builder {

        private Request.Builder request;

        private Builder(Request.Builder request) {
            this.request = request;
        }

        public static Builder with(@NonNull String url) {
            // Silently replace web socket URLs with HTTP URLs.
            if (!url.regionMatches(true, 0, "ws:", 0, 3) && !url.regionMatches(true, 0, "wss:", 0, 4))
                throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);

            return new Builder(new Request.Builder().url(url));
        }

        public Builder setPingInterval(long interval, @NonNull TimeUnit unit){
            httpClient.pingInterval(interval, unit);
            return this;
        }

        public Builder addHeader(@NonNull String name, @NonNull String value) {
            request.addHeader(name, value);
            return this;
        }

        public Socket build() {
            return new Socket(request.build());
        }
    }

    /**
     * Websocket state
     */
    private static State state;
    /**
     * Websocket main request
     */
    private static Request request;
    /**
     * Websocket connection
     */
    private static RealWebSocket realWebSocket;
    /**
     * Reconnection post delayed handler
     */
    private static Handler delayedReconnection;
    /**
     * Websocket events listeners
     */
    private static Map<String,OnEventListener> eventListener;
    /**
     * Websocket events new message listeners
     */
    private static Map<String,OnEventResponseListener> eventResponseListener;
    /**
     * Message list tobe send onEvent open {@link State#OPEN} connection state
     */
    private static Map<String,String> onOpenMessageQueue = new HashMap<>();
    /**
     * Websocket state change listener
     */
    private static OnStateChangeListener onChangeStateListener;
    /**
     * Websocket new message listener
     */
    private static OnMessageListener messageListener;
    /**
     * Number of reconnection attempts
     */
    private static int reconnectionAttempts;
    private static boolean skipOnFailure;

    private Socket(Request request) {
        Socket.request = request;
        state = State.CLOSED;
        eventListener = new HashMap<>();
        eventResponseListener = new HashMap<>();
        delayedReconnection = new Handler(Looper.getMainLooper());
        skipOnFailure = false;
    }

    /**
     * Start socket connection if i's not already started
     */
    public Socket connect() {
        if (httpClient == null) {
            throw new IllegalStateException("Make sure to use Socket.Builder before using Socket#connect.");
        }
        if (realWebSocket == null) {
            realWebSocket = (RealWebSocket) httpClient.build().newWebSocket(request, webSocketListener);
            changeState(State.OPENING);
        } else if (state == State.CLOSED) {
            realWebSocket.connect(httpClient.build());
            changeState(State.OPENING);
        }
        return this;
    }

    /**
     * Set listener which fired every time message received with contained data.
     *
     * @param listener message on arrive listener
     */
    public Socket onEvent(@NonNull String event, @NonNull OnEventListener listener){
        eventListener.put(event,listener);
        return this;
    }

    /**
     * Set listener which fired every time message received with contained data.
     *
     * @param listener message on arrive listener
     */
    public Socket onEventResponse(@NonNull String event, @NonNull OnEventResponseListener listener){
        eventResponseListener.put(event,listener);
        return this;
    }

    /**
     * Send message in {event->data} format
     *
     * @param event event name that you want sent message to
     * @param data message data in JSON format
     * @return true if the message send/on socket send quest; false otherwise
     */
    public boolean send(@NonNull String event, @NonNull String data){
        try {
            JSONObject text = new JSONObject();
            text.put("event", event);
            text.put("data", new JSONObject(data));
            Log.v(TAG,"Try to send data "+text.toString());
            return realWebSocket.send(text.toString());
        } catch (JSONException e) {
            Log.e(TAG,"Try to send data with wrong JSON format, data: "+data);
        }
        return false;
    }

    /**
     * Set state listener which fired every time {@link Socket#state} changed.
     *
     * @param listener state change listener
     */
    public Socket setOnChangeStateListener(@NonNull OnStateChangeListener listener) {
        onChangeStateListener = listener;
        return this;
    }

    /**
     * Message listener will be called in any message received even if it's not
     * in a {event -> data} format.
     *
     * @param listener message listener
     */
    public Socket setMessageListener(@NonNull OnMessageListener listener) {
        messageListener = listener;
        return this;
    }

    public void removeEventListener(@NonNull String event) {
        eventListener.remove(event);
        onOpenMessageQueue.remove(event);
    }

    /**
     * Clear all socket listeners in one line
     */
    public void clearListeners() {
        eventListener.clear();
        messageListener = null;
        onChangeStateListener = null;
    }

    /**
     * Send normal close request to the host
     */
    public void close() {
        if (realWebSocket != null) {
            realWebSocket.close(1000, CLOSE_REASON);
        }
    }

    /**
     * Send close request to the host
     */
    public void close(int code, @NonNull String reason) {
        if (realWebSocket != null) {
            realWebSocket.close(code, reason);
        }
    }

    /**
     * Terminate the socket connection permanently
     */
    public void terminate() {
        skipOnFailure = true; // skip onFailure callback
        if (realWebSocket != null) {
            realWebSocket.cancel(); // close connection
            realWebSocket = null; // clear socket object
        }
    }

    /**
     * Add message in a queue if the socket not open and send them
     * if the socket opened
     *
     * @param event event name that you want sent message to
     * @param data message data in JSON format
     */
    public void sendOnOpen(@NonNull String event, @NonNull String data) {
        if (state != State.OPEN)
            onOpenMessageQueue.put(event,data);
        else
            send(event,data);
    }

    /**
     * Retrieve current socket connection state {@link State}
     */
    public State getState() {
        return state;
    }

    /**
     * Change current state and call listener method with new state
     * {@link OnStateChangeListener#onChange(Socket, State)}
     * @param newState new state
     */
    private void changeState(State newState) {
        state = newState;
        if (onChangeStateListener != null) {
            onChangeStateListener.onChange(Socket.this, state);
        }
    }

    /**
     * Try to reconnect to the websocket after delay time using <i>Exponential backoff</i> method.
     * @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
     */
    private void reconnect() {
        if (state != State.CONNECT_ERROR) // connection not closed !!
            return;

        changeState(State.RECONNECT_ATTEMPT);

        if (realWebSocket != null) {
            // Cancel websocket connection
            realWebSocket.cancel();
            // Clear websocket object
            realWebSocket = null;
        }

        if (eventListener.get(EVENT_RECONNECT_ATTEMPT) != null) {
            eventListener.get(EVENT_RECONNECT_ATTEMPT).onMessage(Socket.this, EVENT_RECONNECT_ATTEMPT);
        }

        // Calculate delay time
        int collision = reconnectionAttempts > MAX_COLLISION ? MAX_COLLISION : reconnectionAttempts;
        long delayTime = Math.round((Math.pow(2, collision)-1)/2) * 1000;

        // Remove any pending posts of callbacks
        delayedReconnection.removeCallbacksAndMessages(null);
        // Start new post delay
        delayedReconnection.postDelayed(new Runnable() {
            @Override
            public void run() {
                changeState(State.RECONNECTING);
                reconnectionAttempts++; // Increment connections attempts
                connect(); // Establish new connection
            }
        }, delayTime);
    }

    private WebSocketListener webSocketListener = new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            Log.v(TAG,"Socket has been opened successfully.");
            // reset connections attempts counter
            reconnectionAttempts = 0;

            // fire open event listener
            if (eventListener.get(EVENT_OPEN) != null) {
                eventListener.get(EVENT_OPEN).onMessage(Socket.this, EVENT_OPEN);
            }

            // Send data in queue
            for (String event : onOpenMessageQueue.keySet()) {
                send(event, onOpenMessageQueue.get(event));
            }
            // clear queue
            onOpenMessageQueue.clear();

            changeState(State.OPEN);
        }

        /**
         * Accept only Json data with format:
         * <b> {"event":"event name","data":{some data ...}} </b>
         */
        @Override
        public void onMessage(WebSocket webSocket, String text) {
            // print received message in log
            Log.v(TAG, "New Message received "+text);

            // call message listener
            if (messageListener != null)
                messageListener.onMessage(Socket.this, text);

            try {
                // Parse message text
                JSONObject response = new JSONObject(text);
                String event = response.getString("event");
                JSONObject data = response.getJSONObject("data");

                // call event listener with received data
                if (eventResponseListener.get(event) != null) {
                    eventResponseListener.get(event).onMessage(Socket.this, event, data);
                }
                // call event listener
                if (eventListener.get(event) != null) {
                    eventListener.get(event).onMessage(Socket.this, event);
                }
            } catch (JSONException e) {
                // Message text not in JSON format or don't have {event}|{data} object
                Log.e(TAG, "Unknown message format.");
            }
        }

        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            // TODO: some action
        }

        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
            Log.v(TAG,"Close request from server with reason '"+reason+"'");
            changeState(State.CLOSING);
            webSocket.close(1000,reason);
        }

        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            Log.v(TAG,"Socket connection closed with reason '"+reason+"'");
            changeState(State.CLOSED);
            if (eventListener.get(EVENT_CLOSED) != null) {
                eventListener.get(EVENT_CLOSED).onMessage(Socket.this, EVENT_CLOSED);
            }
        }

        /**
         * This method call if:
         * - Fail to verify websocket GET request  => Throwable {@link ProtocolException}
         * - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
         * - First GET request had been failed => response null, Throwable {@link java.io.IOException}
         * - Fail to send Ping => response null, Throwable {@link java.io.IOException}
         * - Fail to send data frame => response null, Throwable {@link java.io.IOException}
         * - Fail to read data frame => response null, Throwable {@link java.io.IOException}
         */
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            if (!skipOnFailure) {
                skipOnFailure = false; // reset flag
                Log.v(TAG, "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
                changeState(State.CONNECT_ERROR);
                reconnect();
            }
        }
    };

    public abstract static class OnMessageListener {
        public abstract void onMessage (String data);

        /**
         * Method called from socket to execute listener implemented in
         * {@link #onMessage(String)} on main thread
         *
         * @param socket Socket that receive the message
         * @param data Data string received
         */
        private void onMessage (Socket socket, final String data) {
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    onMessage(data);
                }
            });
        }
    }

    public abstract static class OnEventListener {
        public abstract void onMessage (String event);

        private void onMessage (Socket socket, final String event) {
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    onMessage(event);
                }
            });
        }
    }

    public abstract static class OnEventResponseListener extends OnEventListener {
        /**
         * Method need to override in listener usage
         */
        public abstract void onMessage (String event, String data);

        /**
         * Just override the inherited method
         */
        @Override
        public void onMessage(String event) {}

        /**
         * Method called from socket to execute listener implemented in
         * {@link #onMessage(String, String)} on main thread
         *
         * @param socket Socket that receive the message
         * @param event Message received event
         * @param data Data received in the message
         */
        private void onMessage (Socket socket, final String event, final JSONObject data) {
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    onMessage(event, data.toString());
                    onMessage(event);
                }
            });
        }
    }

    public abstract static class OnStateChangeListener {
        /**
         * Method need to override in listener usage
         */
        public abstract void onChange (State status);

        /**
         * Method called from socket to execute listener implemented in
         * {@link #onChange(State)} on main thread
         *
         * @param socket Socket that receive the message
         * @param status new status
         */
        private void onChange (Socket socket, final State status){
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    onChange(status);
                }
            });
        }
    }

}

Usage example:

Socket socket = Socket.Builder.with(WEBSOCKET_BASE_URL).build().connect();
socket.onEvent(Socket.EVENT_OPEN, socketOpenListener);
socket.onEvent(Socket.EVENT_RECONNECT_ATTEMPT, .....);
socket.onEvent(Socket.EVENT_CLOSED, .....);
socket.onEventResponse("Some event", socketPairListener);
socket.send("Some event", "{"some data":"in JSON format"}");
socket.sendOnOpen("Some event", "{"some data":"in JSON format"}");
like image 34
Ali Yusuf Avatar answered Dec 09 '22 15:12

Ali Yusuf