Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable stream from StackExchange Redis Pub Sub subscription

OBJECTIVE:

I am using StackExchange Redis Client. My objective is to create an Observable stream from the Pub Sub Subscriber exposed by the Client, that can then in turn support 1-n subscriptions by Observables, that each have their own filter via LINQ. (Publishing is working as planned, the issue is purely around Subscription to the Event Stream on a specific channel.)

BACKGROUND:

I am using Redis Pub Sub as part of an Event Sourced CQRS application. The specific use case is to publish Events to multiple subscribers that then update the various Read Models, send emails etc.

Each of these Subscribers need to filter the Event Types that they handle, and for this I am looking to use Rx .Net (Reactive Extensions), with LINQ, to provide a filter criteria on the Event Stream, to efficiently handle reacting only to Events of interest. Using this approach removes the need for registering Handlers with an event bus implementation, and allows me to add new projections to the system by deploying 1-n Microservices that each have 1-n Observables subscribed to the Event Stream with their own specific filters.

WHAT I HAVE TRIED:

1) I have created a class inheriting from ObservableBase, overriding the SubscribeCore method, which receives subscription requests from Observables, stores them in a ConcurrentDictionary, and as each Redis notification arrives from the channel, loops through the registered Observable subscribers and calls their OnNext method passing the RedisValue.

2) I have created a Subject, that also accepts subscriptions from Observables, and calls their OnNext method. Again, the use of Subjects appears to be frowned upon by many.

THE ISSUE:

The approaches I have tried do function (at least superficially), with varying levels of performance, but feel like a hack, and that I am not using Rx in the way it was intended.

I see many comments that the built-in Observable methods should be used where at all possible, for example Observable.FromEvent, but that seems to be impossible to do with the StackExchange Redis Clients Subscription API, at least to my eyes.

I also understand that the preferred method for receiving a stream and forwarding to multiple Observers is to use a ConnectableObservable, which would seem to be designed for the very scenario I face (Each Microservice will internally have 1-n Observables subscribed). At the moment, I cannot get my head around how to connect a ConnectableObservable to the notifications from StackExchange Redis, or if it offers real benefit over an Observable.

UPDATE:

Although completion is not an issue in my scenario (Disposal is fine), error handling is important; e.g. isolating errors detected in one subscriber to prevent all subscriptions terminating.

like image 748
dmcquiggin Avatar asked Nov 24 '16 15:11

dmcquiggin


1 Answers

Here is an extension method you can use to create an IObservable<RedisValue> from an ISubscriber and a RedisChannel:

public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel)
{
    return Observable.Create<RedisValue>(async (obs, ct) =>
    {
        // as the SubscribeAsync callback can be invoked concurrently
        // a thread-safe wrapper for OnNext is needed
        var syncObs = Observer.Synchronize(obs);
        await subscriber.SubscribeAsync(channel, (_, message) =>
        {
            syncObs.OnNext(message);
        }).ConfigureAwait(false);

        return Disposable.Create(() => subscriber.Unsubscribe(channel));
    });
}

As there is no completion of Redis channels the resulting IObservable will never complete, however you may drop the IDisposable subscription to unsubscribe from the Redis channel (this will be done automatically by many Rx operators).

Usage could be like so:

var subscriber = connectionMultiplexer.GetSubscriber();

var gotMessage = await subscriber.WhenMessageReceived("my_channel")
    .AnyAsync(msg => msg == "expected_message")
    .ToTask()
    .ConfigureAwait(false);

Or as per your example:

var subscriber = connectionMultiplexer.GetSubscriber();

var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
    .Select(msg => ParseEventFromMessage(msg))
    .Where(evt => evt.Type == EventType.SendEmails);

await sendEmailEvents.ForEachAsync(evt => 
{
    SendEmails(evt);
}).ConfigureAwait(false);

Other microservices may filter differently.

like image 92
Lukazoid Avatar answered Oct 10 '22 16:10

Lukazoid