The library I'm using emits a series of Message
objects using callback object.
interface MessageCallback {
onMessage(Message message);
}
The callback is added using some libraryObject.setCallback(MessageCallback)
call and the process is started using non-blocking libraryObject.start()
method call.
What is the best way of creating an Observable<Message>
that will emit those objects?
What if the libraryObject.start()
is blocking?
We can convert it to Observable like this (example for RxJava 2):
Observable<Message> source = Observable.create(emitter -> {
MessageCallback callback = message -> emitter.onNext(message);
libraryObject.setCallback(callback);
Schedulers.io().scheduleDirect(libraryObject::start);
emitter.setCancellable(() -> libraryObject.removeCallback(callback));
})
.share(); // make it hot
share
makes this observable hot, i.e. multiple subscribers will share single subscription, i.e. there will be at most one callback registered with libraryObject
.
I used io
scheduler to schedule start
call to be made from background thread, so it does not delay first subscription.
It is quite common scenario as well. Let's say we have the following callback-style asynchronous method:
libraryObject.requestDataAsync(Some parameters, MessageCallback callback);
Then we can convert it to Observable like this (example for RxJava 2):
Observable<Message> makeRequest(parameters) {
return Observable.create(emitter -> {
libraryObject.requestDataAsync(parameters, message -> {
emitter.onNext(message);
emitter.onComplete();
});
});
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With