Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

f# mailboxprocessor - replying without waiting for delivery

I'm using an agent (MailboxProcessor) to do some stateful processing where a response is needed.

  • The caller posts a message using MailboxProcessor.PostAndAsyncReply
  • Within the agent, a response is given with AsyncReplyChannel.Reply

However, I've discovered by poking around the f# source code that the agent will not process the next message until the the response is delivered. This is a good thing in general. But in my case, it is more desirable for the agent to keep processing messages than to wait for response delivery.

Is it problematic to do something like this to deliver the response? (Or is there a better alternative?)

async { replyChannel.Reply response } |> Async.Start

I realize that this method does not guarantee that responses will be delivered in order. I'm okay with that.

Reference example

// agent code
let doWork data =
    async { ... ; return response }

let rec loop ( inbox : MailboxProcessor<_> ) =
    async {
        let! msg = inbox.Receive()
        match msg with
        | None ->
            return ()

        | Some ( data, replyChannel ) ->
            let! response = doWork data
            replyChannel.Reply response (* waits for delivery, vs below *)
            // async { replyChannel.Reply response } |> Async.Start
            return! loop inbox
    }

let agent =
    MailboxProcessor.Start(loop)

// caller code
async {
    let! response =
        agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
    ...
}
like image 482
Kasey Speakman Avatar asked Feb 08 '17 16:02

Kasey Speakman


1 Answers

FSharp.Control.AsyncSeq puts a friendlier face on top of mailbox processor. Async Sequences are a bit easier to follow, however the default implement mapping parallel has the same issue as described, waiting for the prior element in the sequence to be mapped to retain the order.

So I made a new function taht is just the original AsyncSeq.mapAsyncParallel, modified so that it no longer is a true map, since it's unordered, but it does map everything and the lazy seq does progress as work completes.

Full Source for AsyncSeq.mapAsyncParallelUnordered

let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
  use mb = MailboxProcessor.Start (fun _ -> async.Return())
  let! err =
    s 
    |> AsyncSeq.iterAsyncParallel (fun a -> async {
      let! b = f a
      mb.Post (Some b) })
    |> Async.map (fun _ -> mb.Post None)
    |> Async.StartChildAsTask
  yield! 
    AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
  }

Below is an example of how I use it in a tool that uses SSLlabs free and very slow api that can easily get overloaded. parallelProcessHost returns an lazy AsyncSeq that is generated by the webapi requests, So AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync actually runs the requests and allows the console to printout the results as the come in, independent of the order sent.

Full Source

let! es = 
    hosts
    |> Seq.indexed
    |> AsyncSeq.ofSeq
    |> AsyncSeq.map parallelProcessHost
    |> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
    |> AsyncSeq.indexed
    |> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
    |> AsyncSeq.collect AsyncSeq.ofSeq
    |> AsyncSeq.map stdoutOrStatus //Write out to console
    |> AsyncSeq.fold (|||) ErrorStatus.Okay
like image 121
jbtule Avatar answered Oct 05 '22 18:10

jbtule