Asynchronous barrier in F#

I wrote a program in F# that asynchronously lists all directories on disk. An async task lists all files in a given directory and creates separate async tasks (daemons: I start them using Async.Start) to list subdirectories. They all communicate the results to the central MailboxProcessor.

My problem is: how do I detect that all the daemon tasks have finished and that no more files will arrive? Essentially, I need a barrier for all tasks that are (direct or indirect) children of my top task. I couldn't find anything like that in F#'s async model.

What I did instead was to create a separate MailboxProcessor where I register each task's start and termination. When the active count drops to zero, I'm done. But I'm not happy with that solution. Any other suggestions?
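
For reference, the counting approach described above might look roughly like the sketch below. This is not the original program: the message type `CounterMsg`, the agent name `counter`, and the toy `work` function are all hypothetical, and the sketch assumes every child is registered before its parent reports completion, so the count cannot reach zero early.

```fsharp
// Hypothetical message type; the names are illustrative only.
type CounterMsg =
    | Started
    | Finished
    | WaitForZero of AsyncReplyChannel<unit>

let counter =
    MailboxProcessor.Start(fun mbox ->
        // active: tasks registered but not yet finished
        // waiters: callers waiting for the count to reach zero
        let rec loop active (waiters: AsyncReplyChannel<unit> list) =
            async {
                let! msg = mbox.Receive()
                match msg with
                | Started -> return! loop (active + 1) waiters
                | Finished ->
                    let remaining = active - 1
                    if remaining = 0 then
                        for w in waiters do w.Reply()
                        return! loop 0 []
                    else
                        return! loop remaining waiters
                | WaitForZero ch ->
                    if active = 0 then
                        ch.Reply()
                        return! loop active waiters
                    else
                        return! loop active (ch :: waiters)
            }
        loop 0 [])

// Toy stand-in for the directory lister: each task spawns two children.
let rec work depth =
    async {
        if depth > 0 then
            for _child in 1 .. 2 do
                counter.Post Started            // register the child first
                Async.Start (work (depth - 1))  // then start it as a daemon
        counter.Post Finished                   // report own completion last
    }

counter.Post Started              // register the root before waiting
Async.Start (work 3)
counter.PostAndReply WaitForZero  // blocks until the active count is zero
```

The ordering matters here: registering a child before starting it, and posting `Finished` only after all children are registered, is what keeps the count from touching zero while work is still outstanding.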

Bartosz Milewski asked Jan 05 '11 20:01


1 Answer

Have you tried using Async.Parallel? That is, rather than calling Async.Start for each subdirectory, combine the subdirectory tasks into a single async via Async.Parallel. You then end up with a (nested) fork-join task that you can pass to Async.RunSynchronously to await the final result.

EDIT

Here is some approximate code that shows the gist, if not the full detail:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! msg = mbox.Receive()
            printfn "%s" msg
    })

let rec traverse dir =
    async {
        agent.Post(dir)
        let subDirs = Directory.EnumerateDirectories(dir)
        return! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
    }

traverse "d:\\" |> Async.RunSynchronously
// now all will be traversed, 
// though Post-ed messages to agent may still be in flight

EDIT 2

Here is the waiting version that uses replies:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! dir, (replyChannel:AsyncReplyChannel<unit>) = mbox.Receive()
            printfn "%s" dir
            replyChannel.Reply()
    })

let rec traverse dir =
    async {
        let r = agent.PostAndAsyncReply(fun replyChannel -> dir, replyChannel)
        let subDirs = Directory.EnumerateDirectories(dir)
        do! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
        do! r // wait for Post to finish
    }

traverse "c:\\Projects\\" |> Async.RunSynchronously
// now all will be traversed to completion 
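
If the agent exists only to collect results, the same fork-join shape could also return them directly. The following is a minimal sketch under that assumption, not part of the code above: `collect` and its `children` parameter are hypothetical names, and the traversal is written generically so it can be tried on an in-memory tree as well as on directories.

```fsharp
open System.IO

// Fork-join traversal that returns every visited node.
// `children` is a hypothetical parameter that yields a node's sub-nodes.
let rec collect (children: 'a -> seq<'a>) (node: 'a) : Async<'a list> =
    async {
        // Fork: one async per child, joined by Async.Parallel.
        let! nested =
            [ for c in children node -> collect children c ]
            |> Async.Parallel
        // Join: Async.Parallel keeps the input order in its result array.
        return node :: List.concat nested
    }

// Small in-memory tree used purely for illustration.
let sampleChildren n =
    match n with
    | 1 -> seq [ 2; 3 ]
    | 2 -> seq [ 4 ]
    | _ -> Seq.empty

let visited = collect sampleChildren 1 |> Async.RunSynchronously
```

For directories, the call would be along the lines of `collect (fun d -> Directory.EnumerateDirectories d) "c:\\Projects\\"`. When Async.RunSynchronously returns, the whole result list is available, so no separate barrier is needed.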
Brian answered Nov 11 '22 16:11