
Observable for a callback in Rx

I'm looking for an elegant way to create an Observable from a plain callback delegate with Rx, something similar to Observable.FromEventPattern.

Say I'm wrapping the Win32 EnumWindows API, which calls back the EnumWindowsProc I provide.

I know I could create a temporary C# event adapter for this callback and pass it to FromEventPattern. I could also implement IObservable manually, so that it calls IObserver.OnNext from my EnumWindowsProc callback.

Is there an existing pattern for wrapping a callback in Rx that I'm missing?

asked Jul 10 '14 at 11:07 by avo



1 Answer

Keep in mind that callbacks like the one used by EnumWindows differ subtly from Rx observers. Specifically, a callback can communicate back to the caller through its return value; an Rx observer cannot. Also, a callback can receive multiple parameters, whereas an Rx observer receives a single value, so you need to wrap the multiple parameters into a single object.

With that in mind, an alternative to using a Subject is to use Observable.Create. This way you only register the callback when there is actually an observer and you unregister it if that observer unsubscribes.

For the synchronous API you've used as an example, you might do something like this. Note that in this example there is no way to unregister the callback mid-stream, since everything happens synchronously before we can return the unsubscribe disposable.

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(observer =>
    {
        FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            return true;
        });

        // In your case, FooApi.enumerate is actually synchronous
        // so when we get to this line of code, we know
        // the stream is complete.
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // the observer sees only the first item, but the enumeration still runs to the end
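
For the EnumWindows case from the question, the same shape might look like the sketch below. The P/Invoke declaration follows the documented Win32 signature, but the class and method names (NativeWindows, TopLevelWindows) are made up for illustration, and the sketch assumes the System.Reactive package on a Windows host. Treat it as one possible arrangement rather than the canonical one.

```csharp
using System;
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;

public static class NativeWindows
{
    // Mirrors the Win32 EnumWindowsProc callback: return true to keep enumerating.
    private delegate bool EnumWindowsProc(IntPtr hWnd, IntPtr lParam);

    [DllImport("user32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool EnumWindows(EnumWindowsProc lpEnumFunc, IntPtr lParam);

    // Hypothetical helper: emits one top-level window handle per callback invocation.
    public static IObservable<IntPtr> TopLevelWindows()
    {
        return Observable.Create<IntPtr>(observer =>
        {
            // The delegate only needs to live for this synchronous call,
            // during which the interop marshaller keeps it alive.
            EnumWindowsProc callback = (hWnd, lParam) =>
            {
                observer.OnNext(hWnd);
                return true; // always continue; we cannot be cancelled mid-call here
            };

            if (EnumWindows(callback, IntPtr.Zero))
            {
                observer.OnCompleted();
            }
            else
            {
                // The callback never returns false, so a false result means the call failed.
                observer.OnError(new Win32Exception(Marshal.GetLastWin32Error()));
            }

            return Disposable.Empty;
        });
    }
}
```

Nothing is enumerated until someone subscribes, and each subscription triggers a fresh EnumWindows call.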

We can fix the problem of being unable to stop early by introducing a little asynchronicity, which gives the observer time to obtain a disposable it can dispose of to notify you. We can use the overload of Observable.Create that takes an asynchronous delegate; it supplies a CancellationToken that is cancelled when the observer unsubscribes. And we can run the FooApi code inside Task.Run:

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(async (observer, ct) =>
    {
        await Task.Run(() => FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));

            // Returning false will stop the enumeration
            return !ct.IsCancellationRequested;
        }));
        observer.OnCompleted();
    });
}

In a more traditional asynchronous callback API, where you register at some point and unregister at some other point, you might have something more like this:

public static IObservable<Foo> WrapFooApi(string args)
{
    return Observable.Create<Foo>(observer =>
    {
        FooToken token = default(FooToken);
        var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
        token = FooApi.Register(args, e =>
        {
            observer.OnNext(new Foo(e));
        });

        return unsubscribe;
    });
}
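
To see the register/unregister lifecycle end to end, here is a minimal sketch against an in-memory stand-in. FakeApi and FakeApiObservable are hypothetical names invented for this illustration (they are not part of Rx or of any real library), the stand-in is not thread-safe, and the only external dependency assumed is the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical in-memory stand-in for a register/unregister callback API.
public static class FakeApi
{
    private static readonly Dictionary<int, Action<string>> Callbacks =
        new Dictionary<int, Action<string>>();
    private static int _nextToken;

    public static int CallbackCount => Callbacks.Count;

    public static int Register(Action<string> callback)
    {
        var token = ++_nextToken;
        Callbacks[token] = callback;
        return token;
    }

    public static void Unregister(int token) => Callbacks.Remove(token);

    // Simulates the API invoking every registered callback.
    public static void Raise(string value)
    {
        // Iterate over a copy so a callback may unregister itself safely.
        foreach (var callback in new List<Action<string>>(Callbacks.Values))
        {
            callback(value);
        }
    }
}

public static class FakeApiObservable
{
    public static IObservable<string> Wrap()
    {
        return Observable.Create<string>(observer =>
        {
            // Register only when an observer subscribes...
            var token = FakeApi.Register(observer.OnNext);

            // ...and unregister when that subscription is disposed.
            return Disposable.Create(() => FakeApi.Unregister(token));
        });
    }
}
```

Subscribing to FakeApiObservable.Wrap() registers one callback, so a subsequent FakeApi.Raise("first") reaches the subscriber. Disposing the subscription removes the callback, and later Raise calls are no longer delivered.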

answered Sep 19 '22 at 17:09 by Brandon