
Inconsistent behaviour when cancelling different kinds of Asyncs

Tags:

f#

I have problems with seemingly inconsistent behavior when cancelling different kinds of Asyncs.

To reproduce the problem, let's say there is a function that takes a list of "jobs" (Async<_> list), waits for them to complete and prints their results. The function also takes a cancellation token so that it can be cancelled:

open System.Threading
open System.Threading.Tasks

let processJobs jobs cancel =
  Async.Start(async {
    try
      let! results = jobs |> Async.Parallel
      printfn "%A" results
    finally
      printfn "stopped"
  }, cancel)

The function is called like this:

let jobs = [job1(); job2(); job3(); job4(); job5()]
use cancel = new CancellationTokenSource()

processJobs jobs cancel.Token

And somewhat later it is cancelled:

Thread.Sleep(1000)
printfn "cancelling..."
cancel.Cancel()

When the cancellation token source is cancelled, the function should execute the finally-block and print "stopped".

That works fine for job1, 2 and 3, but doesn't work when there is a job4 or job5 in the list.

Job1 just Async.Sleeps:

let job1() = async {
  do! Async.Sleep 1000000
  return 10
}

Job2 starts some async children and waits for them:

let job2() = async {
  let! child1 = Async.StartChild(async {
    do! Async.Sleep 1000000
    return 10
  })

  let! child2 = Async.StartChild(async {
    do! Async.Sleep 1000000
    return 10
  })

  let! results = [child1; child2] |> Async.Parallel
  return results |> Seq.sum
}

Job3 waits for some ugly wait handle that's set by some even uglier thread:

let job3() = async {
  use doneevent = new ManualResetEvent(false)

  let thread = Thread(fun () -> Thread.Sleep(1000000); doneevent.Set() |> ignore)
  thread.Start()

  do! Async.AwaitWaitHandle(doneevent :> WaitHandle) |> Async.Ignore

  return 30
}

Job4 posts to and waits for a reply from a MailboxProcessor:

let job4() = async {
  let worker = MailboxProcessor.Start(fun inbox -> async {
    let! (msg:AsyncReplyChannel<int>) = inbox.Receive()
    do! Async.Sleep 1000000
    msg.Reply 40
  })

  return! worker.PostAndAsyncReply (fun reply -> reply) // <- cannot cancel this
}

Job5 waits for a Task (or TaskCompletionSource):

let job5() = async {
  let tcs = TaskCompletionSource<int>()

  Async.Start(async {
    do! Async.Sleep 1000000
    tcs.SetResult 50
  })

  return! (Async.AwaitTask tcs.Task) // <- cannot cancel this
}

Why can Job1, 2 and 3 be cancelled ("stopped" gets printed), while Job4 and 5 make the function hang "forever"?

So far I have always relied on F# to handle cancellation behind the scenes: as long as I'm in async blocks and use the ! forms (let!, do!, return!, ...), everything should be fine. But that doesn't seem to be the case all the time.

Quote:

In F# asynchronous workflows, the CancellationToken object is passed around automatically under the cover. This means that we don't have to do anything special to support cancellation. When running asynchronous workflow, we can give it cancellation token and everything will work automatically.

Complete code is available here: http://codepad.org/euVO3xgP

EDIT

I noticed that piping an async through Async.StartAsTask followed by Async.AwaitTask makes it cancelable in all cases.

For Job4, that means changing the line:

return! worker.PostAndAsyncReply (fun reply -> reply)

to:

return! cancelable <| worker.PostAndAsyncReply (fun reply -> reply)

With cancelable being:

let cancelable (x:Async<_>) = async {
  let! cancel = Async.CancellationToken
  return! Async.StartAsTask(x, cancellationToken = cancel) |> Async.AwaitTask
}

The same works for making Job5 cancelable.

But that's just a workaround, and I can hardly put that around each call to an unknown Async<_>.

asked May 12 '14 17:05 by stmax


1 Answer

Only the built-in Async.* methods take care of observing the workflow's default CancellationToken themselves.

In your MailboxProcessor example, the cancellation token should be passed to the Start method:

let! ct = Async.CancellationToken
use worker = MailboxProcessor.Start(theWork, ct)
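
As a rough sketch, this is how those two lines could fit into the question's job4. Only the `let! ct` line and the extra argument to Start are new; the body of the worker is taken from the question. Whether the pending PostAndAsyncReply then stops waiting may depend on the FSharp.Core version in use, so treat this as something to try rather than a guaranteed fix.

```fsharp
open System.Threading

// job4 from the question, with the workflow's token handed to the mailbox.
let job4 () = async {
    // Fetch the token of the workflow that is running this job.
    let! ct = Async.CancellationToken

    // Start the worker under that token; 'use' disposes it when the job ends.
    use worker = MailboxProcessor.Start((fun inbox -> async {
        let! (msg: AsyncReplyChannel<int>) = inbox.Receive()
        do! Async.Sleep 1000000
        msg.Reply 40
    }), ct)

    return! worker.PostAndAsyncReply (fun reply -> reply)
}
```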

In the TaskCompletionSource example, you are going to have to register a callback to cancel it.

let! ct = Async.CancellationToken
use canceler = ct.Register( fun () -> tcs.TrySetCanceled() |> ignore )
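
Put into the question's job5, the registration could look like the sketch below. Two details are my own assumptions and not part of the original code: the helper workflow is started under the same token, and it uses TrySetResult instead of SetResult so that it cannot throw if the task was already cancelled. Depending on the FSharp.Core version, the awaiting workflow then ends either through cancellation or with a TaskCanceledException; in both cases a surrounding finally-block should get its chance to run.

```fsharp
open System.Threading
open System.Threading.Tasks

// job5 from the question, with a cancellation callback on the TaskCompletionSource.
let job5 () = async {
    let tcs = TaskCompletionSource<int>()

    // Fetch the token of the workflow that is running this job.
    let! ct = Async.CancellationToken

    // When the token is cancelled, move the task to the Canceled state
    // so that the AwaitTask below completes instead of waiting forever.
    use canceler = ct.Register(fun () -> tcs.TrySetCanceled() |> ignore)

    // Assumption: run the helper under the same token, and use TrySetResult
    // so a late completion cannot fail on an already cancelled task.
    Async.Start(async {
        do! Async.Sleep 1000000
        tcs.TrySetResult 50 |> ignore
    }, ct)

    return! Async.AwaitTask tcs.Task
}
```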

answered Sep 23 '22 18:09 by jyoung