Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Lock-Free way to Signal between Asyncs

Tags:

.net

f#

lock-free

I am looking for a lock-free way to signal between two Asyncs in F#. I have two tail-recursive async functions, and I want one to yield until signaled by the other before proceeding to the next recursion. I could use an event for this, but it looks like .NET events use locks internally. The only solution I've found so far would be to use Keyed Events from ntdll.dll, but I would prefer a solution that does not require a direct reference to a platform-specific DLL. Is there some way I can use System.Threading.Interlocked or another .NET technique to achieve this?

Here's a simple example of what I want to achieve:

let rec loop1 () =
    async {
        // do work
        // somehow signal loop2
        return! loop1 ()
    }

let rec loop2 state = 
    async {
        // wait for signal from loop1
        // do work
        return! loop2 state  // This would actually be a new state, not the old state
    }
like image 378
Aaron M. Eshbach Avatar asked May 17 '18 15:05

Aaron M. Eshbach


1 Answers

I took a look at Szer's suggestion to model the event on Hopac's IVar, and I examined how the standard F# Event is implemented. Combining the two, I came up with this:

open System
open System.Threading

type LockFreeEvent<'args>() =
    let mutable multicast: Handler<'args> = null
    let wait = 
        let spin = SpinWait()
        spin.SpinOnce

    member __.Trigger arg = 
        match multicast with 
        | null -> ()
        | d -> d.Invoke(null, arg) |> ignore

    member __.Publish = 
        {new IEvent<'args> with
            member __.AddHandler handler = 
                let snapshot = multicast
                while snapshot <> Interlocked.CompareExchange<Handler<'args>>(&multicast, Delegate.Combine(multicast, handler) :?> Handler<'args>, snapshot) do
                    wait ()
            member __.RemoveHandler handler =
                let snapshot = multicast
                while snapshot <> Interlocked.CompareExchange(&multicast, Delegate.Remove(multicast, handler) :?> Handler<'args>, snapshot) do 
                    wait ()
            member this.Subscribe observer =
                let handler = new Handler<_>(fun sender args -> observer.OnNext(args))
                (this :?> IEvent<_,_>).AddHandler(handler)
                { new IDisposable with 
                    member __.Dispose() = (this :?> IEvent<_,_>).RemoveHandler(handler) 
                }
        }

How does this look? I think this should implement the same functionality as a standard F# event, but without locking, unless there's locking going on in Delegate.Combine. I think I might need to do Trigger differently as well.

like image 181
Aaron M. Eshbach Avatar answered Oct 24 '22 06:10

Aaron M. Eshbach