So my requirement is to have my function wait for the first instance of an event Action<T> coming from another class and another thread, and handle it on my thread, allowing the wait to be interrupted by either a timeout or a CancellationToken.
I want to create a generic function I can reuse. I managed to create a couple of options that do (I think) what I need, but both seem more complicated than they should have to be.
Just to be clear, a sample use of this function would look like this, where serialDevice is spitting out events on a separate thread:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());
This option isn't bad, but the Dispose handling of the ManualResetEventSlim is messier than it seems like it should be. It gives ReSharper fits that I'm accessing modified/disposed things within the closure, and it's genuinely hard to follow so I'm not even sure it's correct. Maybe there's something I'm missing that can clean this up, which would be my preference, but I don't see it offhand. Here's the code.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result =>
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock (o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}
WaitHandle
The WaitForSingleEvent function here is much cleaner. I'm able to use ConcurrentQueue and thus don't even need a lock. But I just don't like the polling function Sleep, and I don't see any way around it with this approach. I'd like to pass in a WaitHandle instead of a Func<bool> to clean up Sleep, but the second I do that I've got the whole Dispose mess to clean up again.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}
public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    // Stopwatch is monotonic; DateTime.Now can jump on clock or DST changes.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();
    while (elapsed.ElapsedMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}
I don't particularly care for either of these solutions, nor am I 100% sure either of them is correct. Is one of these solutions better than the other (idiomaticity, efficiency, etc.), or is there an easier way or a built-in function that does what I need here?
A modification of the TaskCompletionSource solution below. No long closures, locks, or anything required. Seems pretty straightforward. Any errors here?
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}
Turns out that BlockingCollection works just like ConcurrentQueue but also has methods accepting a timeout and cancellation token. One nice thing about this solution is that it can be updated to make a WaitForNEvents fairly easily:
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}
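The WaitForNEvents variation mentioned above could look roughly like the sketch below. It is only one possible reading of that idea: the name WaitForNEvents, the count parameter, and the choice to treat msTimeout as a single non-negative budget shared by all events are assumptions, not something the original spells out. The collection is deliberately left undisposed in this sketch so that an event raised while unsubscribing cannot hit a disposed object.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class EventWaitSketch
{
    // Hypothetical helper: collects up to `count` events, or fewer if the
    // overall timeout (assumed non-negative) runs out first.
    public static IList<TEvent> WaitForNEvents<TEvent>(
        this CancellationToken token,
        int count,
        Action<Action<TEvent>> subscribe,
        Action<Action<TEvent>> unsubscribe,
        int msTimeout,
        Action initializer = null)
    {
        var results = new List<TEvent>(count);
        var q = new BlockingCollection<TEvent>();
        Action<TEvent> add = item => q.TryAdd(item);
        subscribe(add);
        try
        {
            if (initializer != null)
            {
                initializer();
            }
            var clock = Stopwatch.StartNew();
            while (results.Count < count)
            {
                // Time left out of the shared budget, clamped at zero.
                var remaining = msTimeout - (int)clock.ElapsedMilliseconds;
                if (remaining < 0)
                {
                    remaining = 0;
                }
                TEvent item;
                if (!q.TryTake(out item, remaining, token))
                {
                    break; // timed out before all events arrived
                }
                results.Add(item);
            }
            return results;
        }
        finally
        {
            unsubscribe(add);
        }
    }
}
```

As with the single-event version, cancellation surfaces as an OperationCanceledException from TryTake; the caller can compare the returned count with the requested one to detect a timeout.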
The thread that needs to run first takes the resetEvent and calls Set on it only when it has finished its work. The thread that needs to wait holds a reference to the same resetEvent and calls resetEvent.WaitOne(), which blocks it until the first thread signals completion.
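A minimal sketch of that reset-event handshake, assuming a ManualResetEvent and a worker whose "work" is simulated with a short sleep. The class and method names (ResetEventSketch, RunOnce) are made up for illustration.

```csharp
using System;
using System.Threading;

public static class ResetEventSketch
{
    // Returns true if the worker signaled within msTimeout, false on timeout.
    public static bool RunOnce(int msTimeout)
    {
        using (var resetEvent = new ManualResetEvent(false))
        {
            var worker = new Thread(() =>
            {
                Thread.Sleep(50);  // stand-in for the real work
                resetEvent.Set();  // signal only at the very end
            });
            worker.Start();

            // Blocks the calling thread until Set is called or the timeout elapses.
            bool signaled = resetEvent.WaitOne(msTimeout);

            // Let the worker finish before the handle is disposed.
            worker.Join();
            return signaled;
        }
    }
}
```

WaitOne accepts a timeout but not a CancellationToken, so this pattern alone covers only part of what the question asks for.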
Delegate is a library class in the System namespace. A delegate is a type-safe reference to a method. Delegates are mainly used to implement callbacks and events, and they can be chained together so that two or more methods are called for a single event.
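To illustrate the chaining, here is a small sketch that combines two Action<string> delegates with += and removes one again with -=. The names (DelegateSketch, first, second) are placeholders chosen for the example.

```csharp
using System;
using System.Collections.Generic;

public static class DelegateSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        Action<string> first = msg => log.Add("first: " + msg);
        Action<string> second = msg => log.Add("second: " + msg);

        Action<string> chain = first;
        chain += second;   // chain now invokes first, then second
        chain("ping");     // adds "first: ping" and "second: ping"

        chain -= first;    // only second remains in the invocation list
        chain("pong");     // adds "second: pong"
        return log;
    }
}
```

This is the same mechanism the event's += and -= operators rely on in the subscribe and unsubscribe callbacks.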
You can use TaskCompletionSource to create a Task that you can mark as completed or cancelled. Here's a possible implementation for a specific event:
public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        target.MyEvent -= handler;
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        target.MyEvent -= handler;
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    target.MyEvent += handler;
    return tcs.Task;
}
In C# 5 you can use it like this:
private async Task MyMethod()
{
    ...
    await WaitFirstMyEvent(foo, cancellationToken);
    ...
}
If you want to wait for the event synchronously, you can also use the Wait method:
private void MyMethod()
{
    ...
    WaitFirstMyEvent(foo, cancellationToken).Wait();
    ...
}
Here's a more generic version, but it still works only for events with an Action signature:
public Task WaitFirstEvent(
    Action<Action> subscribe,
    Action<Action> unsubscribe,
    CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        unsubscribe(handler);
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        unsubscribe(handler);
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    subscribe(handler);
    return tcs.Task;
}
You can use it like this:
await WaitFirstEvent(
    handler => foo.MyEvent += handler,
    handler => foo.MyEvent -= handler,
    cancellationToken);
If you want it to work with other event signatures (e.g. EventHandler), you will have to create separate overloads. I don't think there's an easy way to make it work for any signature, especially since the number of parameters isn't always the same.
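As an illustration of what one such overload might look like, here is a sketch for events declared as EventHandler<TEventArgs>. It follows the same pattern as the Action version, with two assumptions of its own: the task carries the event arguments as its result, and the cancellation callback is registered after subscribing so that an already-cancelled token still unsubscribes the handler. The class name EventTaskSketch is hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class EventTaskSketch
{
    // Hypothetical overload for EventHandler<TEventArgs>-style events.
    public static Task<TEventArgs> WaitFirstEvent<TEventArgs>(
        Action<EventHandler<TEventArgs>> subscribe,
        Action<EventHandler<TEventArgs>> unsubscribe,
        CancellationToken cancellationToken)
        where TEventArgs : EventArgs
    {
        var tcs = new TaskCompletionSource<TEventArgs>();
        EventHandler<TEventArgs> handler = null;
        var registration = default(CancellationTokenRegistration);

        handler = (sender, args) =>
        {
            unsubscribe(handler);
            registration.Dispose();
            tcs.TrySetResult(args); // first event wins; later calls are no-ops
        };
        subscribe(handler);

        // Registered after subscribing: if the token is already cancelled,
        // the callback runs right away and removes the handler again.
        registration = cancellationToken.Register(() =>
        {
            unsubscribe(handler);
            tcs.TrySetCanceled();
        });
        return tcs.Task;
    }
}
```

Usage would mirror the Action version, for example await EventTaskSketch.WaitFirstEvent<MyEventArgs>(h => foo.Changed += h, h => foo.Changed -= h, cancellationToken), where MyEventArgs and foo.Changed are placeholder names.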
You can use Rx to convert the event to an observable, then to a task, and finally wait on that task with your token/timeout.
One advantage this has over any of the existing solutions is that it calls unsubscribe on the event's thread, ensuring that your handler won't be called twice. (In your first solution you work around this by using tcs.TrySetResult instead of tcs.SetResult, but it's always nice to get rid of a "TryDoSomething" and simply ensure DoSomething always works.)
Another advantage is the code's simplicity. It's essentially one line, so you don't even particularly need an independent function. You can inline it so that it's clearer what exactly your code does, and you can make variations on the theme without needing a ton of optional parameters (like your optional initializer, or allowing waiting on N events, or forgoing timeouts/cancellation in instances where they're not necessary). And you'd have both the bool return value and the actual result in scope when it's finished, if that's useful at all.
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}