
Rx Observable Window with closing function with parameter

I'm trying to split an observable into windows (buffers would also be fine for my purposes) while being able to close each window/buffer at a point of my choosing.

E.g. I have an observable which produces integers starting at 1 and counting up. I want to close a window at each number which is divisible by 7. In that case my closing function would need to take the item as a parameter.

There is an overload of Window method:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)

Either it can't be done using this overload, or I can't wrap my head around it. The documentation's description sounds like exactly what I want, but it gives no example of it. The one example it does show closes windows non-deterministically, depending on when the closing observable emits items.

The Window operator breaks up an observable sequence into consecutive non-overlapping windows. The end of the current window and start of the next window is controlled by an observable sequence which is the result of the windowClosingSelect function which is passed as an input parameter to the operator. The operator could be used to group a set of events into a window. For example, states of a transaction could be the main sequence being observed. Those states could include: Preparing, Prepared, Active, and Committed/Aborted. The main sequence could include all of those states as they occur in that order. The windowClosingSelect function could return an observable sequence that only produces a value on the Committed or Abort states. This would close the window that represented transaction events for a particular transaction.

I'm thinking something like the following would do the job, but I'd have to implement it myself:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<TSource, bool>)

  • Is such windowing possible with built-in functions (I know I can build one myself)?
  • Is it possible to close a window based on the emitted item, or only non-deterministically, whenever the window-closing observable emits an item?
Nikola Radosavljević asked Feb 13 '23 02:02



1 Answer

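Use the original sequence, filtered with a Where clause, as your closing sequence. If your source sequence is cold, use Publish and RefCount so that the windowed sequence and the closing sequence share a single subscription to the source; otherwise each of them would subscribe to the source separately.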

var source = ...;
var sharedSource = source.Publish().RefCount();
var closingSignal = sharedSource.Where(i => (i % 7) == 0);
var windows = sharedSource.Window(() => closingSignal);
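
The same idea can be wrapped into a predicate-based operator like the one the question asks for. The following is only a sketch, assuming the System.Reactive package is referenced; WindowUntil and BufferUntil are hypothetical names and are not part of Rx. It uses the Publish overload that takes a selector instead of Publish().RefCount(), so that both the windowed sequence and the closing signal are subscribed before the source starts emitting.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PredicateWindowExtensions
{
    // Hypothetical helper (not part of Rx): closes the current window
    // whenever shouldClose returns true for an item from the source.
    public static IObservable<IObservable<T>> WindowUntil<T>(
        this IObservable<T> source, Func<T, bool> shouldClose)
    {
        // Publish(selector) shares one subscription to source between
        // the windowed sequence and the filtered closing signal.
        return source.Publish(shared =>
            shared.Window(() => shared.Where(shouldClose)));
    }

    // Hypothetical helper (not part of Rx): same idea, but collects
    // each window into a list using Buffer.
    public static IObservable<IList<T>> BufferUntil<T>(
        this IObservable<T> source, Func<T, bool> shouldClose)
    {
        return source.Publish(shared =>
            shared.Buffer(() => shared.Where(shouldClose)));
    }
}

public static class Program
{
    public static void Main()
    {
        // Prints one line per buffer; a buffer is closed each time the
        // source emits a multiple of 7.
        Observable.Range(1, 20)
            .BufferUntil(i => i % 7 == 0)
            .Subscribe(buffer => Console.WriteLine(string.Join(", ", buffer)));
    }
}
```

Whether the item that triggers the close ends up in the window being closed or in the next one depends on the order in which the operator subscribes to the source and to the closing sequence, so it is worth checking against the Rx version in use.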
Brandon answered Feb 14 '23 15:02
