Observable for a callback in Rx

I'm looking for an elegant way to create an Observable from a plain callback delegate with Rx, something similar to Observable.FromEventPattern?

Say I'm wrapping the Win32 EnumWindows API, which calls back the EnumWindowsProc I provide.

I know I could create a temporary C# event adapter for this callback and pass it to FromEventPattern. I could also implement IObservable manually, so that it calls IObserver.OnNext from my EnumWindowsProc callback.

Is there an existing pattern for wrapping a callback in Rx that I'm missing?

Keep in mind that callbacks like the one used in EnumWindows are subtly different from Rx observers. Specifically, the callback can communicate back to the caller through its return value, which an Rx observer cannot do. Also, callbacks can receive multiple parameters, but Rx observers receive a single value, so you need to wrap the multiple parameters into a single object.

With that in mind, an alternative to using a Subject is to use Observable.Create. This way you only register the callback when there is actually an observer and you unregister it if that observer unsubscribes.

For the synchronous API you've used as an example, you might do something like this. Note that in this example there is no way to unregister the callback mid-stream, since everything happens synchronously before we can return the unsubscribe disposable.

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(observer =>
    {
        FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            return true;
        });

        // In your case, FooApi.enumerate is actually synchronous
        // so when we get to this line of code, we know
        // the stream is complete.
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
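
To make the limitation concrete, here is a minimal sketch that swaps FooApi for a hypothetical in-memory enumerator (Enumerate and Wrap are made-up names, not part of any real API) and counts how often the callback fires. It assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int callbackCalls = 0;

// Hypothetical stand-in for a synchronous enumerating API such as FooApi.enumerate:
// it invokes the callback once per item until the callback returns false.
void Enumerate(int count, Func<int, bool> callback)
{
    for (int i = 0; i < count; i++)
    {
        callbackCalls++;
        if (!callback(i)) break;
    }
}

// Same shape as WrapFooApi above: the whole enumeration runs inside Subscribe.
IObservable<int> Wrap(int count) => Observable.Create<int>(observer =>
{
    Enumerate(count, i => { observer.OnNext(i); return true; });
    observer.OnCompleted();
    return Disposable.Empty;
});

int received = 0;
Wrap(5).Take(1).Subscribe(_ => received++);

// prints "received=1, callbackCalls=5"
Console.WriteLine($"received={received}, callbackCalls={callbackCalls}");
```

The subscriber only sees one item, but the underlying enumeration still visits all five, because the callback always returns true and nothing tells it that the observer has gone away.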

We can fix the problem of being unable to stop early by introducing a little asynchrony, which gives the observer time to receive a disposable that it can dispose to tell you it is done. The overload of Observable.Create that takes an asynchronous delegate (named CreateAsync in early Rx 2.0 builds, as far as I know) hands us a CancellationToken that is cancelled when the observer unsubscribes. And we can run the FooApi code inside Task.Run:

public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(async (observer, ct) =>
    {
        await Task.Run(() => FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            // Returning false will stop the enumeration
            return !ct.IsCancellationRequested;
        }));
    });
}
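
Applied to the EnumWindows case from the question, the same idea might look roughly like the sketch below. The class and method names (NativeWindows, TopLevelWindows) are made up for illustration, the P/Invoke declaration is the usual user32.dll one, and it only works on Windows. Cancellation is best-effort: a few extra callbacks may still arrive before the token is observed.

```csharp
using System;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

public static class NativeWindows
{
    // Matches the Win32 EnumWindowsProc signature: return false to stop enumerating.
    private delegate bool EnumWindowsProc(IntPtr hWnd, IntPtr lParam);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool EnumWindows(EnumWindowsProc lpEnumFunc, IntPtr lParam);

    // Hypothetical wrapper: emits one window handle per callback invocation.
    public static IObservable<IntPtr> TopLevelWindows()
    {
        return Observable.Create<IntPtr>(async (observer, ct) =>
        {
            // Run the blocking native enumeration off the subscribing thread.
            await Task.Run(() => EnumWindows((hWnd, lParam) =>
            {
                observer.OnNext(hWnd);
                // Stop the native enumeration once the subscriber has gone away.
                return !ct.IsCancellationRequested;
            }, IntPtr.Zero));
            // The sequence completes when this task finishes.
        });
    }
}
```

Usage would be along the lines of NativeWindows.TopLevelWindows().Take(3).Subscribe(h => Console.WriteLine(h)); only the handle is forwarded here, so if you also need lParam you would pack both into a tuple or a small type.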

In a more traditional asynchronous callback API, where you register at some point and unregister at some other point, you might have something more like this:

public static IObservable<Foo> WrapFooApi(string args)
{
    return Observable.Create<Foo>(observer =>
    {
        FooToken token = default(FooToken);
        var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
        token = FooApi.Register(args, e =>
        {
            observer.OnNext(new Foo(e));
        });

        return unsubscribe;
    });
}
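
To see the register/unregister lifecycle in action, here is a small sketch against a hypothetical in-memory API (Register, Unregister and Raise are invented for this example and stand in for FooApi). It assumes the System.Reactive package and C# top-level statements, and it registers before creating the disposable, which is simpler than the version above but equivalent for this toy API.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical in-memory stand-in for a register/unregister style callback API.
var handlers = new Dictionary<int, Action<string>>();
int nextToken = 0;

int Register(Action<string> handler)
{
    int token = ++nextToken;
    handlers[token] = handler;
    return token;
}

void Unregister(int token) => handlers.Remove(token);

void Raise(string value)
{
    // Iterate over a copy so a handler may unregister while we are raising.
    foreach (var handler in new List<Action<string>>(handlers.Values))
        handler(value);
}

// The callback is only registered once somebody subscribes.
IObservable<string> source = Observable.Create<string>(observer =>
{
    int token = Register(observer.OnNext);
    return Disposable.Create(() => Unregister(token));
});

var received = new List<string>();
IDisposable subscription = source.Subscribe(received.Add);

Raise("first");
subscription.Dispose();   // runs the Disposable.Create action, which unregisters
Raise("second");          // no handler is registered any more

// prints "first; handlers left: 0"
Console.WriteLine($"{string.Join(",", received)}; handlers left: {handlers.Count}");
```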
