
Global state and Async Workflows in F#

Tags:

f#

A common example used to illustrate asynchronous workflows in F# is retrieving multiple web pages in parallel. One such example is given on Wikibooks (http://en.wikibooks.org/wiki/F_Sharp_Programming/Async_Workflows). The code is reproduced here in case the link changes:

open System.Text.RegularExpressions
open System.Net

let download url =
    let webclient = new System.Net.WebClient()
    webclient.DownloadString(url : string)

let extractLinks html = Regex.Matches(html, @"http://\S+")

let downloadAndExtractLinks url =
    let links = (url |> download |> extractLinks)
    url, links.Count

let urls =
     [@"http://www.craigslist.com/";
     @"http://www.msn.com/";
     @"http://en.wikibooks.org/wiki/Main_Page";
     @"http://www.wordpress.com/";
     @"http://news.google.com/";]

let pmap f l =
    seq { for a in l -> async { return f a } }
    |> Async.Parallel
    |> Async.RunSynchronously // Async.Run is the pre-release name of this function

let testSynchronous() = List.map downloadAndExtractLinks urls
let testAsynchronous() = pmap downloadAndExtractLinks urls

let time msg f =
    let stopwatch = System.Diagnostics.Stopwatch.StartNew()
    let temp = f()
    stopwatch.Stop()
    printfn "(%f ms) %s: %A" stopwatch.Elapsed.TotalMilliseconds msg temp

let main() =
    printfn "Start..."
    time "Synchronous" testSynchronous
    time "Asynchronous" testAsynchronous
    printfn "Done."

main()

What I would like to know is how one should handle changes in global state such as loss of a network connection? Is there an elegant way to do this?

One could check the state of the network prior to making the Async.Parallel call, but the state could change during execution. Assuming what one wanted to do was pause execution until the network was available again rather than fail, is there a functional way to do this?

JonnyBoats asked Apr 07 '13 01:04


1 Answer

First of all, there is an issue with the example: it uses Async.Parallel to run multiple operations in parallel, but the operations themselves are not implemented asynchronously, so each download still blocks a thread-pool thread while it waits.

Asynchronous. To make the code fully asynchronous, the download and downloadAndExtractLinks functions should be asynchronous too, so that you can use AsyncDownloadString of the WebClient:

let asyncDownload url = async {
    let webclient = new System.Net.WebClient()
    return! webclient.AsyncDownloadString(System.Uri(url : string)) }

let asyncDownloadAndExtractLinks url = async {
    let! html = asyncDownload url
    let links = extractLinks html
    return url, links.Count }

let pmap f l =
    seq { for a in l -> async { return! f a } }
    |> Async.Parallel
    |> Async.RunSynchronously

Retrying. Now, to answer the question: there is no built-in mechanism for handling errors such as network failure, so you will need to implement this logic yourself. The right approach depends on your situation. One common approach is to retry the operation a certain number of times and rethrow the exception only if it still fails after, say, 10 attempts. You can write this as a primitive that takes another asynchronous workflow:

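let rec asyncRetry times op = async {
  try
    return! op
  with e ->
    // reraise() takes unit and is not allowed inside an async block,
    // so the caught exception is raised again with 'raise'
    if times <= 1 then return raise e
    else return! asyncRetry (times - 1) op }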

Then you can change the testAsynchronous function to build a workflow that tries each download up to 10 times:

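let testAsynchronous() = 
  // asyncRetry expects a workflow, so it wraps the workflow created for each URL
  pmap (fun url -> asyncRetry 10 (asyncDownloadAndExtractLinks url)) urls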
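
The question asks about pausing rather than failing. As a rough sketch of that idea, a retry primitive can wait between attempts instead of retrying immediately. The names asyncRetryWithDelay and delayMs, and the flaky stand-in operation, are invented for this illustration, and a fixed delay is only one possible policy:

```fsharp
// Hypothetical variant of asyncRetry: waits 'delayMs' milliseconds
// between failed attempts and gives up after 'times' attempts.
let rec asyncRetryWithDelay (delayMs: int) (times: int) (op: Async<'T>) : Async<'T> =
    async {
        // Async.Catch turns an exception into a Choice value
        let! outcome = Async.Catch op
        match outcome with
        | Choice1Of2 value -> return value
        | Choice2Of2 e when times <= 1 -> return raise e
        | Choice2Of2 _ ->
            do! Async.Sleep delayMs
            return! asyncRetryWithDelay delayMs (times - 1) op
    }

// Stand-in for a download that fails twice before succeeding
// (an assumption for the demo; no network access is involved).
let attempts = ref 0
let flaky = async {
    attempts.Value <- attempts.Value + 1
    if attempts.Value < 3 then failwith "network unavailable"
    return "ok" }

let result = asyncRetryWithDelay 10 5 flaky |> Async.RunSynchronously
```

With the real download, the call would look like asyncRetryWithDelay 1000 10 (asyncDownloadAndExtractLinks url). The sleep does not block a thread while waiting.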

Shared state. Another problem is that Async.Parallel will only return once all the downloads have completed (if there is one faulty web site, you will have to wait). If you want to show the results as they come back, you will need something more sophisticated.

One nice way to do this is to use an F# agent: create an agent that stores the results obtained so far and handles two messages, one that adds a new result and another that returns the current state. Then you can start multiple async tasks that send their results to the agent and, in a separate async workflow, poll the agent to check the current status (and, for example, update the user interface).
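
The agent described above could be sketched roughly as follows. This is only an illustration: the names ResultMessage, resultsAgent, fakeWork and waitFor are hypothetical, and fakeWork stands in for asyncDownloadAndExtractLinks so that the sample runs without a network:

```fsharp
// The two messages the agent understands.
type ResultMessage =
    | Add of string * int
    | GetAll of AsyncReplyChannel<(string * int) list>

// The agent keeps the results collected so far as an immutable list.
let resultsAgent =
    MailboxProcessor<ResultMessage>.Start(fun inbox ->
        let rec loop results = async {
            let! msg = inbox.Receive()
            match msg with
            | Add (url, count) -> return! loop ((url, count) :: results)
            | GetAll reply ->
                reply.Reply(List.rev results)
                return! loop results }
        loop [])

// Stand-in for the real download (assumption: no network access here).
let fakeWork (url: string) = async {
    do! Async.Sleep 10
    return url, url.Length }

// Start one independent task per URL; each posts its result when done.
for url in [ "http://a.example/"; "http://b.example/" ] do
    async {
        let! (u, n) = fakeWork url
        resultsAgent.Post(Add (u, n)) }
    |> Async.Start

// Poll the agent until at least 'n' results have arrived.
let rec waitFor n = async {
    let! current = resultsAgent.PostAndAsyncReply(GetAll)
    if List.length current >= n then
        return current
    else
        do! Async.Sleep 20
        return! waitFor n }

let final = waitFor 2 |> Async.RunSynchronously
```

A user interface would typically poll on a timer and display whatever has arrived so far rather than wait for a fixed count. The order of the results depends on which task finishes first.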

I wrote an MSDN series about agents and also two articles for developerFusion that have plenty of code samples with F# agents.

Tomas Petricek answered Sep 28 '22 04:09