I have an IObservable provided by a library, which listens to events from an external service:
let startObservable () : IObservable<'a> = failwith "Given"
For each received event I want to execute an action which returns Async:
let action (item: 'a) : Async<unit> = failwith "Given"
I'm trying to implement a processor along the lines of
let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable
I've made up mapAsync and AwaitObservable: ideally they would be provided by some library, which I'm failing to find so far.
Extra requirements:
Actions should be executed sequentially, so subsequent events get buffered while a previous event is handled.
If an action throws an error, I want my processor to complete. Otherwise, it never completes.
Cancellation token passed via Async.Start should be respected.
Any hints about the library that I should be using?
Since you want to convert a push-based model (IObservable<>) into a pull-based one (Async<>), you'll need a queue to buffer the data coming from the observable. If the queue is size-limited (which, to be honest, it should be, so the pipeline cannot flood memory), then you also need a strategy for buffer overflow.
1. Use MailboxProcessor<> with a custom observer that Posts incoming data to it. Since MailboxProcessor is F#'s native actor implementation, it processes messages in order and its queue buffers any spikes.
2. Use FSharp.Control.AsyncSeq (specifically the AsyncSeq.ofObservableBuffered function), which turns the observable into a pull-based async enumerable. Underneath, it uses the mailbox processor from the first option:
startObservable()
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.iterAsync action
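For comparison, the first option (a MailboxProcessor fed by a subscription) might be sketched roughly as below. The name subscribeSequentially is made up for illustration and does not come from any library. This only shows buffering and sequential handling: the queue is unbounded, an exception thrown by action stops the loop and is reported through the agent's Error event rather than to a caller, and no cancellation token is wired in (MailboxProcessor.Start accepts an optional one).

```fsharp
open System

// Hypothetical helper, not part of any library.
// Returns the subscription; dispose it to stop feeding the agent.
let subscribeSequentially (action: 'a -> Async<unit>) (source: IObservable<'a>) : IDisposable =
    let agent =
        MailboxProcessor<'a>.Start(fun inbox ->
            async {
                // The inbox is an unbounded queue: events posted while
                // `action` is still running wait here in arrival order.
                while true do
                    let! item = inbox.Receive()
                    do! action item
            })
    // Every event pushed by the observable is posted to the agent's queue.
    source |> Observable.subscribe agent.Post
```

With the definitions from the question it would be used as startObservable() |> subscribeSequentially action. Turning this into an Async<unit> that completes on failure and honours cancellation takes extra plumbing, which is what the AsyncSeq version already provides.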