
How to properly convert Listeners to Reactive (Observables) using RxJava?

I'm using a multiplayer game client called AppWarp (http://appwarp.shephertz.com), which lets you add event listeners that are called back when events happen. Take the connection listener as an example. To use it, you need to implement this interface:

public interface ConnectionRequestListener {
    void onConnectDone(ConnectEvent var1);
    void onDisconnectDone(ConnectEvent var1);
    void onInitUDPDone(byte var1);
}

My goal is to create a reactive version of this client for internal use in my apps, instead of using the client directly. (Later I'll also depend on interfaces rather than on WarpClient itself as in this example, but that's not the important point; please read my question at the very end.)

Here is what I did:

1) I introduced a new event class, RxConnectionEvent, which groups the connection-related events:

public class RxConnectionEvent {
    // This is the original connection event from the source client
    private final ConnectEvent connectEvent;
    // this is to identify if it was Connection / Disconnection
    private final int eventType;

    public RxConnectionEvent(ConnectEvent connectEvent, int eventType) {
        this.connectEvent = connectEvent;
        this.eventType = eventType;
    }

    public ConnectEvent getConnectEvent() {
        return connectEvent;
    }

    public int getEventType() {
        return eventType;
    }
}

2) I defined some event types:

public class RxEventType {
    // Connection Events
    public final static int CONNECTION_CONNECTED = 20;
    public final static int CONNECTION_DISCONNECTED = 30;
}

3) I created the following observable, which emits my new RxConnectionEvent:

import com.shephertz.app42.gaming.multiplayer.client.WarpClient;
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent;
import com.shephertz.app42.gaming.multiplayer.client.listener.ConnectionRequestListener;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

public class ConnectionObservable extends BaseObservable<RxConnectionEvent> {

    private ConnectionRequestListener connectionListener;

    // This is going to be called from my ReactiveWarpClient (Factory) Later.
    public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) {
        return Observable.create(new ConnectionObservable(warpClient));
    }

    private ConnectionObservable(WarpClient warpClient) {
        super(warpClient);
    }

    @Override
    public void call(final Subscriber<? super RxConnectionEvent> subscriber) {
        // No explicit subscriber.onStart() call: Observable.subscribe() already invokes it.
        connectionListener = new ConnectionRequestListener() {
            @Override
            public void onConnectDone(ConnectEvent connectEvent) {
                // ConnectionRequestListener is an interface, so there is no super implementation to call.
                callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED));
            }

            @Override
            public void onDisconnectDone(ConnectEvent connectEvent) {
                callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED));
            }

            // not interested in this method (for now)
            @Override
            public void onInitUDPDone(byte var1) { }

            private void callback(RxConnectionEvent rxConnectionEvent)
            {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(rxConnectionEvent);
                } else {
                    warpClient.removeConnectionRequestListener(connectionListener);
                }
            }
        };

        warpClient.addConnectionRequestListener(connectionListener);
        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                onUnsubscribed(warpClient);
            }
        }));
    }

    @Override
    protected void onUnsubscribed(WarpClient warpClient) {
        warpClient.removeConnectionRequestListener(connectionListener);
    }
}

4) Finally, my BaseObservable looks like this:

public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> {

    protected WarpClient warpClient;

    protected BaseObservable (WarpClient warpClient)
    {
        this.warpClient = warpClient;
    }

    @Override
    public abstract void call(Subscriber<? super T> subscriber);

    protected abstract void onUnsubscribed(WarpClient warpClient);
}

My main question: is the implementation above correct, or should I create a separate observable for each event? If so, this client has more than 40-50 events; do I have to create a separate observable for every one of them?

I use the code above as follows (in a simple, non-final integration test):

public void testConnectDisconnect() {
    connectionSubscription = reactiveWarpClient.createOnConnectObservable(client)
            .subscribe(new Action1<RxConnectionEvent>() {
                @Override
                public void call(RxConnectionEvent rxEvent) {
                    assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult());
                    if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) {
                        connectionStatus = connectionStatus | 0b0001;
                        client.disconnect();
                    } else {
                        connectionStatus = connectionStatus | 0b0010;
                        connectionSubscription.unsubscribe();
                        haltExecution = true;
                    }
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    fail("Unexpected error: " + throwable.getMessage());
                    haltExecution = true;
                }
            });

    client.connectWithUserName("test user");
    waitForSomeTime();
    assertEquals(0b0011, connectionStatus);
    assertEquals(true, connectionSubscription.isUnsubscribed());
}

Shehabic asked Feb 10 '16 13:02


1 Answer

I suggest you avoid hand-writing an OnSubscribe implementation like BaseObservable, since that's very error-prone. Instead, try using the tools Rx itself gives you to create your observable.

The easiest solution is using a PublishSubject, which is both an Observable and an Observer. The listener simply needs to invoke the subject's onNext, and the subject will emit the event to its current subscribers. Here's a simplified working example:

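import java.io.IOException;

import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class PublishSubjectWrapperDemo {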

    public interface ConnectionRequestListener {
        void onConnectDone();

        void onDisconnectDone();

        void onInitUDPDone();
    }

    public static class RxConnectionEvent {
        private int type;

        public RxConnectionEvent(int type) {
            this.type = type;
        }

        public int getType() {
            return type;
        }

        public String toString() {
            return "Event of Type " + type;
        }
    }

    public static class SimpleCallbackWrapper {
        private final PublishSubject<RxConnectionEvent> subject = PublishSubject.create();

        public ConnectionRequestListener getListener() {
            return new ConnectionRequestListener() {

                @Override
                public void onConnectDone() {
                    subject.onNext(new RxConnectionEvent(1));
                }

                @Override
                public void onDisconnectDone() {
                    subject.onNext(new RxConnectionEvent(2));
                }

                @Override
                public void onInitUDPDone() {
                    subject.onNext(new RxConnectionEvent(3));
                }
            };
        }

        public Observable<RxConnectionEvent> getObservable() {
            return subject;
        }

    }

    public static void main(String[] args) throws IOException {
        SimpleCallbackWrapper myWrapper = new SimpleCallbackWrapper();
        // Get the listener; in a real app, attach it to the game client here.
        ConnectionRequestListener listener = myWrapper.getListener();
        // Subscribe before any events are fired: a PublishSubject does not replay earlier events.
        myWrapper.getObservable().observeOn(Schedulers.newThread()).subscribe(event -> System.out.println(event));

        // Call the listener a few times; the observable should print each event.
        listener.onConnectDone();
        listener.onDisconnectDone();
        listener.onInitUDPDone();

        System.in.read(); // Wait for Enter so the worker thread has time to print
    }
}
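
To make the subject's behavior concrete, here is a minimal stdlib-only sketch of what a publish-style subject does: it forwards each event to whoever is subscribed at that moment and does not replay earlier events. MiniSubject and MiniSubjectDemo are hypothetical names made up for this illustration; this is not RxJava's implementation, and it leaves out errors, completion, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-style subject; not RxJava's implementation.
class MiniSubjectDemo {

    /** Forwards each event to whoever is subscribed at the moment it arrives. */
    static final class MiniSubject<T> {
        private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

        /** Registers a subscriber and returns a handle that unsubscribes it. */
        Runnable subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
            return () -> subscribers.remove(subscriber);
        }

        /** Called from the listener side, like subject.onNext(...) in the example above. */
        void onNext(T event) {
            for (Consumer<? super T> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    /** Runs a fixed scenario and returns what each subscriber saw, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();

        subject.onNext(0); // nobody is subscribed yet, so this event is lost
        Runnable unsubscribeA = subject.subscribe(e -> log.add("A:" + e));
        subject.onNext(1); // only A is subscribed
        subject.subscribe(e -> log.add("B:" + e));
        subject.onNext(2); // A and B both receive it
        unsubscribeA.run();
        subject.onNext(3); // only B is left
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A:1, A:2, B:2, B:3]
    }
}
```

The practical consequence for the wrapper above is ordering: subscribe to the observable before you attach the listener and connect, otherwise early events such as the connect callback can be missed.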

A more complex solution would be to use one of the OnSubscribe implementations to create an observable using Observable.create(), for example AsyncOnSubscribe. This solution has the benefit of handling backpressure properly, so your event subscriber doesn't become overwhelmed with events. But in your case, that sounds like an unlikely scenario, so the added complexity is probably not worth it.
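
For readers unfamiliar with the term, backpressure handling comes down to deciding what happens when events arrive faster than the subscriber can process them. The sketch below shows one such policy, dropping on overflow, using only the standard library. BoundedEventBuffer and DropOnOverflowDemo are hypothetical names for this illustration and are not part of RxJava; in RxJava 1.x, operators such as onBackpressureBuffer() and onBackpressureDrop() play a comparable role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a "drop on overflow" policy; not RxJava code.
class DropOnOverflowDemo {

    /** Bounded hand-off between a fast listener and a slower consumer. */
    static final class BoundedEventBuffer<T> {
        private final BlockingQueue<T> queue;
        private final AtomicInteger dropped = new AtomicInteger();

        BoundedEventBuffer(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        /** Never blocks the listener thread: if the buffer is full, the event is dropped. */
        void publish(T event) {
            if (!queue.offer(event)) {
                dropped.incrementAndGet();
            }
        }

        /** Removes and returns everything currently buffered, oldest first. */
        List<T> drain() {
            List<T> out = new ArrayList<>();
            queue.drainTo(out);
            return out;
        }

        int droppedCount() {
            return dropped.get();
        }
    }

    public static void main(String[] args) {
        BoundedEventBuffer<Integer> buffer = new BoundedEventBuffer<>(2);
        for (int i = 1; i <= 5; i++) {
            buffer.publish(i); // the consumer has not drained anything yet
        }
        System.out.println(buffer.drain());        // prints [1, 2]
        System.out.println(buffer.droppedCount()); // prints 3
    }
}
```

For connection events, which fire rarely, dropping or buffering is unlikely to matter, which is consistent with the point above that the extra machinery is probably not worth it here.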

Malt answered Nov 05 '22 09:11