Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava + Websocket - How to add Observable to Websocket listener?

I have a ViewModel that is observing a RxJava Observable in my MainRepo class. I am trying to get my WebsocketListener in the MainRepo class to emit events, but I'm unsure how to do so.

MainRepo class:

private WebSocket ws;

public void createWsConnection() {
        Request request = new Request.Builder()
                .url(Constants.WEBSOCKET_ENDPOINT)
                .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
                .build();

        OkHttpClient client = new OkHttpClient
                .Builder()
                .pingInterval(30, TimeUnit.SECONDS)
                .build();

        this.ws = client.newWebSocket(request, webSocketListener);
    }

This is where I'm confused. I don't know how I would use the websocket with the RxJava observable.

public Observable<String> createListener(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                 //I don't know what to put here in order to emit messages
                 //back to my ViewModel class using the websocket listener
            }
        });
    }

The websocket listener:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened...", response.toString());
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            Timber.e(t, "Ws connection failure.", response.toString());

        }
    };

A function in the ViewModel class that is observing the Observable in my MainRepo class:

public void connectToWs(){
        mainRepo.createListener()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Timber.d("Subscribed");
            }

            @Override
            public void onNext(@NonNull String s) {
                Timber.d("Message: " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Timber.e(e, "Something went wrong.");
            }

            @Override
            public void onComplete() {
                Timber.d("On complete.");
            }
        });
    }
like image 513
DIRTY DAVE Avatar asked Jun 01 '21 04:06

DIRTY DAVE


People also ask

Is it possible to simplify WebSockets in RxJS?

Simplifying WebSockets in RxJS. Using Redux-Observable for complex pipelines. I’m an avid user of Redux-Observable, and while it masks a lot of the difficulties in using RxJS, you still need a deeper understanding of RxJS to handle really complex use cases.

What is an observable in RxJava?

What is an Observable? In RxJava, Observable s are the source that emits data to the Observers. We can understand observable s as suppliers — they process and supply data to other components. It does some work and emits some values. The following are the different types of Observable s in RxJava

How do I send a WebSocket message to Redux and RxJS?

When a webSocketConnection$ message comes in, call the receivedWebSocketMessage action creator to format and make that message available to all of Redux and Redux-Observable. The one most-impressive operators in RxJS is startWith. I’ve used this operator in so many crazy ways.

How to detect failure of a WebSocket subject?

We can use the catch operator on the webSocketSubject to detect the 1st case and for the 2nd case, we can listen to the closeObserver subject that we provided while creating the WebSocket subject. We can update our connectEpic and connectedEpic like below to detect failures.


Video Answer


2 Answers

Create a PublishSubject and change your createListener method to return it:

private PublishSubject<String> publishSubject = PublishSubject.create<String>();

public Observable<String> createListener(){
    return publishSubject;
}

PublishSubject is an Observable so notice that you don't need to change your method signature, but I'd suggest you to rename the method name to something like observeMessages.

Then in your websocket listener you can emit the messages to the PublishSubject with onNext method. You should also call onComplete in the onClosed method and onError in the onFailure method:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened...", response.toString());
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");

            publishSubject.onComplete();
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

            publishSubject.onNext(text);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            Timber.e(t, "Ws connection failure.", response.toString());

            publishSubject.onError(t);
        }
    };
like image 166
Gustavo Avatar answered Oct 19 '22 13:10

Gustavo


PublishSubject is the solution like Gustavo posted. But createListener() looks strange and i just want to show how i do it.

I'm not using OkHttp, but i do exactly same things with nv-websocket-client. It's just another websocket client.

In my scenario i do a lot of reactive stuff before, but the socket flow is:

  • connect to socket
  • register OnTextMessage listener - it should provides message to me
  • send messages to get permament answers from socket

Somewhere in OnCreate / OnCreateView:

// Creating socket with credentials
WebSocketFactory factory = new WebSocketFactory();

try {
    socket = factory.createSocket("wss://ws.example.com", 3000);
} catch (IOException e) {
    e.printStackTrace();
}

// Create a subject
PublishSubject<String> subject = PublishSubject.create();

I have done some HTTP/GET work before and save results in List<String> symbols - it also reactive way. After this i call subscribeToSymbols method which does a whole socket stuff:

public static Single<WebSocket> subscribeToSymbols(WebSocket socket,
                                                   PublishSubject<String> subject,
                                                   List<StockHttpGetData> symbols) {
    // connect -> register onTextMessage callback -> sendMessages to Socket
    return Single.fromCallable(socket::connect)
            .subscribeOn(Schedulers.io())
            .map(s -> s.addListener(new WebSocketAdapter() {
                @Override
                public void onTextMessage(WebSocket websocket, String text) {
                    subject.onNext(text);
                }
            }))
            .doAfterSuccess(s -> symbols.forEach(httpData -> sendMessageToSubscribe(s, httpData)));
}

You are not interested in what subscribeToSymbols returns. The key point that subject.onNext(text) provides received message from socket to you if you subscribed to this subject.

Finally, subscribe to subject what you've created before and do what you want with message:

subject.flatMap(t -> Observable.just(new GsonBuilder()
            .registerTypeAdapter(TreeSet.class, new SocketMessageDeserializer())
            .create()
            .fromJson(t, TreeSet.class)))
       .filter(treeSet -> !treeSet.isEmpty())
       .observeOn(AndroidSchedulers.mainThread())
       .doOnNext(StocksRepository.getInstance()::printSocketMessage)
       .subscribe(adapter::setStocksChanged, Throwable::printStackTrace);

The fact that it's complicated question because you should also handle connection errors, data pressure, config changes and be lifecycle-aware, but it also relevant for non-reactive way (e.g callback-hell), so PublishSubject is a start point.

like image 35
holdbetter Avatar answered Oct 19 '22 12:10

holdbetter