Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Design pattern about awaiting several tasks at the same time

Here is my approach. I have several IEventProviders,

interface IEventProvider
{
    Task<Event> GetEvent();
}

Then I got a container class to wrap them, and keep calling and awaiting the GetEvent() to wait for next Event, e.g. socket async receiving, timer ticks, etc.

class EventProviderContainer : IEventProvider
{
    private IEventProvider[] _providers;
    private Task<Event>[] _tasks;

    public EventProviderContainer(params IEventProvider[] providers)
    {
        _providers = providers;
    }

    public async Task<Event> GetEvent()
    {
        // Fill the _tasks first time we call the method.
        if (_tasks == null)
            _tasks = (from p in _providers select p.GetEvent()).ToArray();

        Task<Event> task = await Task<Event>.WhenAny(_tasks);

        // Get the provider index whose previous task is done.
        int index = Array.IndexOf(_tasks, task);
        // put next event of the provider into array.
        _tasks[index] = _providers[index].GetEvent();

        return await task;
    }
}

I think it is a bit of ugly. Is it a better way to do it?

like image 620
Logan Avatar asked Feb 18 '26 08:02

Logan


2 Answers

For a task that is actually not that straightforward, your code is quite short and understandable and personally I don't think it's ugly.

I don't think you're going to find a significantly better way to write this code, unless you want to change your whole interface. The only thing I would change is to move initializing _tasks to the constructor (but maybe you have a reason for that).

But I agree with Stephen's comment that for events, using “push” semantics is usually more appropriate than “pull”. And for that, Rx (IObservable<Event>) or TPL Dataflow (ISourceBlock<Event>) would be very useful. In both cases, writing EventProviderContainer would be relatively simple. Which one of the two is the better choice depends on how are you going to work with the results.

like image 163
svick Avatar answered Feb 19 '26 22:02

svick


If you want one event for each provider at a time then I recommend you check out the Processing tasks as they complete MSDN article which includes an Interleaved method. This method takes a collection of tasks and returns a new array of tasks that will yield in order of completion.

On the other hand, if you want to continuously receive events from each provider as they arrive, then I recommend you look at the Reactive Extensions (Rx) project from Microsoft.

Using Rx, your event provider interface would become something like:

public interface IEventProvider
{
    IObservable<Event> OnEvent();
}

Your container provider would then use the Observable.Merge extension method to combine the events of each child provider.

return _providers.Select(provider => provider.OnEvent()).Merge();

To actually receive the events, you subscribe to the observable by attaching a callback delegate that's executed each time a new event is available.

var provider = new EventProviderContainer(
    new TestEventProvider("a", 1000),
    new TestEventProvider("b", 1300),
    new TestEventProvider("c", 1600));
provider.OnEvent().Subscribe(Console.WriteLine);
Console.ReadLine();

The above example uses a test event provider that returns a continuous stream of events at the given period in milliseconds using the Observable.Timer extension method.

return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(_period))
                 .Select(i => new TestEvent(_name, i));
like image 43
Nathan Baulch Avatar answered Feb 19 '26 23:02

Nathan Baulch