 

Mixing IObservable and Async<'a> in F#

I have an IObservable provided by a library, which listens to events from an external service:

let startObservable () : IObservable<'a> = failwith "Given"

For each received event I want to execute an action which returns Async:

let action (item: 'a) : Async<unit> = failwith "Given"

I'm trying to implement a processor along the lines of

let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable

I've made up mapAsync and AwaitObservable: ideally they would be provided by some library, which I'm failing to find so far.

Extra requirements:

  • Actions should be executed sequentially, so subsequent events get buffered while a previous event is handled.

  • If an action throws an error, I want my processor to complete. Otherwise, it never completes.

  • Cancellation token passed via Async.Start should be respected.

Any hints about the library that I should be using?

Mikhail Shilkov asked Dec 02 '25 22:12



1 Answer

Since you want to convert a push-based model (IObservable<>) into a pull-based one (Async<>), you'll need a queue to buffer data coming from the observable. If the queue is size-limited, which to be honest it should be so that the pipeline cannot flood memory, then a strategy for buffer overflow is also needed.
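
To make the overflow point concrete, here is a minimal sketch of one possible strategy: a bounded queue that drops the oldest item when full. It uses System.Threading.Channels, which is not mentioned in the original answer and is swapped in purely for illustration. The helper name subscribeBounded and the choice of DropOldest are assumptions.

```fsharp
open System
open System.Threading.Channels

/// Hypothetical helper (not from any library): subscribes to `source`
/// and keeps at most `capacity` unread events in a bounded channel.
let subscribeBounded (capacity: int) (source: IObservable<'a>) : ChannelReader<'a> * IDisposable =
    // Assumed overflow strategy: when the buffer is full, discard the oldest event.
    let options = BoundedChannelOptions(capacity, FullMode = BoundedChannelFullMode.DropOldest)
    let channel = Channel.CreateBounded<'a>(options)
    let subscription =
        source.Subscribe(
            { new IObserver<'a> with
                // With DropOldest, TryWrite does not block the producer.
                member _.OnNext item = channel.Writer.TryWrite item |> ignore
                member _.OnError e = channel.Writer.TryComplete e |> ignore
                member _.OnCompleted() = channel.Writer.TryComplete() |> ignore })
    channel.Reader, subscription
```

The consumer would then pull from the returned reader one item at a time. Other BoundedChannelFullMode values (for example DropNewest or DropWrite) trade off differently, so the right choice depends on which events you can afford to lose.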

  1. One way is to implement a MailboxProcessor<> and a custom observer that Posts data to it. Since MailboxProcessor is F#'s built-in actor implementation, it processes messages in order, and its queue buffers spikes.
  2. Another option is to use FSharp.Control.AsyncSeq (specifically the AsyncSeq.ofObservableBuffered function), which turns the observable into a pull-based async enumerable. Underneath, it uses the mailbox processor from the first point:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
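
For comparison, the first option (a MailboxProcessor fed by a custom observer) could be sketched roughly as follows, using only FSharp.Core. The names Msg and processWith are hypothetical, and the queue here is unbounded, so this is an illustration under those assumptions rather than a hardened implementation.

```fsharp
open System

/// Hypothetical message type: what the observer forwards into the mailbox.
type Msg<'a> =
    | Next of 'a
    | Failed of exn
    | Completed

/// Hypothetical helper: runs `action` for each event, one at a time.
let processWith (action: 'a -> Async<unit>) (source: IObservable<'a>) : Async<unit> =
    async {
        // Token of the enclosing workflow, e.g. the one given to Async.Start.
        let! ct = Async.CancellationToken
        // The agent body returns at once; the mailbox is used only as a queue.
        use mailbox = MailboxProcessor<Msg<'a>>.Start((fun _ -> async.Return ()), cancellationToken = ct)
        use _subscription =
            source.Subscribe(
                { new IObserver<'a> with
                    member _.OnNext item = mailbox.Post(Next item)
                    member _.OnError e = mailbox.Post(Failed e)
                    member _.OnCompleted() = mailbox.Post Completed })
        // Events that arrive while `action` is running wait in the mailbox.
        let rec loop () =
            async {
                let! msg = mailbox.Receive()
                match msg with
                | Next item ->
                    do! action item // an exception here ends the loop
                    return! loop ()
                | Failed e -> return raise e
                | Completed -> return ()
            }
        return! loop ()
    }
```

With this helper, the processor from the question would read startObservable() |> processWith action. An exception from action propagates out of the workflow and disposes the subscription, and the loop runs under the caller's cancellation token.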
    
Bartosz Sypytkowski answered Dec 05 '25 00:12



