parallel event subscriber .net 4.5

I am trying to create a parallel event subscriber. This is my first attempt:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;

namespace Sandbox
{
    public class SomeEventSubscriber
    {
        private Position? _latestPosition;
        private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
        private IEventStoreConnection _connection;

        public Dictionary<Type, Action<object>> EventHandlerMapping
        {
            get { return _eventHandlerMapping; }
        }

        public SomeEventSubscriber()
        {
            _eventHandlerMapping = CreateEventHandlerMapping();
            _latestPosition = Position.Start;
        }

        public void Start()
        {
            ConnectToEventstore();
        }

        private void ConnectToEventstore()
        {
            _connection = EventStoreConnectionWrapper.Connect();
            _connection.Connected +=
            (sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
        }

        private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
        {
            return new Dictionary<Type, Action<object>>
            {
                {typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
                {typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
            };
        }

        private async Task Handle(FakeEvent1 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        private async Task Handle(FakeEvent2 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        private async Task SomethingLongRunning(BaseFakeEvent eventToHandle)
        {
            Console.WriteLine("Start Handling: " + eventToHandle.GetType());
            var task = Task.Delay(10000);
            await task;
            Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
        }

        private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
            ResolvedEvent resolvedEvent)
        {
            if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") || resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))
                return;

            var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
            if (@event != null)
            {
                var eventType = @event.GetType();
                if (_eventHandlerMapping.ContainsKey(eventType))
                {
                    var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](@event));
                    Console.WriteLine("The task is running asynchronously...");
                }
            }
            if (resolvedEvent.OriginalPosition != null) _latestPosition = resolvedEvent.OriginalPosition.Value;
        }

        private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
        {
            if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
            {
                //TODO: Wait and reconnect probably with back off
            }

            if (dropReason == SubscriptionDropReason.UserInitiated)
                return;

            if (SubscriptionDropMayBeRecoverable(dropReason))
            {
                Start();
            }
        }

        private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
        {
            return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
                   dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
        }

        private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
        {

        }
    }
}

In your expert opinion, is this a valid approach? Could you please suggest any improvements?

PS:

Maybe:

Task.Run(() => _eventHandlerMapping[eventType](@event));

would be better?

cs0815 asked Dec 19 '14 10:12


2 Answers

You have a single EventOccured delegate, which is where you are notified of every event that occurs in the EventStore.
First, consider running the code inside EventOccured on a different dispatcher than the one on which the events are fired.
Secondly, could you change this to an abstract class with an implementation for BaseFakeEvent, then extend it and create an individual instance for each FakeEvent type? That would be a much cleaner solution.
Thirdly, consider having a custom TaskScheduler for queuing and running these Handle tasks: http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx
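
As a rough illustration of the third point, the sketch below queues work on a scheduler that caps how many items run at once. It uses ConcurrentExclusiveSchedulerPair from .NET 4.5 rather than a hand-written TaskScheduler subclass, which is a substitution on my part and not necessarily what the answer had in mind. The class name BoundedDispatchSketch, the Run helper, and the Thread.Sleep stand-in for handler work are all hypothetical. Note that the cap applies to synchronous work only: an async handler gives its slot back at the first await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedDispatchSketch
{
    // Hypothetical helper: runs itemCount work items on a scheduler that allows
    // at most maxConcurrency of them at once, and returns the peak concurrency seen.
    public static int Run(int itemCount, int maxConcurrency)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0;
        int peak = 0;
        var tasks = new Task[itemCount];

        for (int i = 0; i < itemCount; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);

                // Record the highest number of simultaneously running items.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                {
                    if (Interlocked.CompareExchange(ref peak, now, seen) == seen) break;
                }

                Thread.Sleep(50); // stand-in for synchronous handler work
                Interlocked.Decrement(ref running);
            });
        }

        Task.WaitAll(tasks);
        return peak;
    }

    public static void Main()
    {
        Console.WriteLine("Peak concurrency: " + Run(20, 4));
    }
}
```

In the subscriber from the question, the factory would take the place of Task.Factory inside EventOccured, so that a burst of events cannot start an unbounded number of handlers at the same time.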

EDIT:
I would have a broadcaster class like the one below, which knows when the operation has completed and raises the finished event.

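// Usings needed by this snippet and the two that follow.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

public class EventBroadcaster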
{
    public event EventHandler SomeEventOccured;

    public async void DoLongRunningOperationAndRaiseFinishedEvent()
    {
        var waitingTask = Task.Delay(TimeSpan.FromSeconds(2));

        await waitingTask.ContinueWith(t => RaiseSomeEventOccured(), CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current);
    }

    private void RaiseSomeEventOccured()
    {
        EventHandler handler = SomeEventOccured;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

and then a listener class:

public class EventListner
{
    private readonly string _id;

    public EventListner(string id)
    {
        _id = id;
    }

    public void ListenTo(EventBroadcaster broadcaster)
    {
        broadcaster.SomeEventOccured += OnSomeEventOccured;
    }

    private async void OnSomeEventOccured(object sender, EventArgs eventArgs)
    {
        var currentTime = DateTime.Now;
        Console.WriteLine("EventListner {0} received at {1}", _id,
            currentTime.ToString("dd-MM-yyyy HH:mm:ss.fffffff"));

        //Not required just to show this does not affect other instances.
        //await Task.Delay(TimeSpan.FromSeconds(5));
    }
}

Then this would be the Program.cs for testing:

public static class Program
{
    public static void Main(string[] args)
    {
        var broadcaster = new EventBroadcaster();

        var listners = new List<EventListner>();

        for (int i = 1; i < 10; i++)
        {
            var listner = new EventListner(i.ToString(CultureInfo.InvariantCulture));
            listner.ListenTo(broadcaster);
            listners.Add(listner);
        }

        broadcaster.DoLongRunningOperationAndRaiseFinishedEvent();

        Console.WriteLine("Waiting for operation to complete");

        Console.ReadLine();

    }
}

In this example the handler delegates are fired one by one, in the order in which they subscribed.

Now modify the code in the broadcaster to something like the code below. Note: I have changed the event's delegate type from EventHandler to Action for ease of coding.

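    // Assumes the event is now declared as: public event Action SomeEventOccured;
    // and that the listener's handler takes no parameters.
    private void RaiseSomeEventOccured()
    {
        Action handler = SomeEventOccured;
        if (handler != null)
        {
            var parallelOption = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
            // Invoke every subscriber in parallel. Do not also call handler() afterwards,
            // or each subscriber would run a second time, sequentially.
            Parallel.Invoke(parallelOption, Array.ConvertAll(handler.GetInvocationList(), ConvertToAction));
        }
    }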

    private Action ConvertToAction(Delegate del)
    {
        return (Action)del;
    }

Now you will see that the handlers are fired in no particular order.
I got better performance using option 1 (the sequential version).
Note: with the TPL and parallel programming in general, always make sure there is a measurable benefit before going for it.
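
One way to check for that benefit is to time both approaches with Stopwatch. The sketch below is only a rough harness, not a rigorous benchmark: the class name InvokeTimingSketch, the Time helper, the nine trivial handlers, and the iteration count are all assumptions made for illustration, and real handlers should be substituted before drawing conclusions.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class InvokeTimingSketch
{
    // Runs body the given number of times and returns the total elapsed time.
    public static TimeSpan Time(Action body, int iterations)
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++) body();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    public static void Main()
    {
        // Hypothetical cheap handlers; each one only bumps a shared counter.
        long calls = 0;
        var handlers = new Action[9];
        for (int i = 0; i < handlers.Length; i++)
            handlers[i] = () => Interlocked.Increment(ref calls);

        const int iterations = 10000;

        // Warm-up, so thread-pool start-up cost is not counted against Parallel.Invoke.
        Time(() => Parallel.Invoke(handlers), 10);

        TimeSpan sequential = Time(() => { foreach (var h in handlers) h(); }, iterations);
        TimeSpan parallel = Time(() => Parallel.Invoke(handlers), iterations);

        Console.WriteLine("Sequential: " + sequential.TotalMilliseconds + " ms");
        Console.WriteLine("Parallel:   " + parallel.TotalMilliseconds + " ms");
    }
}
```

With handlers this cheap, the scheduling overhead of Parallel.Invoke tends to dominate; the balance can shift once each handler does substantial CPU-bound work.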

Vignesh.N answered Oct 07 '22 08:10


I don't really see the point in creating a parallel event subscriber (if I understand your intention correctly: to run event handlers in parallel rather than one after another, as with normal events).

The intent to run in parallel is much clearer if the event handler itself shows it.

Something like this (very primitive):

void SomeEventHandler(object sender, EventArgs e)
{
    Task.Run(() =>
    {
       ... // some code to run in parallel
    });
}

You may want to create some sort of manager (honestly, I have no clue how to occupy all cores, but I don't think it is complicated; I have just never had to do it), but please, keep normal events.
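
Applied to the handler dictionary from the question, the same Task.Run idea might look like the sketch below. It assumes the handlers can be changed to return Task (the question stores them as Action<object>, which discards the task), and the names HandlerMapSketch, Dispatch, and HandleAsync, along with the empty FakeEvent1 and FakeEvent2 placeholder classes, are hypothetical. Task.Run unwraps the task returned by an async delegate, whereas Task.Factory.StartNew would return a Task<Task> that completes as soon as the handler reaches its first await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class FakeEvent1 { } // placeholder for the question's event type
public sealed class FakeEvent2 { } // placeholder for the question's event type

public sealed class HandlerMapSketch
{
    // Func<object, Task> keeps the handler's task, so callers can observe it.
    private readonly Dictionary<Type, Func<object, Task>> _handlers;

    public HandlerMapSketch()
    {
        _handlers = new Dictionary<Type, Func<object, Task>>
        {
            { typeof(FakeEvent1), o => HandleAsync((FakeEvent1)o) },
            { typeof(FakeEvent2), o => HandleAsync((FakeEvent2)o) },
        };
    }

    // Starts the matching handler on the thread pool and returns its task,
    // so the caller can await it or inspect a failure.
    public Task Dispatch(object @event)
    {
        Func<object, Task> handler;
        if (!_handlers.TryGetValue(@event.GetType(), out handler))
            return Task.FromResult(0); // no handler registered: nothing to do

        return Task.Run(() => handler(@event));
    }

    private async Task HandleAsync(FakeEvent1 e)
    {
        await Task.Delay(100); // stand-in for long-running work
        Console.WriteLine("Finished Handling: " + e.GetType());
    }

    private async Task HandleAsync(FakeEvent2 e)
    {
        await Task.Delay(100); // stand-in for long-running work
        Console.WriteLine("Finished Handling: " + e.GetType());
    }

    public static void Main()
    {
        var map = new HandlerMapSketch();
        Task all = Task.WhenAll(map.Dispatch(new FakeEvent1()), map.Dispatch(new FakeEvent2()));
        all.Wait(); // blocking is acceptable in a console Main; avoid it on a UI thread
    }
}
```

Because Dispatch returns the task, the subscriber could also delay advancing its stored position until the handler has actually finished, if that matters for recovery after a dropped subscription.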

Sinatr answered Oct 07 '22 06:10