I have such code (simplified here) which awaits finishing task:
var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b =>
{
if (b)
task_completion_source.SetResult(true);
});
await task_completion_source.Task;
The idea is to subscribe and wait for the true
in the stream of booleans. This finishes the "task" and I can move on beyond the await
.
However I would like to cancel -- but not subscription, but awaiting. I would like to pass cancel token (somehow) to task_completion_source
so when I cancel the token source, the await
will move on.
How to do it?
Update: CancellationTokenSource
is external to this code, all I have here is the token from it.
CancellationTokenSource is quite a heavyweight object and its not normally cancelled; however it can't be pooled or reused because its registrations cannot be cleared.
Cancellation tokens are generally thread safe by design so passing them between threads and checking them should not be a problem.
A CancellationToken enables cooperative cancellation between threads, thread pool work items, or Task objects. You create a cancellation token by instantiating a CancellationTokenSource object, which manages cancellation tokens retrieved from its CancellationTokenSource.
You can get the CancellationToken by accessing internal fields with reflection. Hint: You can search for such things on your own with ILSpy . Show activity on this post. The purpose is to avoid the access of these properties and those properties are not always necessary.
What you need to do is change Take to accept a CancellationToken as a parameter, and it should register a handler so that when it is cancelled the TaskCompletionSource is cancelled. I highly recommend you use BufferBlock<T>, which has cancellation support built-in.
TaskCompletionSourceclass was introduced in .NET 4.0 in a pre async-era for controlling a task’s lifetime manually. By default all the task’s continuations are executed synchronously unless TaskCreationOptions.RunContinuationsAsynchronouslyoption is specified.
CancellationTokenSource - This is the object responsible for creating a cancellation token and sending a cancellation request to all copies of that token. CancellationToken - This is the structure used by listeners to monitor the token’s current state.
It’s also a good API pattern to keep your CancellationToken as the last parameter your method accepts. This fits nicely with optional parameters anyway since they have to show up after any required parameters.
If I understand you correctly, you can do it like this:
using (cancellationToken.Register(() => {
// this callback will be executed when token is cancelled
task_comletion_source.TrySetCanceled();
})) {
// ...
await task_comletion_source.Task;
}
Note that it will throw an exception on your await, which you have to handle.
I recommend that you do not build this yourself. There are a number of edge cases around cancellation tokens that are tedious to get right. For example, if the registration returned from Register
is never disposed, you can end up with a resource leak.
Instead, you can use the Task.WaitAsync
extension method from my AsyncEx.Tasks
library:
var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b =>
{
if (b)
task_completion_source.SetResult(true);
});
await task_completion_source.Task.WaitAsync(cancellationToken);
On a side note, I'd strongly encourage you to use ToTask
rather than an explicit TaskCompletionSource
. Again, ToTask
handles edge cases nicely for you.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With