Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using a F# event and asynchronous in multi-threaded code

EDIT/Notice: Event is now thread-safe in current F# implementation.


I'm working a lot with asynchronous workflows and agents in F#. While I was going a little bit deeper into events I noticed that the Event<_>() type is not thread-safe.

Here I'm not talking about the common problem of raising an event. I'm actually talking about subscribing and removing/disposing from an event. For testing purposes, I have written this short program:

let event = Event<int>()
let sub   = event.Publish

[<EntryPoint>]
let main argv =
    let subscribe sub x = async {
        let mutable disposables = []
        for i=0 to x do
            let dis = Observable.subscribe (fun x -> printf "%d" x) sub
            disposables <- dis :: disposables
        for dis in disposables do
            dis.Dispose()
    }

    Async.RunSynchronously(async{
        let! x = Async.StartChild (subscribe sub 1000)
        let! y = Async.StartChild (subscribe sub 1000)
        do! x
        do! y
        event.Trigger 1
        do! Async.Sleep 2000
    })
    0

The program is simple. I create an event and a function that subscribes a specific amount of events to it, and after that dispose every handler. I use another asynchronous computation to spawn two instances of those function with Async.StartChild. After both functions finished I trigger the event to see if there are some handlers still left.

But when event.Trigger(1) is called the result is that there are still some handlers registered to the event. As some "1" will be printed to the console. That in general means that subscribing and/or Disposing is not thread-safe.

And that is what I didn't expected. If subscribing and disposing is not thread-safe, how can events in general safely be used?

Sure events also can be used outside of threads, and a trigger don't spawn any function in parallel or on different threads. But it is somehow normal to me that events are used in Async, agent-based code or in general with threads. They are often used as a communication to gather information of Backroundworker threads.

With Async.AwaitEvent it is possible to subscribe to an event. If subscribing and disposing is not thread-safe, how is it possible to use events in such an environment? And which purpose has Async.AwaitEvent? Considering that an asynchronous workflow does thread, hoping just using Async.AwaitEvent is basically "broken by design" if subscribing/disposing to an event is not thread-safe by default.

The general question I'm facing is: Is it correct that subscribing and disposing is not thread-safe? From my example it seems to look like it, but probably I missed some important detail. I currently use events a lot in my design, and I usually have MailboxProcessors and use events for notification. So the question is. If events are not thread-safe the whole design I'm currently using is not thread-safe at all. So what is an fix for this situation? Creating a whole new thread-safe event implementation? Do some implementations already exist that face this problem? Or are there other options to use events safely in a highly threaded environment?

like image 617
David Raab Avatar asked Jan 31 '16 01:01

David Raab


1 Answers

FYI; the implementation for Event<int> can be found here.

The interesting bit seems to be:

member e.AddHandler(d) =
  x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>)
member e.RemoveHandler(d) = 
  x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>)

Subscribing to an event combines the current event handler with the event handler passed into subscribe. This combined event handler replaces the current one.

The problem from a concurrency perspective is that here we have a race-condition in that concurrent subscribers might use the came current event handler to combine with and the "last" one that writes back the handler win (last is a difficult concept in concurrency these days but nvm).

What could be done here is to introduce a CAS loop using Interlocked.CompareAndExchange but that adds performance overhead that hurts non-concurrent users. It's something one could make a PR off though and see if it viewed favourably by the F# community.

WRT to your second question on what to do about it I can just say what I would do. I would go for the option of creating a version of FSharpEvent that supports protected subscribe/unsubscribe. Perhaps base it of FSharpEvent if your company FOSS policy allows it. If it turns out a success then it could form a future PR to F# core libary.

I don't know your requirements but it's also possible that if what you need is coroutines (ie Async) and not threads then it's possible to rewrite the program to use only 1 thread and thus you won't be affected by this race condition.

like image 114
Just another metaprogrammer Avatar answered Nov 15 '22 09:11

Just another metaprogrammer