I am trying to create a parallel event subscriber. This is my first attempt:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;
namespace Sandbox
{
public class SomeEventSubscriber
{
private Position? _latestPosition;
private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
private IEventStoreConnection _connection;
public Dictionary<Type, Action<object>> EventHandlerMapping
{
get { return _eventHandlerMapping; }
}
public SomeEventSubscriber()
{
_eventHandlerMapping = CreateEventHandlerMapping();
_latestPosition = Position.Start;
}
public void Start()
{
ConnectToEventstore();
}
private void ConnectToEventstore()
{
_connection = EventStoreConnectionWrapper.Connect();
_connection.Connected +=
(sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
}
private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
{
return new Dictionary<Type, Action<object>>
{
{typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
{typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
};
}
private async Task Handle(FakeEvent1 eventToHandle)
{
SomethingLongRunning(eventToHandle);
}
private async Task Handle(FakeEvent2 eventToHandle)
{
SomethingLongRunning(eventToHandle);
}
private async Task SomethingLongRunning(BaseFakeEvent eventToHandle)
{
Console.WriteLine("Start Handling: " + eventToHandle.GetType());
var task = Task.Delay(10000);
await task;
Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
}
private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
ResolvedEvent resolvedEvent)
{
if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") || resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))
return;
var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
if (@event != null)
{
var eventType = @event.GetType();
if (_eventHandlerMapping.ContainsKey(eventType))
{
var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](event));
Console.WriteLine("The task is running asynchronously...");
}
}
if (resolvedEvent.OriginalPosition != null) _latestPosition = resolvedEvent.OriginalPosition.Value;
}
private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
{
if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
{
//TODO: Wait and reconnect probably with back off
}
if (dropReason == SubscriptionDropReason.UserInitiated)
return;
if (SubscriptionDropMayBeRecoverable(dropReason))
{
Start();
}
}
private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
{
return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
}
private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
{
}
}
}
In your expert opinion, is this a valid approach? Could you please suggest any improvements?
PS:
Maybe:
Task.Run(() => _eventHandlerMapping[eventType](@event));
would be better?
You have a single EventOccured
delegate which is where you'd be notified for all events that occur in the EventStore
First consider running the pre code inside EventOccured
in a different dispatcher than the one on which the events are fired.
Secondly, would it be possible for you change this to a abstract class
with implementation for FakeEventBase
and then extend it and create individual instances for each FakeEvent
type.
That would be much cleaner solution.
Thirdly, consider have a custom ThreadScheduler
for queuing and running these Handle
tasks.
http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx
EDIT:
I would have a broadcaster class like below which knows when the operation has completed and raises the finished event.
public class EventBroadcaster
{
public event EventHandler SomeEventOccured;
public async void DoLongRunningOperationAndRaiseFinishedEvent()
{
var waitingTask = Task.Delay(TimeSpan.FromSeconds(2));
await waitingTask.ContinueWith(t => RaiseSomeEventOccured(), CancellationToken.None,
TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current);
}
private void RaiseSomeEventOccured()
{
EventHandler handler = SomeEventOccured;
if (handler != null) handler(this, EventArgs.Empty);
}
}
and then an EventListener
public class EventListner
{
private readonly string _id;
public EventListner(string id)
{
_id = id;
}
public void ListenTo(EventBroadcaster broadcaster)
{
broadcaster.SomeEventOccured += OnSomeEventOccured;
}
private async void OnSomeEventOccured(object sender, EventArgs eventArgs)
{
var currentTime = DateTime.Now;
Console.WriteLine("EventListner {0} received at {1}", _id,
currentTime.ToString("dd-MM-yyyy HH:mm:ss.fffffff"));
//Not required just to show this does not affect other instances.
//await Task.Delay(TimeSpan.FromSeconds(5));
}
}
then this would be the Program.cs for testing
public static class Program
{
public static void Main(string[] args)
{
var broadcaster = new EventBroadcaster();
var listners = new List<EventListner>();
for (int i = 1; i < 10; i++)
{
var listner = new EventListner(i.ToString(CultureInfo.InvariantCulture));
listner.ListenTo(broadcaster);
listners.Add(listner);
}
broadcaster.DoLongRunningOperationAndRaiseFinishedEvent();
Console.WriteLine("Waiting for operation to complete");
Console.ReadLine();
}
}
In this example the handler delegates are fired one by one in the order they were subscribed to.
Now modify the code in the Broadcaster to something like below
Note:I have changed method signature from EventHandler
to Action
for ease of coding.
private void RaiseSomeEventOccured()
{
Action handler = SomeEventOccured;
if (handler != null)
{
var parallelOption = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.Invoke(parallelOption, Array.ConvertAll(handler.GetInvocationList(), ConvertToAction));
handler();
}
}
private Action ConvertToAction(Delegate del)
{
return (Action)del;
}
Now you will see that the events are fired in random order.
I got better performance using option 1.
Note: always with TPL
and Parallel
programming you need to make sure there is a benefit before going for it.
I don't really see a point in creating parallel event subscriber (if I understand your intention correct - to be able to run event handlers in parallel, not one after another as with normal events).
It's much more clear to express intent to run in parallel if event handler itself shows it.
Something like (very primitive).
void SomeEventHandler(object sender, EventArgs e)
{
Task.Run(() =>
{
... // some code to run in parallel
});
}
You may want to create sort of manager (honestly, I have no clue how to occupy all cores, but I don't think this is complicated, I just never have to do it), but please, keep normal events.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With