I'm seeing seemingly inconsistent behavior when cancelling different kinds of Asyncs.
To reproduce the problem, let's say there is a function that takes a list of "jobs" (Async<_> list), waits for them to complete and prints their results. The function also takes a cancellation token so that it can be cancelled:
    let processJobs jobs cancel =
        Async.Start(async {
            try
                let! results = jobs |> Async.Parallel
                printfn "%A" results
            finally
                printfn "stopped"
        }, cancel)
The function is called like this:
    let jobs = [job1(); job2(); job3(); job4(); job5()]
    use cancel = new CancellationTokenSource()
    processJobs jobs cancel.Token
And somewhat later it is cancelled:
    Thread.Sleep(1000)
    printfn "cancelling..."
    cancel.Cancel()
When the cancellation token source is cancelled, the function should execute the finally-block and print "stopped".
That works fine for job1, 2 and 3, but doesn't work when there is a job4 or job5 in the list.
Job1 just Async.Sleeps:
    let job1() = async {
        do! Async.Sleep 1000000
        return 10
    }
Job2 starts some child asyncs and waits for them:
    let job2() = async {
        let! child1 = Async.StartChild(async {
            do! Async.Sleep 1000000
            return 10
        })
        let! child2 = Async.StartChild(async {
            do! Async.Sleep 1000000
            return 10
        })
        let! results = [child1; child2] |> Async.Parallel
        return results |> Seq.sum
    }
Job3 waits for some ugly wait handle that's set by some even uglier thread:
    let job3() = async {
        use doneevent = new ManualResetEvent(false)
        let thread = Thread(fun () -> Thread.Sleep(1000000); doneevent.Set() |> ignore)
        thread.Start()
        do! Async.AwaitWaitHandle(doneevent :> WaitHandle) |> Async.Ignore
        return 30
    }
Job4 posts to and waits for a reply from a MailboxProcessor:
    let job4() = async {
        let worker = MailboxProcessor.Start(fun inbox -> async {
            let! (msg:AsyncReplyChannel<int>) = inbox.Receive()
            do! Async.Sleep 1000000
            msg.Reply 40
        })
        return! worker.PostAndAsyncReply (fun reply -> reply) // <- cannot cancel this
    }
Job5 waits for a Task (or TaskCompletionSource):
    let job5() = async {
        let tcs = TaskCompletionSource<int>()
        Async.Start(async {
            do! Async.Sleep 1000000
            tcs.SetResult 50
        })
        return! (Async.AwaitTask tcs.Task) // <- cannot cancel this
    }
Why can Job1, 2 and 3 be cancelled ("stopped" gets printed), while Job4 and 5 make the function hang "forever"?
So far I have always relied on F# to handle cancellation behind the scenes: as long as I'm inside async blocks and use the ! forms (let!, do!, return!, ...), everything should be fine. But that doesn't seem to be the case all the time.
Quote:
In F# asynchronous workflows, the CancellationToken object is passed around automatically under the cover. This means that we don't have to do anything special to support cancellation. When running asynchronous workflow, we can give it cancellation token and everything will work automatically.
Complete code is available here: http://codepad.org/euVO3xgP
EDIT
I noticed that piping an async through Async.StartAsTask followed by Async.AwaitTask makes it cancelable in all cases.
For Job4, that means changing the line:
    return! worker.PostAndAsyncReply (fun reply -> reply)
to:
    return! cancelable <| worker.PostAndAsyncReply (fun reply -> reply)
with cancelable being:
    let cancelable (x:Async<_>) = async {
        let! cancel = Async.CancellationToken
        return! Async.StartAsTask(x, cancellationToken = cancel) |> Async.AwaitTask
    }
The same works for making Job5 cancelable.
But that's just a workaround, and I can hardly wrap it around every call to an unknown Async<_>.
Only the built-in Async.* methods pick up the ambient CancellationToken on their own.
In your MailboxProcessor example, the cancellation token should be passed to the Start method:
    let! ct = Async.CancellationToken
    use worker = MailboxProcessor.Start(theWork, ct)
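Applied to the question's job4, that suggestion might look like the sketch below. It only rearranges the original job4 so that the token is read from the workflow and handed to Start; whether PostAndAsyncReply then unblocks on cancellation depends on the FSharp.Core version in use, so treat that as an assumption to verify rather than a guarantee.

```fsharp
open System.Threading

// Sketch: job4 from the question, with the ambient token passed to the agent.
let job4 () = async {
    // Read the token the enclosing workflow was started with.
    let! ct = Async.CancellationToken
    // The agent body is started under the same token, so cancelling the
    // caller also cancels the agent's Receive/Sleep.
    use worker =
        MailboxProcessor.Start((fun inbox -> async {
            let! (msg: AsyncReplyChannel<int>) = inbox.Receive()
            do! Async.Sleep 1000000
            msg.Reply 40 }), ct)
    return! worker.PostAndAsyncReply(fun reply -> reply)
}
```

The `use` binding disposes the agent once the reply arrives or the workflow is torn down.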
In the TaskCompletionSource example, you will have to register a callback that cancels the task yourself:
    let! ct = Async.CancellationToken
    use canceler = ct.Register( fun () -> tcs.TrySetCanceled() |> ignore )
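For reference, here is a minimal sketch of how that registration could be folded into the question's job5. Two details are my own additions rather than part of the answer: the background workflow is also started with the token so it does not keep sleeping after cancellation, and it uses TrySetResult instead of SetResult so that it cannot throw if the task was already cancelled.

```fsharp
open System.Threading
open System.Threading.Tasks

// Sketch: job5 from the question, with a cancellation callback on the token.
let job5 () = async {
    let tcs = TaskCompletionSource<int>()
    let! ct = Async.CancellationToken
    // When the ambient token fires, move the task into the Canceled state
    // so that the AwaitTask below stops waiting.
    use _canceler = ct.Register(fun () -> tcs.TrySetCanceled() |> ignore)
    // Assumption: the producer should stop too, so it gets the same token.
    Async.Start(async {
        do! Async.Sleep 1000000
        tcs.TrySetResult 50 |> ignore }, ct)
    return! Async.AwaitTask tcs.Task
}
```

With this in place, a try/finally around `job5 ()` in the calling workflow should reach its finally-block once the token source is cancelled.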