I'm using an agent (MailboxProcessor) to do some stateful processing where a response is needed.
MailboxProcessor.PostAndAsyncReply
AsyncReplyChannel.Reply
However, I've discovered by poking around the f# source code that the agent will not process the next message until the the response is delivered. This is a good thing in general. But in my case, it is more desirable for the agent to keep processing messages than to wait for response delivery.
Is it problematic to do something like this to deliver the response? (Or is there a better alternative?)
async { replyChannel.Reply response } |> Async.Start
I realize that this method does not guarantee that responses will be delivered in order. I'm okay with that.
Reference example
// agent code
let doWork data =
async { ... ; return response }
let rec loop ( inbox : MailboxProcessor<_> ) =
async {
let! msg = inbox.Receive()
match msg with
| None ->
return ()
| Some ( data, replyChannel ) ->
let! response = doWork data
replyChannel.Reply response (* waits for delivery, vs below *)
// async { replyChannel.Reply response } |> Async.Start
return! loop inbox
}
let agent =
MailboxProcessor.Start(loop)
// caller code
async {
let! response =
agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
...
}
FSharp.Control.AsyncSeq puts a friendlier face on top of mailbox processor. Async Sequences are a bit easier to follow, however the default implement mapping parallel has the same issue as described, waiting for the prior element in the sequence to be mapped to retain the order.
So I made a new function taht is just the original AsyncSeq.mapAsyncParallel, modified so that it no longer is a true map, since it's unordered, but it does map everything and the lazy seq does progress as work completes.
Full Source for AsyncSeq.mapAsyncParallelUnordered
let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
use mb = MailboxProcessor.Start (fun _ -> async.Return())
let! err =
s
|> AsyncSeq.iterAsyncParallel (fun a -> async {
let! b = f a
mb.Post (Some b) })
|> Async.map (fun _ -> mb.Post None)
|> Async.StartChildAsTask
yield!
AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
}
Below is an example of how I use it in a tool that uses SSLlabs free and very slow api that can easily get overloaded. parallelProcessHost
returns an lazy AsyncSeq
that is generated by the webapi requests, So AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
actually runs the requests and allows the console to printout the results as the come in, independent of the order sent.
Full Source
let! es =
hosts
|> Seq.indexed
|> AsyncSeq.ofSeq
|> AsyncSeq.map parallelProcessHost
|> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
|> AsyncSeq.indexed
|> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
|> AsyncSeq.collect AsyncSeq.ofSeq
|> AsyncSeq.map stdoutOrStatus //Write out to console
|> AsyncSeq.fold (|||) ErrorStatus.Okay
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With