Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to convert callback based API into one based on Observable?

The library I'm using emits a series of Message objects using callback object.

interface MessageCallback {
    onMessage(Message message);
}

The callback is added using some libraryObject.setCallback(MessageCallback) call and the process is started using non-blocking libraryObject.start() method call.

What is the best way of creating an Observable<Message> that will emit those objects?

What if the libraryObject.start() is blocking?

like image 302
atok Avatar asked Mar 13 '15 19:03

atok


1 Answers

1. Callback invoked infinite number of times

We can convert it to Observable like this (example for RxJava 2):

Observable<Message> source = Observable.create(emitter -> {
        MessageCallback callback = message -> emitter.onNext(message);
        libraryObject.setCallback(callback);
        Schedulers.io().scheduleDirect(libraryObject::start);
        emitter.setCancellable(() -> libraryObject.removeCallback(callback));
    })
    .share(); // make it hot

share makes this observable hot, i.e. multiple subscribers will share single subscription, i.e. there will be at most one callback registered with libraryObject.

I used io scheduler to schedule start call to be made from background thread, so it does not delay first subscription.

2. Single message callback

It is quite common scenario as well. Let's say we have the following callback-style asynchronous method:

libraryObject.requestDataAsync(Some parameters, MessageCallback callback);

Then we can convert it to Observable like this (example for RxJava 2):

Observable<Message> makeRequest(parameters) {
    return Observable.create(emitter -> {
        libraryObject.requestDataAsync(parameters, message -> {
            emitter.onNext(message);
            emitter.onComplete();
        });
    });
}
like image 108
Yaroslav Stavnichiy Avatar answered Oct 05 '22 23:10

Yaroslav Stavnichiy