
Rx: Count of Grouped Events in Moving Window

I have started looking at using Reactive Extensions with EventStore. As a proof of concept, I'd like to see if I can get Rx to consume an event stream and output the count of events grouped by type for a window of one second.

Say I am consuming a stream named "orders". I'd like to see something like the following appear in the console:

OrderCreated 201
OrderUpdated 111

(a second passes...)

OrderCreated 123
OrderUpdated 132

And so on.

So far, I have been able to output the count of all events per second, but I can't work out how to group them by event type.

The code I am using is based on a gist by James Nugent:

internal class EventStoreRxSubscription
{
    public Subject<ResolvedEvent> ResolvedEvents { get; }

    public Subject<SubscriptionDropReason> DroppedReasons { get; }
    public EventStoreSubscription Subscription { get; }

    public EventStoreRxSubscription(EventStoreSubscription subscription, Subject<ResolvedEvent> resolvedEvent, Subject<SubscriptionDropReason> droppedReasons)
    {
        Subscription = subscription;
        ResolvedEvents = resolvedEvent;
        DroppedReasons = droppedReasons;
    }
}

static class EventStoreConnectionExtensions
{
    public static Task<EventStoreRxSubscription> SubscribeTo(this IEventStoreConnection connection, string streamName, bool resolveLinkTos)
    {
        return Task<EventStoreRxSubscription>.Factory.StartNew(() => {

            var resolvedEvents = new Subject<ResolvedEvent>();
            var droppedReasons = new Subject<SubscriptionDropReason>();

            var subscriptionTask = connection.SubscribeToStreamAsync(streamName, resolveLinkTos, 
                                                                    (subscription, @event) => resolvedEvents.OnNext(@event), 
                                                                    (subscription, dropReason, arg3) => droppedReasons.OnNext(dropReason));
            subscriptionTask.Wait();

            return new EventStoreRxSubscription(subscriptionTask.Result, resolvedEvents, droppedReasons);
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
        var connection = EventStoreConnection.Create(new IPEndPoint(IPAddress.Loopback, 1113));
        // Wait for the connection to be established before subscribing.
        connection.ConnectAsync().Wait();

        var subscriptionTask = connection.SubscribeTo("orders", true);
        subscriptionTask.Wait();

        var events = subscriptionTask.Result.ResolvedEvents;

        // Collect the events into one-second buffers and emit the size of each buffer.
        var query = events.Timestamp()
            .Buffer(TimeSpan.FromSeconds(1))
            .Select(buffer => buffer.Count);

        query.Subscribe(Console.WriteLine);

        Console.ReadLine();
    }
}

David Brower asked Oct 19 '22 14:10

1 Answer

I have done something similar before. I used Throttle to group all events within a set frequency, but you could use Buffer to get the count or the collection for every period.

The snippet below is an abstract sketch of how I achieved this; replace AggregateType and AggregateFunction with your own type and aggregation.

GroupByUntil lets you group by a key and close each group after a set period.

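subscription = observable
    // Group by key; each group closes when its Buffer emits its first one-second list.
    .GroupByUntil(e => e.Key, g => g.Buffer(TimeSpan.FromSeconds(1)))
    // Fold each closed group into a single aggregate value.
    .SelectMany(g => g.Aggregate(new AggregateType(), (acc, item) => AggregateFunction(acc, item)))
    .Subscribe(onNext, onError, onCompleted);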
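
The Buffer alternative mentioned above could look roughly like the following. This is only a sketch: the Subject<string> is a hypothetical stand-in for the real stream, with each item representing an event type name, and the class name is made up for illustration. Each one-second buffer is an ordinary list, so the per-type counts come from a plain LINQ GroupBy over that list.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class BufferThenGroupSketch
{
    static void Main()
    {
        // Hypothetical stand-in for the real event stream: each item is an event type name.
        var eventTypes = new Subject<string>();

        var subscription = eventTypes
            // Collect everything seen in each one-second window into a list.
            .Buffer(TimeSpan.FromSeconds(1))
            // Skip windows in which nothing arrived.
            .Where(window => window.Count > 0)
            // Count the events of each type inside the window with LINQ to Objects.
            .Select(window => window
                .GroupBy(type => type)
                .Select(g => new { Type = g.Key, Count = g.Count() })
                .ToList())
            .Subscribe(counts =>
            {
                foreach (var c in counts)
                    Console.WriteLine("{0} {1}", c.Type, c.Count);
                Console.WriteLine();
            });

        eventTypes.OnNext("OrderCreated");
        eventTypes.OnNext("OrderCreated");
        eventTypes.OnNext("OrderUpdated");

        // Give the first window time to close before shutting down.
        Thread.Sleep(TimeSpan.FromSeconds(2));
        subscription.Dispose();
    }
}
```

With this shape every event type shares the same window boundaries, which is probably closer to the "one block of counts per second" output described in the question.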

EDIT

Below is a quick example I've knocked up to show how it can be done:

using System;
using System.Reactive.Linq;
using System.Threading;

public class EventType
{
    public string Type { get; set; }
}

public class AggregatedType
{
    public string EventType { get; set; }
    public int Count { get; set; }
}

class Program
{
    public delegate void ExampleEventHandler(EventType e);

    public static event ExampleEventHandler Event;

    static void Main(string[] args)
    {
        var subscription = Observable.FromEvent<ExampleEventHandler, EventType>(h => Event += h, h => Event -= h)
            // One group per event type; each group closes after roughly one second.
            .GroupByUntil(e => e.Type, g => g.Buffer(TimeSpan.FromSeconds(1)))
            // When a group closes, fold its events into a single count.
            .SelectMany(g => g.Aggregate(
                new AggregatedType { EventType = g.Key },
                (acc, ev) => new AggregatedType { EventType = acc.EventType, Count = acc.Count + 1 }))
            .Subscribe(OnAggregatedEvent, OnException, OnCompleted);

        // First burst of events.
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "B" });
        Event(new EventType { Type = "B" });

        // Pause long enough for the first groups to close.
        Thread.Sleep(TimeSpan.FromSeconds(2));

        // Second burst, which starts new groups.
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "A" });
        Event(new EventType { Type = "B" });
        Event(new EventType { Type = "B" });

        Console.ReadLine();
    }

    static void OnAggregatedEvent(AggregatedType aggregated)
    {
        Console.WriteLine("Type: {0}, Count: {1}", aggregated.EventType, aggregated.Count);
    }

    static void OnException(Exception ex)
    {
        // Report errors instead of silently swallowing them.
        Console.Error.WriteLine(ex);
    }

    static void OnCompleted()
    {
        // Nothing to do when the stream completes.
    }
}
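
To apply the same idea to the stream in the question, the grouping can be wrapped in a small reusable helper. The sketch below is an assumption-laden illustration rather than a tested solution: CountByKey is a hypothetical extension method (not part of Rx), and it closes each group with Observable.Timer instead of Buffer, then uses Count to emit one value per closed group.

```csharp
using System;
using System.Reactive.Linq;

static class WindowedCountExtensions
{
    // Hypothetical helper, not part of Rx: emits (key, count) each time a group's window closes.
    public static IObservable<Tuple<string, int>> CountByKey<T>(
        this IObservable<T> source, Func<T, string> keySelector, TimeSpan window)
    {
        return source
            // A group is opened by the first event with a given key and closed when its timer fires.
            .GroupByUntil(keySelector, g => Observable.Timer(window))
            // Count emits a single value when the group completes.
            .SelectMany(g => g.Count().Select(count => Tuple.Create(g.Key, count)));
    }
}

class CountByKeyDemo
{
    static void Main()
    {
        // A finite source: the groups also close when the source itself completes.
        var types = new[] { "OrderCreated", "OrderCreated", "OrderUpdated" }.ToObservable();

        types.CountByKey(t => t, TimeSpan.FromSeconds(1))
             .Subscribe(t => Console.WriteLine("{0} {1}", t.Item1, t.Item2));
    }
}
```

Assuming ResolvedEvent exposes the type name as Event.EventType, the question's stream could then be consumed with something like events.CountByKey(e => e.Event.EventType, TimeSpan.FromSeconds(1)). One design point to keep in mind: with GroupByUntil, each type's window starts at the first event of that type, so the windows of different types are not aligned with each other.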

Oliver answered Oct 27 '22 00:10