I'm looking for an elegant way to create an Observable
from a plain callback delegate with Rx, something similar to Observable.FromEventPattern
?
Say, I'm wrapping Win32 EnumWindows
API which calls back the EnumWindowsProc
I provide.
I know I could create a temporary C# event adapter for this callback and pass it FromEventPattern
. Also, I could probably implement IObservable
manually, so it would call IObserver.OnNext
from my EnumWindowsProc
callback.
Is the there an existing pattern for wrapping a callback in Rx that I'm missing?
An Observable is like a speaker that emits a value. It does some work and emits some values. An Operator is like a translator which translates/modifies data from one form to another form. An Observer gets those values.
fromArray example: Integer[] array = new Integer[10]; for (int i = 0; i < array. length; i++) { array[i] = i; } Observable<Integer> observable = Observable. fromArray(array); observable. subscribe(item -> System.
Similarly, in RxJava, Observable is something that emits some data or event, and an observer is something that receives that data or event.
RxJava provides many methods in its library to create an Observable. Choosing which one to use can be difficult. My goal from this article is to help you in making this choice simpler by providing you with a mental map of different scenarios and which methods to use in each scenario.
Keep in mind that callbacks like the one used in EnumWindows
are subtly different than Rx. Specifically, the callback can communicate back to the caller through its return value. Rx observers cannot do this. Also, callbacks can receive multiple parameters, but Rx observers receive a single value. So you need to wrap the multiple parameters into a single object.
With that in mind, an alternative to using a Subject
is to use Observable.Create
. This way you only register the callback when there is actually an observer and you unregister it if that observer unsubscribes.
For the synchronous API you've used an example, you might do something like this. Note in this example there is not actually a way to unregister the callback mid-stream since it all happens synchronously before we can ever return the unsubscribe disposable.
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.Create<Foo>(observer =>
{
FooApi.enumerate(arg1, arg2, e =>
{
observer.OnNext(new Foo(e));
return true;
});
// In your case, FooApi.enumerate is actually synchronous
// so when we get to this line of code, we know
// the stream is complete.
observer.OnCompleted();
return Disposable.Empty;
});
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
We can fix the problem with being unable to stop early by introducing a little asynchronicity, which will give the observer time to get a disposable that it can dispose of to notify you. We can use CreateAsync
to get a CancellationToken
that will cancel when the observer unsubscribes. And we can run the FooApi
code inside Task.Run
:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.CreateAsync<Foo>(async (observer, ct) =>
{
await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
{
observer.OnNext(e);
// Returning false will stop the enumeration
return !ct.IsCancellationRequested;
}));
observer.OnCompleted();
});
}
In a more traditional asynchronous callback API, where you register at some point and unregister at some other point, you might have something more like this:
public static IObservable<Foo> WrapFooApi(string args)
{
return Observable.Create<Foo>(observer =>
{
FooToken token = default(FooToken);
var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
token = FooApi.Register(args, e =>
{
observer.OnNext(new Foo(e));
});
return unsubscribe;
});
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With