Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to achieve Asynchrony instead of Parallelism in F#

(Sticking to a common example with async fetch of many web pages)

How would I spin off multiple (hundreds) of web page requests asynchronously, and then wait for all requests to complete before going to the next step? Async.AsParallel processes a few requests at a time, controlled by number of cores on the CPU. Grabbing a web page is not a CPU-bound operation. Not satisfied with the speedup of Async.AsParallel, I am looking for alternatives.

I tried to connect the dots between Async.StartAsTask and Task[].WaitAll. Instinctively, I wrote the following code, but it does not compile.

let processItemsConcurrently (items : int seq) = 
  let tasks = items |> Seq.map (fun item -> Async.StartAsTask(fetchAsync item))
  Tasks.Task.WaitAll(tasks) 

How would you approach this?

like image 845
GregC Avatar asked Nov 30 '10 06:11

GregC


2 Answers

Async.Parallel is almost definitely right here. Not sure what you're not happy with; the strength of F# asyncs lies more in async computing than in task-parallel CPU-bound stuff (which is more tailored to Tasks and the .NET 4.0 TPL). Here's a full example:

open System.Diagnostics 
open System.IO
open System.Net
open Microsoft.FSharp.Control.WebExtensions 

let sites = [|
    "http://bing.com"
    "http://google.com"
    "http://cnn.com"
    "http://stackoverflow.com"
    "http://yahoo.com"
    "http://msdn.com"
    "http://microsoft.com"
    "http://apple.com"
    "http://nfl.com"
    "http://amazon.com"
    "http://ebay.com"
    "http://expedia.com"
    "http://twitter.com"
    "http://reddit.com"
    "http://hulu.com"
    "http://youtube.com"
    "http://wikipedia.org"
    "http://live.com"
    "http://msn.com"
    "http://wordpress.com"
    |]

let print s = 
    // careful, don't create a synchronization bottleneck by printing
    //printf "%s" s
    ()

let printSummary info fullTimeMs =
    Array.sortInPlaceBy (fun (i,_,_) -> i) info
//  for i, size, time in info do
//      printfn "%2d  %7d  %5d" i size time
    let longest = info |> Array.map (fun (_,_,time) -> time) |> Array.max
    printfn "longest request took %dms" longest
    let bytes = info |> Array.sumBy (fun (_,size,_) -> float size)
    let seconds = float fullTimeMs / 1000.
    printfn "sucked down %7.2f KB/s" (bytes / 1024.0 / seconds)

let FetchAllSync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url ->
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream,
                            System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let contents = reader.ReadToEnd()
        print "r"
        i, contents.Length, sw.ElapsedMilliseconds)
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

let FetchAllAsync() =
    let allsw = Stopwatch.StartNew()
    let info = sites |> Array.mapi (fun i url -> async {
        let sw = Stopwatch.StartNew()
        print "S"
        let req = WebRequest.Create(url) 
        use! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        use reader = new AsyncStreamReader(stream, // F# PowerPack
                           System.Text.Encoding.UTF8, true, 4096) 
        print "-"
        let! contents = reader.ReadToEnd()  // in F# PowerPack
        print "r"
        return i, contents.Length, sw.ElapsedMilliseconds })
                    |> Async.Parallel 
                    |> Async.RunSynchronously 
    let time = allsw.ElapsedMilliseconds 
    printSummary info time
    time, info |> Array.sumBy (fun (_,size,_) -> size)

// By default, I think .NET limits you to 2 open connections at once
ServicePointManager.DefaultConnectionLimit <- sites.Length 

for i in 1..3 do // to warmup and show variance
    let time1,r1 = FetchAllSync()
    printfn "Sync took %dms, result was %d" time1 r1
    let time2,r2 = FetchAllAsync()
    printfn "Async took %dms, result was %d  (speedup=%2.2f)" 
        time2 r2 (float time1/ float time2)
    printfn ""

On my 4-core box, this consistently gives a nearly 4x speedup.

EDIT

In reply to your comment, I've updated the code. You're right in that I've added more sites and am not seeing the expected speedup (still holding steady around 4x). I've started adding a little debugging output above, will continue investigating to see if something else is throttling the connections...

EDIT

Editted the code again. Well, I found what might be the bottleneck. Here's the implementation of AsyncReadToEnd in the PowerPack:

type System.IO.StreamReader with
   member s.AsyncReadToEnd () = 
       FileExtensions.UnblockViaNewThread (fun () -> s.ReadToEnd())

In other words, it just blocks a threadpool thread and reads synchronously. Argh!!! Let me see if I can work around that.

EDIT

Ok, the AsyncStreamReader in the PowerPack does the right thing, and I'm using that now.

However, the key issue seems to be variance.

When you hit, say, cnn.com, a lot of the time the result will come back in like 500ms. But every once in a while you get that one request that takes 4s, and this of course potentially kills the apparent async perf, since the overall time is the time of the unluckiest request.

Running the program above, I see speedups from about 2.5x to 9x on my 2-core box at home. It is very highly variable, though. It's still possible there's some bottleneck in the program that I've missed, but I think the variance-of-the-web may account for all of what I'm seeing at this point.

like image 56
Brian Avatar answered Nov 15 '22 22:11

Brian


Using the Reactive Extensions for .NET combined with F#, you can write a very elegant solution - check out the sample at http://blog.paulbetts.org/index.php/2010/11/16/making-async-io-work-for-you-reactive-style/ (this uses C#, but using F# is easy too; the key is using the Begin/End methods instead of the sync method, which even if you can make it compile, it will block up n ThreadPool threads unnecessarily, instead of the Threadpool just picking up completion routines as they come in)

like image 33
Ana Betts Avatar answered Nov 15 '22 20:11

Ana Betts