Here is my approach. I have several IEventProviders,
interface IEventProvider
{
Task<Event> GetEvent();
}
Then I got a container class to wrap them, and keep calling and awaiting the GetEvent() to wait for next Event, e.g. socket async receiving, timer ticks, etc.
class EventProviderContainer : IEventProvider
{
private IEventProvider[] _providers;
private Task<Event>[] _tasks;
public EventProviderContainer(params IEventProvider[] providers)
{
_providers = providers;
}
public async Task<Event> GetEvent()
{
// Fill the _tasks first time we call the method.
if (_tasks == null)
_tasks = (from p in _providers select p.GetEvent()).ToArray();
Task<Event> task = await Task<Event>.WhenAny(_tasks);
// Get the provider index whose previous task is done.
int index = Array.IndexOf(_tasks, task);
// put next event of the provider into array.
_tasks[index] = _providers[index].GetEvent();
return await task;
}
}
I think it is a bit of ugly. Is it a better way to do it?
For a task that is actually not that straightforward, your code is quite short and understandable and personally I don't think it's ugly.
I don't think you're going to find a significantly better way to write this code, unless you want to change your whole interface. The only thing I would change is to move initializing _tasks to the constructor (but maybe you have a reason for that).
But I agree with Stephen's comment that for events, using “push” semantics is usually more appropriate than “pull”. And for that, Rx (IObservable<Event>) or TPL Dataflow (ISourceBlock<Event>) would be very useful. In both cases, writing EventProviderContainer would be relatively simple. Which one of the two is the better choice depends on how are you going to work with the results.
If you want one event for each provider at a time then I recommend you check out the Processing tasks as they complete MSDN article which includes an Interleaved method. This method takes a collection of tasks and returns a new array of tasks that will yield in order of completion.
On the other hand, if you want to continuously receive events from each provider as they arrive, then I recommend you look at the Reactive Extensions (Rx) project from Microsoft.
Using Rx, your event provider interface would become something like:
public interface IEventProvider
{
IObservable<Event> OnEvent();
}
Your container provider would then use the Observable.Merge extension method to combine the events of each child provider.
return _providers.Select(provider => provider.OnEvent()).Merge();
To actually receive the events, you subscribe to the observable by attaching a callback delegate that's executed each time a new event is available.
var provider = new EventProviderContainer(
new TestEventProvider("a", 1000),
new TestEventProvider("b", 1300),
new TestEventProvider("c", 1600));
provider.OnEvent().Subscribe(Console.WriteLine);
Console.ReadLine();
The above example uses a test event provider that returns a continuous stream of events at the given period in milliseconds using the Observable.Timer extension method.
return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(_period))
.Select(i => new TestEvent(_name, i));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With