The Cancel member of a CancellationTokenSource object "communicates a request for cancellation", which, I assume, means it is fire-and-forget and does not wait until the cancellation has completed (e.g. all exception handlers have run). That's nice, but I need to wait until an outstanding async has been fully cancelled before creating another async. Is there an easy way to accomplish this?
I don't think there is any direct way to do that using the standard library functions from the F# async libraries. The closest operation is Async.TryCancelled, which runs a callback when the workflow is (actually) cancelled, but sending the message from the callback to the code that started the workflow has to be done by hand.
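To illustrate what doing this by hand might involve, here is a minimal sketch that uses a TaskCompletionSource as the notification channel. The name startAndCancelByHand is hypothetical and not part of any library. The sketch assumes the workflow is still running when cancellation is requested; if it has already completed, the callback never fires and the wait would not return.

```fsharp
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper (not a library function): starts 'work' immediately and
/// returns a workflow that requests cancellation and then waits until the
/// TryCancelled callback has run.
let startAndCancelByHand (work: Async<unit>) : Async<unit> =
    let cts = new CancellationTokenSource()
    // Completed by the cancellation callback below
    let cancelled = TaskCompletionSource<unit>()
    let guarded =
        Async.TryCancelled(work, fun _ -> cancelled.TrySetResult(()) |> ignore)
    Async.Start(guarded, cts.Token)
    async {
        cts.Cancel()
        // Assumption: 'work' was still running, so the callback will fire
        do! Async.AwaitTask cancelled.Task
    }
```

Binding on the returned workflow with do! cancels the work and resumes only after the callback has fired. This sketch does not dispose of the CancellationTokenSource.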
This is relatively easy to solve using events and an extension from the F# async extensions that I wrote (also included in the FSharpX package). The extension is GuardedAwaitObservable, which can be used to wait for an occurrence of an event (which can be triggered immediately by some operation).
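The idea behind such a guarded wait can be sketched as follows. This is a simplified illustration, not the FSharpX implementation: the name guardedAwait and the use of a TaskCompletionSource are assumptions made for the example. The important detail is the ordering: the handler is attached before the guard function runs, so an event raised by the guard itself cannot be missed.

```fsharp
open System
open System.Threading.Tasks

/// Hypothetical, simplified stand-in for a guarded await (not the FSharpX code).
/// Subscribes to 'source', then runs 'guard', then waits for the first value.
let guardedAwait (source: IObservable<'T>) (guard: unit -> unit) : Async<'T> =
    async {
        let tcs = TaskCompletionSource<'T>()
        // Attach the handler first so that an occurrence raised by 'guard' is seen
        use _subscription =
            source |> Observable.subscribe (fun v -> tcs.TrySetResult v |> ignore)
        // Run the operation that may trigger the event immediately
        guard ()
        return! Async.AwaitTask tcs.Task
    }

// Usage: the guard triggers the event synchronously and the value is still received
let evt = Event<int>()
let received =
    guardedAwait evt.Publish (fun () -> evt.Trigger 42)
    |> Async.RunSynchronously
```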
The following Async.StartCancellable method takes an asynchronous workflow and returns Async<Async<unit>>. When you bind on the outer workflow, it starts the argument (like Async.StartChild), and when you bind on the returned inner workflow, it cancels the computation and waits until it is actually cancelled:
open System.Threading

module Async =
  /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called
  /// using 'let!', it starts the workflow provided as an argument and returns
  /// a token that can be used to cancel the started work - this is an
  /// (asynchronously) blocking operation that waits until the workflow
  /// is actually cancelled
  let StartCancellable work = async {
    let cts = new CancellationTokenSource()
    // Creates an event used for notification
    let evt = new Event<_>()
    // Wrap the workflow with TryCancelled and notify when cancelled
    Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token)
    // Return a workflow that waits for 'evt' and triggers 'Cancel'
    // after it attaches the event handler (to avoid missing event occurrence)
    let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel
    return async.TryFinally(waitForCancel, cts.Dispose) }
EDIT: Wrapped the result in TryFinally to dispose of the CancellationTokenSource, as suggested by Jon. I think this should be sufficient to make sure it is disposed of correctly.
Here is an example that uses the method. The loop function is a simple workflow that I used for testing. The rest of the code starts it, waits 5.5 seconds and then cancels it:
/// Sample workflow that repeatedly starts and stops long running operation
let loop = async {
  for i in 0 .. 9999 do
    printfn "Starting: %d" i
    do! Async.Sleep(1000)
    printfn "Done: %d" i }

// Start the 'loop' workflow, wait for 5.5 seconds and then
// cancel it and wait until it finishes current operation
async { let! cancelToken = Async.StartCancellable(loop)
        printfn "started"
        do! Async.Sleep(5500)
        printfn "cancelling"
        do! cancelToken
        printfn "done" }
|> Async.Start
For completeness, the sample with the necessary definitions from FSharpX is here on F# snippets.
This should not be hard given easy-to-use synchronization primitives. I particularly like write-once "logic" variables:
type Logic<'T> =
    new : unit -> Logic<'T>
    member Set : 'T -> unit
    member Await : Async<'T>
It is easy to wrap an Async to set a logic variable upon completion, then wait on it, for example:
type IWork =
    abstract member Cancel : unit -> Async<unit>

let startWork (work: Async<unit>) =
    let v = Logic<unit>()
    let s = new CancellationTokenSource()
    let main = async.TryFinally(work, fun () -> s.Dispose(); v.Set())
    Async.Start(main, s.Token)
    {
        new IWork with
            member this.Cancel() = s.Cancel(); v.Await
    }
A possible implementation of logic variables might be:
type LogicState<'T> =
    | New
    | Value of 'T
    | Waiting of ('T -> unit)

[<Sealed>]
type Logic<'T>() =
    let lockRoot = obj ()
    let mutable st = New

    // Applies a state transition under the lock, then runs the
    // resulting continuation outside the lock
    let update up =
        let k =
            lock lockRoot <| fun () ->
                let (n, k) = up st
                st <- n
                k
        k ()

    let wait (k: 'T -> unit) =
        update <| function
            | New -> (Waiting k, ignore)
            | Value value as st -> (st, fun () -> k value)
            | Waiting f -> (Waiting (fun x -> f x; k x), ignore)

    let await =
        Async.FromContinuations(fun (ok, _, _) -> wait ok)

    member this.Set(value: 'T) =
        update <| function
            | New -> (Value value, ignore)
            | Value _ as st -> (st, ignore)
            | Waiting f -> (Value value, fun () -> f value)

    member this.Await = await
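As a point of comparison, a similar write-once variable could probably also be built on top of TaskCompletionSource, which already provides set-once semantics and remembers waiters. This is only a sketch under that assumption; the type name TaskLogic is hypothetical, and it is a swapped-in technique rather than the lock-based implementation above.

```fsharp
open System.Threading.Tasks

/// Hypothetical alternative: a write-once variable backed by TaskCompletionSource.
[<Sealed>]
type TaskLogic<'T>() =
    let tcs = TaskCompletionSource<'T>()

    /// Sets the value; calls after the first successful one are ignored
    member this.Set(value: 'T) = tcs.TrySetResult value |> ignore

    /// Asynchronously waits for the value (returns at once if already set)
    member this.Await : Async<'T> = Async.AwaitTask tcs.Task

// Usage: only the first Set takes effect
let v = TaskLogic<int>()
v.Set 1
v.Set 2
let observed = v.Await |> Async.RunSynchronously
```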