
RxJava: observable that contains an asynchronous call

I'm trying to understand RxJava and have run into the following situation.

Consider the following method, which returns an observable that calls NsdManager.registerService. The registerService method needs a listener, which is called when registration succeeds (or fails).

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {

        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);

            // how to proceed?
        }
    });
}

The observable can emit a notification only after the listener has been called, but the listener is called asynchronously.

How can I do this with RxJava?


I came up with the following, using a BehaviorSubject. I don't know if it is the best solution, but it works.

private BehaviorSubject<Boolean> registrationSubject;

public Observable<Boolean> registerService() {
    registrationSubject = BehaviorSubject.create();

    Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
        }
    }).subscribe(registrationSubject);

    return registrationSubject;
}

private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() {
    @Override
    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
        registrationSubject.onNext(false);
        registrationSubject.onCompleted();
    }

    @Override
    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
        registrationSubject.onNext(true);
        registrationSubject.onCompleted();
    }

    @Override
    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

    @Override
    public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
};
Jaap van Hengstum asked Aug 12 '14 08:08



2 Answers

I think it's better to avoid the use of Subjects whenever possible. In your solution you only use the subject to call onNext and onCompleted. However, within the Observable.create() method you already have access to the subscriber on which you can call these methods. In other words, you can wrap the complete setup of your event handler inside the Observable.create() method.

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(final Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, 
                new NsdManager.RegistrationListener() {
                    @Override
                    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(true);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

                    @Override
                    public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
                }
            );
        }
    });
}
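
To try the pattern without an Android device or the RxJava dependency, here is a minimal sketch that uses only the JDK. Everything in it is a hypothetical stand-in: FakeNsdManager and RegistrationListener imitate the Android API, and MiniObservable and Subscriber are heavily simplified imitations of rx.Observable and rx.Subscriber (no unsubscription, no onError, so the isUnsubscribed() check from the code above is left out). What it does show is the key idea: the listener is created inside create(), so it can forward the asynchronous result straight to the subscriber.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ListenerBridgeSketch {

    /** Hypothetical stand-in for NsdManager.RegistrationListener. */
    interface RegistrationListener {
        void onServiceRegistered(String serviceName);
        void onRegistrationFailed(String serviceName, int errorCode);
    }

    /** Hypothetical stand-in for NsdManager: reports the outcome on another thread. */
    static final class FakeNsdManager {
        private final boolean succeed;
        FakeNsdManager(boolean succeed) { this.succeed = succeed; }

        void registerService(String serviceName, RegistrationListener listener) {
            new Thread(() -> {
                if (succeed) listener.onServiceRegistered(serviceName);
                else listener.onRegistrationFailed(serviceName, 3); // arbitrary error code
            }).start();
        }
    }

    /** Hypothetical, simplified stand-in for rx.Subscriber. */
    interface Subscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical, simplified stand-in for rx.Observable: work starts on subscribe. */
    static final class MiniObservable<T> {
        private final Consumer<Subscriber<T>> onSubscribe;
        private MiniObservable(Consumer<Subscriber<T>> onSubscribe) { this.onSubscribe = onSubscribe; }

        static <T> MiniObservable<T> create(Consumer<Subscriber<T>> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        void subscribe(Subscriber<T> subscriber) { onSubscribe.accept(subscriber); }
    }

    /** The listener lives inside create(), so it can notify the subscriber directly. */
    static MiniObservable<Boolean> registerService(FakeNsdManager nsdManager, String name) {
        return MiniObservable.create(subscriber ->
            nsdManager.registerService(name, new RegistrationListener() {
                @Override public void onServiceRegistered(String serviceName) {
                    subscriber.onNext(true);
                    subscriber.onCompleted();
                }
                @Override public void onRegistrationFailed(String serviceName, int errorCode) {
                    subscriber.onNext(false);
                    subscriber.onCompleted();
                }
            }));
    }

    /** Test helper: subscribes, waits for completion, and returns the emitted values. */
    static List<Boolean> collect(MiniObservable<Boolean> source) {
        List<Boolean> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(new Subscriber<Boolean>() {
            @Override public void onNext(Boolean value) { seen.add(value); }
            @Override public void onCompleted() { done.countDown(); }
        });
        try {
            if (!done.await(2, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(registerService(new FakeNsdManager(true), "demo")));  // prints [true]
        System.out.println(collect(registerService(new FakeNsdManager(false), "demo"))); // prints [false]
    }
}
```

Because nothing happens until subscribe() is called, each subscription triggers its own registration attempt and gets its own listener, which is why no shared Subject field is needed.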
Georgi Khomeriki answered Sep 27 '22 21:09



Inside the listener implementation, call:

subscriber.onNext(result);
subscriber.onCompleted();

Here result is a boolean that indicates the outcome: true in onServiceRegistered, false in onRegistrationFailed.

tomrozb answered Sep 27 '22 20:09
