I'm trying to understand RxJava and running into the following situation.
Consider the following method that returns an observable which calls NsdManager.registerService
. The registerService method needs a listener which is called when registration succeeded (or failed).
public Observable<Boolean> registerService() {
return Observable.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
// how to proceed?
}
});
}
The observer can provide a notification only after the listener has been called, but the listener is called asynchronously.
How can I do this with RxJava?
I came up with the following, using a BehaviorSubject. Don't know if it is the best solution, but it works.
private BehaviorSubject<Boolean> registrationSubject;
public Observable<Boolean> registerService() {
registrationSubject = BehaviorSubject.create();
Observable.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> subscriber) {
NsdServiceInfo serviceInfo = new NsdServiceInfo();
serviceInfo.setServiceName(serviceName);
serviceInfo.setServiceType(NSD_SERVICE_TYPE);
serviceInfo.setPort(serverSocket.getLocalPort());
nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
}
}).subscribe(registrationSubject);
return registrationSubject;
}
private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() {
@Override
public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
registrationSubject.onNext(false);
registrationSubject.onCompleted();
}
@Override
public void onServiceRegistered(NsdServiceInfo serviceInfo) {
registrationSubject.onNext(true);
registrationSubject.onCompleted();
}
@Override
public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }
@Override
public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
};
I think it's better to avoid the use of Subjects whenever possible.
In your solution you only use the subject to call onNext
and onCompleted
. However, within the Observable.create()
method you already have access to the subscriber
on which you can call these methods. In other words, you can wrap the complete setup of your event handler inside the Observable.create()
method.
public Observable<Boolean> registerService() {
return Observable.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(final Subscriber<? super Boolean> subscriber) {
NsdServiceInfo serviceInfo = new NsdServiceInfo();
serviceInfo.setServiceName(serviceName);
serviceInfo.setServiceType(NSD_SERVICE_TYPE);
serviceInfo.setPort(serverSocket.getLocalPort());
nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD,
new NsdManager.RegistrationListener() {
@Override
public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(false);
subscriber.onCompleted();
}
}
@Override
public void onServiceRegistered(NsdServiceInfo serviceInfo) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(true);
subscriber.onCompleted();
}
}
@Override
public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }
@Override
public void onServiceUnregistered(NsdServiceInfo serviceInfo) {}
}
);
}
});
}
Inside the listener implementation call:
subscriber.onNext(result)
subscriber.onComplete()
The result
is a boolean
passed to the listener.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With