Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Waiting on the cancellation of an asynchronous workflow

The Cancel member of a CancellationTokenSource object "communicates a request for cancellation" which, I assume, means it is fire and forget and does not wait until the cancellation has been completed (e.g. all exception handlers have been run). That's nice but I need to wait until an outstanding async has been fully cancelled before creating another async. Is there an easy way to accomplish this?

like image 480
J D Avatar asked Feb 19 '23 18:02

J D


2 Answers

I don't think there is any direct way to do that using the standard library functions from the F# async libraries. The closest operation us Async.TryCancelled which runs a callback when the workflow is (acutally) cancelled, but sending the message from the callback to the code that started the workflow has to be done by hand.

This is relatively easy to solve using events and an extension from the F# async extensions that I wrote (also included in FSharpX package) - the extension is GuardedAwaitObservable that can be used to wait for an occurrence of an event (which can be triggered immediately by some operation).

The following Async.StartCancellable method takes an asynchronous workflow and returns Async<Async<unit>>. When you bind on the outer workflow, it starts the argument (like Async.StartChild) and when you bind on the returned inner workflow, it cancels the computation and waits until it is actually cancelled:

open System.Threading

module Async = 
  /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called
  /// using 'let!', it starts the workflow provided as an argument and returns
  /// a token that can be used to cancel the started work - this is an
  /// (asynchronously) blocking operation that waits until the workflow
  /// is actually cancelled 
  let StartCancellable work = async {
    let cts = new CancellationTokenSource()
    // Creates an event used for notification
    let evt = new Event<_>()
    // Wrap the workflow with TryCancelled and notify when cancelled
    Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token)
    // Return a workflow that waits for 'evt' and triggers 'Cancel'
    // after it attaches the event handler (to avoid missing event occurrence)
    let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel
    return async.TryFinally(waitForCancel, cts.Dispose) }

EDIT Wrapped the result in TryFinally to dispose of the CancellationTokenSource as suggested by Jon. I think this should be sufficient to make sure it is disposed of correctly.

Here is an example that uses the method. The loop function is a simple workflow that I used for testing. The rest of the code starts it, waits 5.5 seconds and then cancels it:

/// Sample workflow that repeatedly starts and stops long running operation
let loop = async {
  for i in 0 .. 9999 do
    printfn "Starting: %d" i
    do! Async.Sleep(1000)
    printfn "Done: %d" i }

// Start the 'loop' workflow, wait for 5.5 seconds and then
// cancel it and wait until it finishes current operation  
async { let! cancelToken = Async.StartCancellable(loop)
        printfn "started"
        do! Async.Sleep(5500)
        printfn "cancelling"
        do! cancelToken
        printfn "done" }
|> Async.Start

For completeness, the sample with the necessary definitions from FSharpX is here on F# snippets.

like image 130
Tomas Petricek Avatar answered Mar 01 '23 02:03

Tomas Petricek


This should not be hard given easy to use synchronization primitives. I particularly like write-once "logic" variables:

type Logic<'T> =
    new : unit -> Logic<'T>
    member Set : 'T -> unit
    member Await : Async<'T>

It is easy to wrap an Async to set a logic variable upon completion, then wait on it, for example:

type IWork =
    abstract member Cancel : unit -> Async<unit>

let startWork (work: Async<unit>) =
    let v = Logic<unit>()
    let s = new CancellationTokenSource()
    let main = async.TryFinally(work, fun () -> s.Dispose(); v.Set())
    Async.Start(main, s.Token)
    {
        new IWork with
            member this.Cancel() = s.Cancel(); v.Await
    }

A possible implementation of logic variables might be:

type LogicState<'T> =
    | New
    | Value of 'T
    | Waiting of ('T -> unit)

[<Sealed>]
type Logic<'T>() =
    let lockRoot = obj ()
    let mutable st = New
    let update up =
        let k =
            lock lockRoot <| fun () ->
                let (n, k) = up st
                st <- n
                k
        k ()

    let wait (k: 'T -> unit) =
        update <| function
            | New -> (Waiting k, ignore)
            | Value value as st -> (st, fun () -> k value)
            | Waiting f -> (Waiting (fun x -> f x; k x), ignore)

    let await =
        Async.FromContinuations(fun (ok, _, _) -> wait ok)

    member this.Set<'T>(value: 'T) =
        update <| function
            | New -> (Value value, ignore)
            | Value _ as st -> (st, ignore)
            | Waiting f as st -> (Value value, fun () -> f value)

    member this.Await = await
like image 36
t0yv0 Avatar answered Mar 01 '23 00:03

t0yv0