Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

F# Async hanging

I have some fairly straightforward F# async code to download a hundred random articles off of Wikipedia (for research).

For some reason, the code hangs at arbitrary points in time during the download. Sometimes it's after 50, sometimes it's after 80.

The async code itself is fairly straightforward:

let parseWikiAsync(url:string, count:int ref) =
    async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try html |> parseDoc |> parseArticle
                with | ex -> printfn "%A" ex; None
            lock count (fun () ->
                if !count % 10 = 0 then
                    printfn "%d" !count
                count := !count + 1
            )
            return ret
    }

Because I couldn't figure out through fsi what the problem was, I made WebClientWithTimeout, a System.Net.WebClient wrapper that allows me to specify a timeout:

type WebClientWithTimeout() =
    inherit WebClient()
    member val Timeout = 60000 with get, set

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

And then I use the async combinators to retrieve just over a hundred pages, and weed out all the articles that return parseWikiAsync calls that return None (most of which are "disambiguation pages") until I have exactly 100 articles:

let en100 =
    let count = ref 0
    seq { for _ in 1..110 -> parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", count) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.choose id
    |> Seq.take 100

When I compile the code and run it in the debugger, there are only three threads, of which only one is running actual code -- the Async pipeline. The other two have "not available" for location, and nothing in the call stack.

Which I think means that it's not stuck in AsyncDownloadString or in anywhere in parseWikiAsync. What else could be causing this?

Oh, also, initially it takes about a full minute before the async code actually starts. After that it goes at a fairly reasonable pace until it hangs again indefinitely.

Here's the call stack for the main thread:

>   mscorlib.dll!System.Threading.WaitHandle.InternalWaitOne(System.Runtime.InteropServices.SafeHandle waitableSafeHandle, long millisecondsTimeout, bool hasThreadAffinity, bool exitContext) + 0x22 bytes 
    mscorlib.dll!System.Threading.WaitHandle.WaitOne(int millisecondsTimeout, bool exitContext) + 0x28 bytes    
    FSharp.Core.dll!Microsoft.FSharp.Control.AsyncImpl.ResultCell<Microsoft.FSharp.Control.AsyncBuilderImpl.Result<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>>.TryWaitForResultSynchronously(Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x36 bytes  
    FSharp.Core.dll!Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(System.Threading.CancellationToken token, Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x1ba bytes 
    FSharp.Core.dll!Microsoft.FSharp.Control.FSharpAsync.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout, Microsoft.FSharp.Core.FSharpOption<System.Threading.CancellationToken> cancellationToken) + 0xb9 bytes   
    WikiSurvey.exe!<StartupCode$WikiSurvey>.$Program.main@() Line 97 + 0x55 bytes   F#
like image 273
Rei Miyasaka Avatar asked Aug 29 '12 00:08

Rei Miyasaka


1 Answers

Wikipedia is not to blame here, it's a result of how Async.Parallel works internally. The type signature for Async.Parallel is seq<Async<'T>> -> Async<'T[]>. It returns a single Async value containing all of the results from the sequence -- so it doesn't return until all of the computations in the seq<Async<'T>> return.

To illustrate, I modified your code so it tracks the number of outstanding requests, i.e., requests which have been sent to the server, but have not yet received / parsed the response.

open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading

type WebClientWithTimeout() =
    inherit WebClient()

    let mutable timeout = -1
    member __.Timeout
        with get () = timeout
        and set value = timeout <- value

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

type ParsedDoc = ParsedDoc
type ParsedArticle = ParsedArticle

let parseDoc (str : string) = ParsedDoc
let parseArticle (doc : ParsedDoc) = Some ParsedArticle

/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output.
let synchedOut =
    System.Console.Out
    |> System.IO.TextWriter.Synchronized

let parseWikiAsync(url : string, outstandingRequestCount : int ref) =
    async {
    use wc = new WebClientWithTimeout(Timeout = 5000)
    wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ([email protected])")

    // Increment the outstanding request count just before we send the request.
    do
        // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
        // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
        // to be created which somehow defeats the synchronization and garbles the output.
        let msg =
            Interlocked.Increment outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    let! html = wc.AsyncDownloadString(Uri(url))
    let ret =
        try html |> parseDoc |> parseArticle
        with ex ->
            let msg = sprintf "%A" ex
            synchedOut.WriteLine msg
            None

    // Decrement the outstanding request count now that we've
    // received a reponse and parsed it.
    do
        let msg =
            Interlocked.Decrement outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    return ret
    }

/// Writes a message to the console, passing a value through
/// so it can be used within a function pipeline.
let inline passThruWithMessage (msg : string) value =
    Console.WriteLine msg
    value

let en100 =
    let outstandingRequestCount = ref 0
    seq { for _ in 1..120 ->
            parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> passThruWithMessage "Finished running all of the requests."
    |> Seq.choose id
    |> Seq.take 100

If you compile and run that code, you'll see output like this:

Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.

As you can see, all of the requests are made before any of them are parsed -- so if you're on a slower connection, or you're trying to retrieve a large number of documents, the server could be dropping the connection because it may assume you're not retrieving the response it's trying to send. Another issue with the code is that you need to explicitly specify the number of elements to generate in the seq, which makes the code less reusable.

A better solution would be to retrieve and parse the pages as they're needed by some consuming code. (And if you think about it, that's exactly what an F# seq is good for.) We'll start by creating a function that takes a Uri and produces a seq<Async<'T>> -- i.e., it produces an infinite sequence of Async<'T> values, each of which will retrieve the content from the Uri, parse it, and return the result.

/// Given a Uri, creates an infinite sequence of whose elements are retrieved
/// from the Uri.
let createDocumentSeq (uri : System.Uri) =
    #if DEBUG
    let outstandingRequestCount = ref 0
    #endif

    Seq.initInfinite <| fun _ ->
        async {
        use wc = new WebClientWithTimeout(Timeout = 5000)
        wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ([email protected])")

        #if DEBUG
        // Increment the outstanding request count just before we send the request.
        do
            // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
            // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
            // to be created which somehow defeats the synchronization and garbles the output.
            let msg =
                Interlocked.Increment outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        let! html = wc.AsyncDownloadString uri
        let ret =
            try Some html
            with ex ->
                let msg = sprintf "%A" ex
                synchedOut.WriteLine msg
                None

        #if DEBUG
        // Decrement the outstanding request count now that we've
        // received a reponse and parsed it.
        do
            let msg =
                Interlocked.Decrement outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        return ret
        }

Now we use this function to retrieve the pages as a stream:

//
let en100_Streaming =
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    |> Seq.choose (fun asyncDoc ->
        Async.RunSynchronously asyncDoc
        |> Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

If you run that code, you'll see that it pulls the results one at a time from the server and doesn't leave outstanding requests hanging around. Also, it's very easy to change the number of results you want to retrieve -- all you need to do is change the value you pass to Seq.take.

Now while that streaming code works just fine, it doesn't execute the requests in parallel so it could be slow for large numbers of documents. This is an easy problem to fix, though the solution may be a little non-intuitive. Instead of trying to execute the entire sequence of requests in parallel -- which is the problem in the original code -- let's create a function which uses Async.Parallel to execute small batches of requests in parallel, then uses Seq.collect to combine the results back into a flat sequence.

/// Given a sequence of Async<'T>, creates a new sequence whose elements
/// are computed in batches of a specified size.
let parallelBatch batchSize (sequence : seq<Async<'T>>) =
    sequence
    |> Seq.windowed batchSize
    |> Seq.collect (fun batch ->
        batch
        |> Async.Parallel
        |> Async.RunSynchronously)

To utilize this function, we just need a few small tweaks to the code from the streaming version:

let en100_Batched =
    let batchSize = 10
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Execute batches in parallel
    |> parallelBatch batchSize
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

Again, it's easy to change the number of documents you want to retrieve, and the batch size can easily be modified (again, I suggest you keep it reasonably small). If you wanted to, you could make a few tweaks to the 'streaming' and 'batching' code so you could switch between them at run-time.

One last thing -- with my code the requests shouldn't time-out, so you can probably get rid of the WebClientWithTimeout class and just use WebClient directly.

like image 68
Jack P. Avatar answered Sep 25 '22 19:09

Jack P.