Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

"Chaining" asynchronous functions in F#

I have created a function in F# to recover historical data from Yahoo (the classic asynchronous example for F#):

let getCSV ticker dStart dEnd =
async   {
        let query = getFileUrl ticker dStart dEnd
        let req = WebRequest.Create(query)
        use! resp = req.AsyncGetResponse()
        use stream= resp.GetResponseStream()
        use reader = new StreamReader(stream)
        let content = reader.ReadToEnd()
        let ts = parseData content
        return ts
        }

Now, I can run this function asynchronously by doing the following:

let test=
    ["MSFT";"YHOO"]
    |>List.map (fun x -> getCSV x (DateTime.Parse("01.01.2000")) (DateTime.Parse("01.01.2010")))
    |> Async.Parallel
    |> Async.RunSynchronously

Ok that's cool.

Now, what I would like to know is how to apply some function to this which is the history of prices:

For example:

let getReturns (prices:(DateTime *float)list) =
    [for i in 1..(prices.Length-1) -> i]
    |> List.map (fun i ->(fst (List.nth prices i), (snd (List.nth prices i))/(snd (List.nth prices (i-1) )) - 1.0))

So the trivial way of doing it is:

let test2=
    ["MSFT";"YHOO"]
    |>List.map (fun x -> getCSV x (DateTime.Parse("01.01.2000")) (DateTime.Parse("01.01.2010")))
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.map getReturns;;

However, the getReturns function is executed once every file is downloaded and parsed.

What I would like to know, is if it is possible to start execution the second function while the downloads are still taking place: once MSFT is done, no need to wait until YHOO is done to compute its return...

I know that I could modify getCSV but I would like to know if there is a way to "chain" the getReturn function without having to change a previously written module...

like image 530
SRKX Avatar asked Dec 04 '22 21:12

SRKX


2 Answers

I would typically write the call to the function directly inside an asynchronous workflow. This is mostly a matter of style or preference - I think that code written using asynchronous workflows is generally more explicit and doesn't use higher-order functions as often (though they're still sometimes useful):

let test=
    [ for stock in ["MSFT";"YHOO"] ->
        async { let! data = getCSV stock (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1))
                return getReturns data } ]
    |> Async.Parallel
    |> Async.RunSynchronously 

This means that the workflows executed in parallel first get the data and then call getRteurns to extract the data. The entire operation is then parallelized.

Alternatively, you could either use Joel's solution (modify the getReturns function so that it takes an asynchronous workflow and returns an asynchronous workflow) or define a function Async.map that takes an asynchronous workflow and constructs a new one that applies some function to the result.

Using your original getReturns function, you can then write:

let test=
    ["MSFT";"YHOO"]
    // For every stock name, generate an asynchronous workflow
    |> List.map (fun x -> getCSV x (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1)))
    // For every workflow, transform it into a workflow that 
    // applies 'getReturns' to the result of the original workflow
    |> List.map (Async.map getReturns)
    // Run them all in parallel
    |> Async.Parallel
    |> Async.RunSynchronously

The definition of Async.map is quite simple:

module Async =
  let map f workflow = async {
    let! res = workflow
    return f res }
like image 200
Tomas Petricek Avatar answered Jan 10 '23 06:01

Tomas Petricek


If you defined your getReturns function like this...

let getReturns (prices:Async<(DateTime * float) list>) = async {
    let! prices = prices
    return [for i in 1..(prices.Length-1) -> i]
           |> List.map (fun i ->(fst (List.nth prices i), (snd (List.nth prices i))/(snd (List.nth prices (i-1)))))
}

Then you would be able to do this:

let test=
    ["MSFT";"YHOO"]
    |> List.map (fun x -> getCSV x (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1)))
    |> List.map getReturns
    |> Async.Parallel
    |> Async.RunSynchronously

You could clean it up further by changing getCSV so that ticker is the last parameter instead of the first. This allows you to partially apply the date arguments to produce a function that only requires a ticker to execute. Then you can chain that function with getReturns.

let test =
    let getRange = getCSV (DateTime(2000, 1, 1)) (DateTime(2010, 1, 1))
    ["MSFT"; "YHOO"]
    |> List.map (getRange >> getReturns)
    |> Async.Parallel
    |> Async.RunSynchronously

Edit:

All those List.nth calls in your getReturns function make me itchy. I'd rather use pattern-matching myself. I think you could write that function like this instead:

let getReturns2 (prices: Async<(DateTime * float) list>) = async {
    let! prices = prices
    let rec loop items output =
        match items with
        | (_, last) :: (time, current) :: rest ->
            loop rest ((time, (last / current)) :: output)
        | [ item ] ->
            List.rev (item :: output)
        | [] ->
            List.rev output
    return loop prices []
}
like image 32
Joel Mueller Avatar answered Jan 10 '23 05:01

Joel Mueller