 

F# async stack overflow

I am surprised by a stack overflow in my async-based program. I suspect the main problem is with the following function, which is supposed to compose two async computations to execute in parallel and wait for both to finish:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }

With this defined, I have the following mapReduce program that attempts to exploit parallelism in both the map and the reduce part. Informally, the idea is to spark N mappers and N-1 reducers using a shared channel, wait for them to finish, and read the result from the channel. I had my own Channel implementation, here replaced by a ConcurrentBag for shorter code (the problem affects both):

let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()

    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _           -> do! Async.Sleep 100
                             return! read ()
        }

    let write x =
        bag.Add x
        async.Return ()

    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }

    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)

    async {
        do! work
        return! read ()
    }

The following basic test then throws a StackOverflowException at n=10000:

let test n  =
    let map x      = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously

EDIT: An alternative implementation of the <|> combinator makes the test succeed on N=10000:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
  Async.FromContinuations(fun (ok, _, _) ->
    let count = ref 0
    let ok () =
        lock count (fun () ->
            match !count with
            | 0 -> incr count
            | _ -> ok ())
    Async.Start <|
        async {
            do! a
            return ok ()
        }
    Async.Start <|
        async {
            do! b
            return ok ()
        })

This is really surprising to me because this is what I assumed Async.StartChild is doing. Any thoughts on which solution would be optimal?

asked Aug 06 '11 21:08 by t0yv0



3 Answers

I think the stack overflow happens when starting the asynchronous workflow created with the <|> operator. The call to Async.StartChild starts the first workflow, which is itself built with <|>, so it makes another call to Async.StartChild, and so on.

An easy way to fix it is to schedule the workflow in a timer handler (so that it isn't added to the current stack). Something like:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        do! Async.Sleep 1
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y }

A better fix would be to write your own Seq.reduce. The current implementation folds the items one by one, so you get a tree of depth 10000 with just a single work item on the right and all the other work items on the left. If you built a balanced binary tree of work items instead, it shouldn't overflow the stack, because the height would be only 14 or so.
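The one-sided nesting is easy to see if the async values are swapped for strings. In the sketch below, the string version of <|> and the reducer placeholder are hypothetical stand-ins that only record how operands are grouped. Because F# operators that start with < are left-associative, m1 <|> m2 <|> reducer groups as (m1 <|> m2) <|> reducer, and Seq.reduce keeps feeding the growing result back in on the left.

```fsharp
// Hypothetical string stand-in for the async <|> combinator:
// it only records how its two operands are grouped.
let ( <|> ) (a: string) (b: string) = "(" + a + " " + b + ")"

// Placeholder for the shared reducer workflow.
let reducer = "R"

// Same fold as in the question, applied to four placeholder work items.
let shape =
    [ "a"; "b"; "c"; "d" ]
    |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)

printfn "%s" shape   // ((((((a b) R) c) R) d) R)
```

Every additional input wraps the whole previous result in two more levels on the left, which is the deep, one-sided tree described above.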

EDIT: Try replacing Seq.reduce with the following function:

module Seq = 
  let reduceBallanced f input =
    let arr = input |> Array.ofSeq
    let rec reduce s t =
      if s + 1 >= t then arr.[s]
      else 
        let m = (s + t) / 2
        f (reduce s m) (reduce m t)
    reduce 0 arr.Length
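A quick way to check the height claim is to build both shapes from plain data and measure them. The sketch below is self-contained: Tree, height and reduceBalanced are hypothetical helpers (reduceBalanced repeats the logic of the function above under a corrected spelling), and it uses 10,000 leaves as in the question.

```fsharp
// Hypothetical binary tree used only to measure the shape of a reduction.
type Tree =
    | Leaf of int
    | Node of Tree * Tree

// Height of a tree (a single leaf has height 0). The walk is tail-recursive
// over an explicit work list, so measuring a very deep tree cannot itself
// overflow the stack.
let height (t: Tree) =
    let rec go (pending: (Tree * int) list) best =
        match pending with
        | [] -> best
        | (Leaf _, d) :: rest -> go rest (max best d)
        | (Node (l, r), d) :: rest -> go ((l, d + 1) :: (r, d + 1) :: rest) best
    go [ (t, 0) ] 0

// Midpoint-split reduction, same logic as reduceBallanced above.
let reduceBalanced (f: 'T -> 'T -> 'T) (input: seq<'T>) : 'T =
    let arr = Array.ofSeq input
    let rec reduce s t =
        if s + 1 >= t then arr.[s]
        else
            let m = (s + t) / 2
            f (reduce s m) (reduce m t)
    reduce 0 arr.Length

let leaves = [ 1 .. 10000 ] |> List.map Leaf
let leftLeaning = leaves |> Seq.reduce (fun a b -> Node (a, b))
let balanced = leaves |> reduceBalanced (fun a b -> Node (a, b))

printfn "Seq.reduce height: %d" (height leftLeaning)  // 9999
printfn "balanced height:   %d" (height balanced)     // 14
```

All 9,999 reductions from Seq.reduce sit on one left spine, while the midpoint split keeps the height at ceil(log2 10000) = 14.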
answered Nov 17 '22 06:11 by Tomas Petricek



I believe Tomas got the intuition right, but here it is in my own words and in more detail, after spending quite a bit of time figuring this out.

  1. The first problem is that the above code does not implement the intended mapReduce algorithm, because of excessive synchronization. In particular, a <|> b <|> c does not start c before both a and b have completed, so <|> is in fact useless for parallelism with more than two computations.

  2. The second problem is that async.Return x is isomorphic to Async.FromContinuations(fun (ok, _, _) -> ok x). The example therefore actually executed sequentially, on a single thread, and the allocated closures blew the stack.
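Point 2 can be checked in isolation. Async.StartWithContinuations starts a computation immediately on the current thread, and async.Return completes without ever yielding, so its continuation has already run, on the caller's thread, by the time the call returns. The names completed, callerThread and continuationThread are illustrative.

```fsharp
open System.Threading

// Filled in by the success continuation; ref cells because closures
// cannot capture a `let mutable` local.
let completed = ref false
let continuationThread = ref -1
let callerThread = Thread.CurrentThread.ManagedThreadId

Async.StartWithContinuations(
    async.Return 42,
    (fun _ ->
        continuationThread.Value <- Thread.CurrentThread.ManagedThreadId
        completed.Value <- true),
    ignore,   // exception continuation
    ignore)   // cancellation continuation

// async.Return never hands off to another thread, so the continuation
// ran synchronously, inside the StartWithContinuations call.
printfn "completed: %b, same thread: %b"
    completed.Value (continuationThread.Value = callerThread)
```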

For the curious reader, below is my second attempt at this algorithm, which seems to fare a little better: about 1 s on n=100000, and about 21 s on n=100000 when the map and reduce functions are extended with Async.Sleep 1000 (on a Core i3).

let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    let run (a: Async<'T>) (k: 'T -> unit) =
        Async.StartWithContinuations(a, k, ignore, ignore)
    Async.FromContinuations <| fun (ok, _, _) ->
        let k = ref 0
        let agent =
            new MailboxProcessor<_>(fun chan ->
                async {
                    for i in 2 .. k.Value do
                        let! x = chan.Receive()
                        let! y = chan.Receive()
                        return run (reduce x y) chan.Post
                    let! r = chan.Receive()
                    return ok r
                })
        k :=
            (0, input)
            ||> Seq.fold (fun count x ->
                run (map x) agent.Post
                count + 1)
        agent.Start()
answered Nov 17 '22 07:11 (2 revs, 2 users 99%)



Very interesting discussion! I had a similar issue with Async.Parallel:

let (<||>) first second =
    async {
        let! results = Async.Parallel([| first; second |])
        return (results.[0], results.[1])
    }

let test = async { do! Async.Sleep 100 } 
(test, [1..10000]) 
||> List.fold (fun state value -> (test <||> state) |> Async.Ignore) 
|> Async.RunSynchronously // stackoverflow

I was very frustrated, so I worked around it by writing my own parallel combinator:

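let parallel<'T> (computations : Async<'T> []) : Async<'T []> =
  Async.FromContinuations (fun (cont, exnCont, _) ->
    let count = ref computations.Length
    let results : 'T [] = Array.zeroCreate computations.Length
    computations
        |> Array.iteri (fun i computation ->
            Async.Start <|
                async {
                    // Run one computation; on failure report the exception
                    // and skip the completion check, so cont is never called
                    // with partially filled results.
                    let! succeeded =
                        async {
                            try
                                let! res = computation
                                results.[i] <- res
                                return true
                            with ex ->
                                exnCont ex
                                return false
                        }
                    // The last computation to finish delivers all results.
                    let isLast =
                        succeeded && System.Threading.Interlocked.Decrement(count) = 0
                    if isLast then
                        results |> cont
                }))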

Finally, inspired by the discussion, I implemented the following mapReduce function:

// (|f ,⊗|)

let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> =
    let rec mapReduce' s e =
        async {
            if s + 1 >= e then return! mapF input.[s]
            else
                let m = (s + e) / 2
                let! (left, right) = mapReduce' s m <||> mapReduce' m e
                return! reduceF left right
        }
    mapReduce' 0 input.Length
answered Nov 17 '22 05:11 by Nick Palladinos
