
Raising events without blocking and receiving events in the right order

This requires some explanation first. There is a worker thread which has to raise some event:

Task.Run(() =>
{
    for(int i = 0; i < 123456789; i++)
    {
        ... // some job
        OnSomeEvent(i);
    }
});

Raising events synchronously will block the job until all event handlers have finished:

void OnSomeEvent(int i) => SomeEvent?.Invoke(this, new SomeEventArgs(i));

Raising events asynchronously will not block the job anymore (yay!)

void OnSomeEvent(int i) => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i)));

but now there is another problem: events are not received in the right order:

OnSomeEvent(1);
OnSomeEvent(2);
OnSomeEvent(3);
...

// event handler
SomeEvent += (s, e) => Console.WriteLine(e.I);

// possible output
1
3
2

Question: how can I raise events asynchronously so that they are still received in the right order?

Recently I learned that Dispatcher.InvokeAsync uses a queue. It looks like I have to do something similar. And if I have to, then: 1) should it be the job of the caller, or 2) should I keep raising events synchronously and have the receiver organize a producer/consumer to prevent the job from blocking? Or maybe there is another way?

P.S.: this has nothing to do with ContinueWith, unless storing a list of tasks is a proper solution. My concern is how to implement fire-and-forget events where: a) the caller is not blocked, b) events are received in the same order they were raised.

P.P.S.: I don't know how to make an MCVE to reproduce the problem. It appears in a real project with heavy UI, lots of threads, etc.

Sinatr asked Jan 07 '23 10:01


2 Answers

You can use the following TaskQueue to add asynchronous operations to a queue so that each one starts only when the previous item in the queue has finished:

public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
}

This allows you to write:

private TaskQueue taskQueue = new TaskQueue();
private void OnSomeEvent(int i) => 
    taskQueue.Enqueue(() => Task.Run(() => SomeEvent?.Invoke(this, new SomeEventArgs(i))));
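
To check the ordering claim, here is a minimal, self-contained sketch. It repeats only the non-generic Enqueue overload from above; TaskQueueDemo, RunAsync and the received list are hypothetical names made up for this illustration, and the Thread.Sleep merely stands in for a slow handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Same idea as the TaskQueue above, reduced to the non-generic overload.
public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private readonly object key = new object();

    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            // The next item is started only after the previous one has completed.
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
}

// Hypothetical demo class, not part of the answer above.
public static class TaskQueueDemo
{
    public static async Task<List<int>> RunAsync(int count)
    {
        var queue = new TaskQueue();
        var received = new List<int>();
        Task last = Task.FromResult(false);

        for (int i = 0; i < count; i++)
        {
            int captured = i; // capture a copy, not the loop variable itself
            last = queue.Enqueue(() => Task.Run(() =>
            {
                Thread.Sleep(1);        // simulate a slow handler
                received.Add(captured); // safe: only one item runs at a time
            }));
        }

        // The last task completes only after every earlier item has completed.
        await last;
        return received;
    }

    public static void Main()
    {
        var received = RunAsync(5).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", received)); // prints 0,1,2,3,4
    }
}
```

One thing to keep in mind: ContinueWith without options runs the continuation even if the previous task faulted, so a throwing handler does not stop the queue, but its exception is only visible on the task returned by Enqueue. With fire-and-forget usage it will go unobserved unless you log it inside the handler call.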
Servy answered Jan 15 '23 01:01


You could use an ActionBlock from the TPL Dataflow library to maintain a queue of events.

You would create the queue as follows:

queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

Then you would add events to the queue like so:

queue.Post(new SomeEventArgs(value));

When the queue is no longer needed you do this:

queue.Complete();

After that, if you need to wait until any items still in the queue are processed, you can do this:

queue.Completion.Wait();

However, note that queue.Completion is in fact a Task, so you would often use it with await rather than blocking on Wait().
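
As a rough sketch of the await variant: AwaitCompletionDemo, RunAsync and the handled list are hypothetical names for this illustration, and the block processes plain ints instead of SomeEventArgs to keep it short. It relies on the default ActionBlock options, where MaxDegreeOfParallelism is 1, so items are handled one at a time in the order they were posted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the answer above.
public static class AwaitCompletionDemo
{
    public static async Task<List<int>> RunAsync(int count)
    {
        var handled = new List<int>();

        // Default options: one item at a time, in posting order.
        var queue = new ActionBlock<int>(item => handled.Add(item));

        for (int i = 0; i < count; i++)
        {
            queue.Post(i); // returns immediately, the caller is not blocked
        }

        queue.Complete();       // no more items will be accepted
        await queue.Completion; // asynchronously wait for the queue to drain
        return handled;
    }

    public static async Task Main()
    {
        var handled = await RunAsync(5);
        Console.WriteLine(string.Join(",", handled)); // prints 0,1,2,3,4
    }
}
```

Awaiting Completion also rethrows any exception that faulted the block, which a fire-and-forget Post would otherwise hide.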

Here's a complete example showing one approach (which doesn't keep a thread alive all the time just to process the event queue):

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public class SomeEventArgs : EventArgs
    {
        public SomeEventArgs(int value)
        {
            Value = value;
        }

        public int Value { get; }
    }

    internal class Program
    {
        public delegate void SomeEventHandler(SomeEventArgs e);

        public event SomeEventHandler SomeEvent;

        ActionBlock<SomeEventArgs> queue;

        private void run()
        {
            queue = new ActionBlock<SomeEventArgs>(item => SomeEvent?.Invoke(item));

            // Subscribe to my own event (this just for demonstration purposes!)

            this.SomeEvent += Program_SomeEvent;

            // Raise 100 events.

            for (int i = 0; i < 100; ++i)
            {
                OnSomeEvent(i);
                Console.WriteLine("Raised event " + i);
            }

            Console.WriteLine("Signalling that queue is complete.");
            queue.Complete();

            Console.WriteLine("Waiting for queue to be processed.");
            queue.Completion.Wait();

            Console.WriteLine("Done.");
        }

        private void Program_SomeEvent(SomeEventArgs e)
        {
            Console.WriteLine("Handled " + e.Value);
            Thread.Sleep(1); // Simulate load.
        }

        private void OnSomeEvent(int value)
        {
            queue.Post(new SomeEventArgs(value));
        }

        private static void Main()
        {
            new Program().run();
        }
    }
}
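
The example above declares its own single-argument delegate, while the question raises the event with a sender and args. A minimal sketch of the same idea using the standard EventHandler<T> signature might look like this. Worker, StopAsync and WorkerDemo are hypothetical names invented for the illustration, not something from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int value) { Value = value; }
    public int Value { get; }
}

// Hypothetical publisher: queues events and raises them in posting order.
public class Worker
{
    private readonly ActionBlock<SomeEventArgs> queue;

    public event EventHandler<SomeEventArgs> SomeEvent;

    public Worker()
    {
        // Handlers run on a thread pool thread, one event at a time.
        queue = new ActionBlock<SomeEventArgs>(e => SomeEvent?.Invoke(this, e));
    }

    // Non-blocking for the caller: the event is only queued here.
    public void OnSomeEvent(int value) => queue.Post(new SomeEventArgs(value));

    // Stop accepting events and return a task that completes when the queue is drained.
    public Task StopAsync()
    {
        queue.Complete();
        return queue.Completion;
    }
}

public static class WorkerDemo
{
    public static async Task<List<int>> RunAsync(int count)
    {
        var worker = new Worker();
        var received = new List<int>();
        worker.SomeEvent += (s, e) => received.Add(e.Value);

        for (int i = 0; i < count; i++)
        {
            worker.OnSomeEvent(i);
        }

        await worker.StopAsync();
        return received;
    }

    public static async Task Main()
    {
        var received = await RunAsync(5);
        Console.WriteLine(string.Join(",", received)); // prints 0,1,2,3,4
    }
}
```

Note that handlers are invoked on a thread pool thread here, so in a UI application a handler that touches controls would still need to marshal to the UI thread itself.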
Matthew Watson answered Jan 15 '23 00:01