I am surprised by a stack overflow in my async-based program. I suspect the main problem is with the following function, which is supposed to compose two async computations to execute in parallel and wait for both to finish:
```fsharp
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }
```
With this defined, I have the following `mapReduce` program that attempts to exploit parallelism in both the `map` and the `reduce` part. Informally, the idea is to spark N mappers and N-1 reducers using a shared channel, wait for them to finish, and read the result from the channel. I had my own `Channel` implementation, here replaced by a `ConcurrentBag` for shorter code (the problem affects both):
```fsharp
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()
    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _ ->
                do! Async.Sleep 100
                return! read ()
        }
    let write x =
        bag.Add x
        async.Return ()
    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }
    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
    async {
        do! work
        return! read ()
    }
```
Now the following basic test starts to throw StackOverflowException on n=10000:
```fsharp
let test n =
    let map x = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously
```
EDIT: An alternative implementation of the `<|>` combinator makes the test succeed on N=10000:
```fsharp
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    Async.FromContinuations(fun (ok, _, _) ->
        let count = ref 0
        let ok () =
            lock count (fun () ->
                match !count with
                | 0 -> incr count
                | _ -> ok ())
        Async.Start <|
            async {
                do! a
                return ok ()
            }
        Async.Start <|
            async {
                do! b
                return ok ()
            })
```
This is really surprising to me, because this is what I assumed `Async.StartChild` is doing. Any thoughts on which solution would be optimal?
I think that the stack overflow exception happens when starting the asynchronous workflow created using the `<|>` operator. The call to `Async.StartChild` starts the first workflow, which is combined using `<|>`, and so it makes another call to `Async.StartChild`, etc.
An easy way to fix it is to schedule the workflow in a handler of a timer (so that it isn't added to the current stack). Something like:
```fsharp
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        do! Async.Sleep 1
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }
```
A better way to fix it would be to create your own `Seq.reduce`: the current implementation folds the sequence one item at a time, so you get a tree of depth 10000 that contains just a single work item on the right and all other work items on the left. If you created a balanced binary tree of work items, then it shouldn't overflow the stack, because the height would be only 15 or so.
EDIT: Try replacing `Seq.reduce` with the following function:
```fsharp
module Seq =
    let reduceBallanced f input =
        let arr = input |> Array.ofSeq
        let rec reduce s t =
            if s + 1 >= t then arr.[s]
            else
                let m = (s + t) / 2
                f (reduce s m) (reduce m t)
        reduce 0 arr.Length
```
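As a quick check of the height claim, here is a small sketch (not part of the original answer) that passes a depth-counting function to both `Seq.reduce` and a copy of `reduceBallanced`. Each leaf counts as depth 0 and each combination adds one level; the helper `depthOf` and the 10001-element input (the size of `[0..10000]` in the question's test) are illustrative assumptions.

```fsharp
module Seq =
    // Copy of reduceBallanced from above: splits the input in half recursively,
    // so the combination tree has height of about log2(length).
    let reduceBallanced f input =
        let arr = input |> Array.ofSeq
        let rec reduce s t =
            if s + 1 >= t then arr.[s]
            else
                let m = (s + t) / 2
                f (reduce s m) (reduce m t)
        reduce 0 arr.Length

// Hypothetical helper: combining two subtrees yields a tree one level taller.
let depthOf (left: int) (right: int) = max left right + 1

// 10001 leaves, each of depth 0.
let leaves = List.replicate 10001 0

let linearDepth = Seq.reduce depthOf leaves             // left-nested chain
let balancedDepth = Seq.reduceBallanced depthOf leaves  // balanced tree

printfn "Seq.reduce: %d, reduceBallanced: %d" linearDepth balancedDepth
// prints "Seq.reduce: 10000, reduceBallanced: 14"
```

The chain built by `Seq.reduce` gets one level deeper per element, while the balanced version grows only with log2 of the input size.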
I believe Tomas got the intuition right in his answer, but here it is in my own words and in more detail, after spending quite a bit of time figuring this out.
The problem is that the above code does not implement the intended `mapReduce` algorithm due to excessive synchronization. In particular, `a <|> b <|> c` does not start `c` before both `a` and `b` have completed, so in fact `<|>` is useless for parallelism with more than two computations.
The second problem is that `async.Return x` is isomorphic to `Async.FromContinuations(fun (ok, _, _) -> ok x)`. The example therefore in fact executed sequentially, on a single thread, and the allocated closures blew the stack.
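A small sketch of that equivalence, assuming a current FSharp.Core: both computations below hand back their value synchronously, so the code after `let!` keeps running on the same thread instead of being scheduled elsewhere. The names `viaReturn`, `viaContinuations` and `sameThread` are hypothetical helpers for illustration.

```fsharp
open System.Threading

// Completes immediately with x via the async builder's Return.
let viaReturn (x: int) : Async<int> = async.Return x

// Completes immediately with x by calling the success continuation directly.
let viaContinuations (x: int) : Async<int> =
    Async.FromContinuations(fun (ok, _, _) -> ok x)

// Runs `computation` inside a workflow and reports whether the code after
// `let!` ran on the same thread as the code before it.
let sameThread (computation: Async<int>) : bool =
    async {
        let before = Thread.CurrentThread.ManagedThreadId
        let! _ = computation
        let after = Thread.CurrentThread.ManagedThreadId
        return before = after
    }
    |> Async.RunSynchronously

printfn "async.Return: %b, FromContinuations: %b"
    (sameThread (viaReturn 1)) (sameThread (viaContinuations 1))
```

Neither form introduces a thread hop, so a long chain of such binds stays on one thread and one stack.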
For the curious reader, below is my second attempt to design this algorithm, which seems to fare a little better (~1 sec on n=100000 and ~21 sec on n=100000 with map and reduce functions extended with `Async.Sleep 1000`; I have a Core i3).
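```fsharp
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    // Start a computation and pass its result to k (errors and cancellation are ignored).
    let run (a: Async<'T>) (k: 'T -> unit) =
        Async.StartWithContinuations(a, k, ignore, ignore)
    Async.FromContinuations <| fun (ok, _, _) ->
        // Number of inputs; set before the agent is started.
        let k = ref 0
        let agent =
            new MailboxProcessor<_>(fun chan ->
                async {
                    // k inputs need k - 1 reductions; each one consumes two values
                    // and posts the reduced value back to the same mailbox.
                    for _ in 2 .. k.Value do
                        let! x = chan.Receive()
                        let! y = chan.Receive()
                        do run (reduce x y) chan.Post
                    // The one remaining message is the final result.
                    let! r = chan.Receive()
                    return ok r
                })
        // Start every mapper, posting its result to the agent, and count the inputs.
        k :=
            (0, input)
            ||> Seq.fold (fun count x ->
                run (map x) agent.Post
                count + 1)
        agent.Start()
```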
Very interesting discussion! I had a similar issue with `Async.Parallel`:
```fsharp
let (<||>) first second =
    async {
        let! results = Async.Parallel([| first; second |])
        return (results.[0], results.[1])
    }

let test = async { do! Async.Sleep 100 }

(test, [1..10000])
||> List.fold (fun state value -> (test <||> state) |> Async.Ignore)
|> Async.RunSynchronously // stackoverflow
```
I was very frustrated... so I solved it by creating my own Parallel combinator.
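```fsharp
let parallel<'T> (computations : Async<'T> []) : Async<'T []> =
    Async.FromContinuations (fun (cont, exnCont, _) ->
        let count = ref computations.Length
        let results : 'T [] = Array.zeroCreate computations.Length
        // Set to 1 by the first failing computation, so that exnCont is called
        // at most once and cont is not called after a failure.
        let failed = ref 0
        computations
        |> Array.iteri (fun i computation ->
            Async.Start <|
                async {
                    try
                        let! res = computation
                        results.[i] <- res
                    with ex ->
                        if System.Threading.Interlocked.Exchange(failed, 1) = 0 then
                            exnCont ex
                    // The last computation to finish reports the results.
                    let n = System.Threading.Interlocked.Decrement(count)
                    if n = 0 && failed.Value = 0 then
                        results |> cont
                }))
```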
And finally, inspired by the discussion, I implemented the following `mapReduce` function:
```fsharp
// (|f ,⊗|)
let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> =
    let rec mapReduce' s e =
        async {
            if s + 1 >= e then return! mapF input.[s]
            else
                let m = (s + e) / 2
                let! (left, right) = mapReduce' s m <||> mapReduce' m e
                return! reduceF left right
        }
    mapReduce' 0 input.Length
```
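A usage sketch, assuming the `<||>` combinator from the previous answer is in scope (it is repeated here so the snippet runs on its own): the question's test rewritten for this array-based `mapReduce`. Because the input is split in half at each level, the recursion is only about log2(n) deep. The adapted `test` function is an illustration, not code from the original thread.

```fsharp
// Pair combinator: runs both computations with Async.Parallel
// and returns their results as a tuple.
let (<||>) first second =
    async {
        let! results = Async.Parallel([| first; second |])
        return (results.[0], results.[1])
    }

// Balanced divide-and-conquer mapReduce, as above.
let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> =
    let rec mapReduce' s e =
        async {
            if s + 1 >= e then return! mapF input.[s]
            else
                let m = (s + e) / 2
                let! (left, right) = mapReduce' s m <||> mapReduce' m e
                return! reduceF left right
        }
    mapReduce' 0 input.Length

// The question's test, adapted to an array input: sums 0..n.
let test n =
    let map x = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [| 0 .. n |]
    |> Async.RunSynchronously

printfn "%d" (test 10000)
// prints 50005000
```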