Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava: Observing messages emitted from a socket

I'm still trying to learn RxJava. There's one thing that I just can't wrap my head around right now. Every article that has tried to learn me how to use Rx has shown me how to create Observables based on sources that are already "predictable", i.e. sequences of a set amount of items (A single value or, for example, a simple Iterable).

Mostly you'll see something along the lines of Observable.just()

Observable<String> observerable = Observable.just("Hello, world!");

Or Observable.from():

Observable.from("apple", "orange", "banana").subscribe(fruit -> System.out.println(fruit));

That's all nice, but what about the following usecase?

I have messages that are constantly pushed through a socket (I didn't build it, I'm merely integrating). I need to "Observe" the sequence of data that is being pushed through the socket.

Many people seem to point to Obserable.using() (Here's an example), but I don't think it's the right solution either. Messages pushed through the socket are incomplete because they have a maximum length. I need to 'frame' the messages myself, so I need to buffer between each push from the socket.

In other words, I'm looking for a way to frame the messages myself from the data pushed from the socket and then push them into the Observable. I've been looking for the proper way to do this all over the place, but I can't seem to find a proper solution.

like image 995
romatthe Avatar asked Dec 13 '15 18:12

romatthe


1 Answers

What about Observable with fully customizable behavior?

Observable.create(subscriber -> {
  Socket socket = getSocket();
  socket.subscribe(new SocketListener() {

    @Override public void onNewFrame(Frame frame) {
      // Process frame and prepare payload to the subscriber.

      if (payloadReadyForExternalObserver) {
        if (subscriber.isUnsubscribed()) {
          // Subscriber unsubscribed, let's close the socket.
          socket.close();
        } else {
          subscriber.onNext(payload);
        }
      }
    }

    @Override public void onSocketError(IOException exception) {
      subscriber.onError(exception); // Terminal state.
    }

    @Override public void onSocketClosed() {
      subscriber.onCompleted(); // Terminal state.
    }
  });
})

But make sure you implement Observable contract correctly, for more info please read https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators

like image 195
Artem Zinnatullin Avatar answered Sep 29 '22 01:09

Artem Zinnatullin