
Synchronization issue in multiple event subscriptions and getting latest snapshot

Tags: c#, events

What is the best way to fix the synchronization issue below by enhancing OrderManager? OrderForm needs to get the latest list of orders and trades and subscribe to the events, while OrderManager generates orders and trades on another thread.

public class OrderManager
{
    public event EventHandler<OrderEventArgs> OrderAdded;
    public event EventHandler<OrderEventArgs> OrderUpdated;
    public event EventHandler<OrderEventArgs> OrderDeleted;
    public event EventHandler<TradeEventArgs> TradeAdded;

    public List<Order> Orders { get; private set; }
    public List<Trade> Trades { get; private set; }
    ...
}

public class OrderForm
{
    public OrderForm(OrderManager manager)
    {
        manager.OrderAdded += manager_OrderAdded;
        manager.OrderUpdated += manager_OrderUpdated;
        manager.OrderDeleted += manager_OrderDeleted;
        manager.TradeAdded += manager_TradeAdded;

        Populate(manager.Orders);
        Populate(manager.Trades);
    }
    ...
}

Should I drop the event pattern and implement it like this? Is there a better way?

public class OrderListener
{
    public Action<Order> OrderAdded { get; set; }
    public Action<Order> OrderUpdated { get; set; }
    public Action<Order> OrderDeleted { get; set; }
    public Action<Trade> TradeAdded { get; set; }
}

public class OrderManager
{
    ...
    List<Order> orders;
    List<Trade> trades;
    List<OrderListener> listeners;

    public IDisposable Subscribe(OrderListener listener)
    {
        lock (orderTradeLock)
        {
            listeners.Add(listener);
            orders.ForEach(listener.OrderAdded);
            trades.ForEach(listener.TradeAdded);
            // Allow caller to dispose the return object to unsubscribe.
            return Disposable.Create(() => { lock (orderTradeLock) { listeners.Remove(listener); } });
        }
    }

    void OnOrderAdded(Order order)
    {
        lock (orderTradeLock)
        {
            orders.Add(order);
            listeners.ForEach(x => x.OrderAdded(order));
        }
    }

    void OnTradeAdded(Trade trade)
    {
        lock (orderTradeLock)
        {
            trades.Add(trade);
            listeners.ForEach(x => x.TradeAdded(trade));
        }
    }
    ...
}

public class OrderForm
{
    IDisposable subscriptionToken;
    public OrderForm(OrderManager manager)
    {
        subscriptionToken = manager.Subscribe(new OrderListener
        {
           OrderAdded = manager_OrderAdded,
           OrderUpdated = manager_OrderUpdated,
           OrderDeleted = manager_OrderDeleted,
           TradeAdded = manager_TradeAdded
        });
    }
    ...
}
asked Sep 12 '14 01:09 by alex


1 Answer

There are a couple of options for the scenario you describe. I'll go through them:

  1. The better way to solve your problem is:

Avoid using events in a concurrent scenario. In a multi-threaded situation there is no perfect solution for a .NET event.

This is especially true if you have a high level of concurrency among the consumers/subscribers (a high number of threads adding/removing delegates). You can find more information in this article: http://www.codeproject.com/Articles/37474/Threadsafe-Events

By the way, I wrote an article a few days ago that covers everything you need to know about events in .NET, along with some solutions to their problems and limitations and advice on keeping the code simple: http://www.codeproject.com/Articles/864690/Simplifying-Events-in-NET
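To make the limitation concrete, here is a minimal sketch of one well-known race. StaleHandlerDemo and its members are made-up names for illustration, not code from the question or the linked articles. It replays, on a single thread, the interleaving where the raising side has already copied the delegate and the subscriber then unsubscribes:

```csharp
using System;

public static class StaleHandlerDemo
{
    // Hypothetical event, used only for this illustration.
    public static event EventHandler Changed;

    public static int Run()
    {
        int calls = 0;
        EventHandler handler = (sender, e) => calls++;
        Changed += handler;

        // Raising side: copy the delegate before invoking it. This is the
        // usual null-safe pattern, and it is also what Changed?.Invoke does.
        EventHandler snapshot = Changed;

        // Subscriber side: unsubscribe after the copy was taken.
        Changed -= handler;

        // Raising side continues with its copy, so the handler still runs
        // even though it is no longer subscribed.
        snapshot?.Invoke(null, EventArgs.Empty);

        return calls;
    }
}
```

Run() returns 1: the handler is called once after it was removed. Copying the delegate protects against a NullReferenceException, but it does not stop a subscriber from receiving a notification after unsubscribing, so handlers have to tolerate that.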

  2. Introducing a synchronization mechanism

You just need to use the same synchronization mechanism for each event, or one for all of them. It can be a ManualResetEventSlim, a Semaphore, or a lock. If the producers (who raise the event) and the consumers (who add/remove the delegates) share it, you can guarantee that you won't lose information.

The biggest problem with this solution is raising the event. If you invoke the delegates inside the synchronization mechanism, you risk a deadlock or poor performance, because how long the lock is held then depends on what the subscribers' delegates do, which in general is bad design.
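One way to apply this to the question is sketched below, under assumptions. SnapshotOrderManager is a hypothetical name, Order is a stand-in for the question's class, and only the "added" notification is shown. Subscribing and taking the snapshot happen under the same lock that guards the list, and the delegates are invoked after the lock is released:

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the Order class from the question.
public sealed class Order
{
    public int Id { get; set; }
}

public sealed class SnapshotOrderManager
{
    private readonly object _gate = new object();
    private readonly List<Order> _orders = new List<Order>();
    private Action<Order> _orderAdded; // guarded by _gate

    // Registers the handler and copies the current orders in one atomic step:
    // every order is either in the returned snapshot or delivered to the
    // handler later, never both and never neither.
    public IReadOnlyList<Order> Subscribe(Action<Order> onOrderAdded)
    {
        lock (_gate)
        {
            _orderAdded += onOrderAdded;
            return _orders.ToArray();
        }
    }

    public void Unsubscribe(Action<Order> onOrderAdded)
    {
        lock (_gate)
        {
            _orderAdded -= onOrderAdded;
        }
    }

    public void AddOrder(Order order)
    {
        Action<Order> handlers;
        lock (_gate)
        {
            _orders.Add(order);
            handlers = _orderAdded; // delegates are immutable, so this copy is stable
        }

        // Invoked outside the lock, so a slow or re-entrant handler cannot
        // block other producers or deadlock on _gate.
        handlers?.Invoke(order);
    }
}
```

This has trade-offs. With several producer threads, notifications can arrive in a different order than the orders were added, and a handler can still be called once after Unsubscribe returns. A handler can also fire on the producer thread before the caller has finished processing the snapshot, so a form would typically marshal the callbacks to its UI thread or buffer them until it has populated itself.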

  3. Using Reactive Extensions

With Rx, you can use the same Scheduler to subscribe and to raise the event, which introduces synchronization. It is important to switch to a different Scheduler for the execution of the handlers, to avoid the same problems I mentioned for a manual synchronization mechanism. For example:

// Requires the Reactive Extensions (Rx) library, with usings for System, System.Reactive.Concurrency,
// System.Reactive.Linq, System.Threading and System.Threading.Tasks.
static EventHandler _handler;
static EventLoopScheduler _eventLoopScheduler = new EventLoopScheduler(); // synchronization for the consumers/subscribers and producers

static ManualResetEventSlim _finish = new ManualResetEventSlim(); // just to signal when this test ends

static int samples = 500; // number of times the event will be raised
static int count = 0; // number of times the event was received by the subscriber
static void Main()
{
    var subscription = Observable.FromEventPattern(add => _handler += add, rem => _handler -= rem) // wrap the event as an observable
              .SubscribeOn(_eventLoopScheduler) // introduces synchronization when someone calls Subscribe on the composition
              //.ObserveOn(Scheduler.Default).Do(arg => DoSomething()) // alternative: changes the thread to leave _eventLoopScheduler free for new consumers/subscribers
              .SelectMany(arg => Observable.FromAsync(a => Task.Run(() => DoSomething()))) // changes the thread to leave _eventLoopScheduler free, and introduces concurrency in the method execution
              .Subscribe(); // subscribe to receive the event

    Parallel.For(0, samples, new ParallelOptions{ MaxDegreeOfParallelism = 4 }, a =>
    {
        Console.WriteLine(a.ToString("D4") + " | Trying to raise the event in thread:" + Thread.CurrentThread.ManagedThreadId);
        _eventLoopScheduler.Schedule(() => 
        {
            Console.WriteLine(a.ToString("D4") + " | Raising event in thread:" + Thread.CurrentThread.ManagedThreadId);
            _handler(null, EventArgs.Empty);
        });
    });

    _finish.Wait();
    subscription.Dispose();

    _eventLoopScheduler.Dispose();
    Console.WriteLine("Completed");

}

static void DoSomething()
{
    //var current = count++; // use this ONLY if you are not introducing concurrency (that is, not running the execution on another thread)
    var current  = Interlocked.Increment(ref count); // synchronizes the count value; necessary when the execution runs on multiple threads (such as with Task.Run or the ThreadPool)

    var random = new Random(current);

    Console.WriteLine(current.ToString("D4") + " | Doing Something in thread:" + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(random.Next(0,500)); // Simulate some Process

    if (current == samples)
    {
        _finish.Set();
    }
}

In the example above I used just one subscriber, but it works fine with more than one. Multithreading is complicated and this is not always easy to understand, so feel free to add information or ask questions.

answered Nov 14 '22 22:11 by J. Lennon