I'm trying to split an observable into windows (buffers would also work for my purposes) while being able to close each window/buffer at a custom location.
E.g. I have an observable that produces integers starting at 1 and counting up. I want to close a window at each number that is divisible by 7. In that case, my closing function would need to take the item as a parameter.
There is an overload of the Window method:
Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)
Either it can't be done using this overload, or I can't wrap my head around it. The documentation describes it as doing exactly what I want but does not show an example of that. The example it does show uses non-deterministic closing, which depends on the timing of when the closing observable emits items.
The Window operator breaks up an observable sequence into consecutive non-overlapping windows. The end of the current window and start of the next window is controlled by an observable sequence which is the result of the windowClosingSelect function which is passed as an input parameter to the operator. The operator could be used to group a set of events into a window. For example, states of a transaction could be the main sequence being observed. Those states could include: Preparing, Prepared, Active, and Committed/Aborted. The main sequence could include all of those states as they occur in that order. The windowClosingSelect function could return an observable sequence that only produces a value on the Committed or Abort states. This would close the window that represented transaction events for a particular transaction.
I'm thinking something like the following would do the job, but I'd have to implement it myself:
Window<TSource>(IObservable<TSource>, Func<TSource, bool>)
Use the original sequence with a Where clause as your closing sequence. If your source sequence is cold, then make use of Publish and RefCount to make it work correctly.
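var source = ...;
// Share a single subscription so the windows and the closing signal see the same items.
var sharedSource = source.Publish().RefCount();
// Emits whenever an item is divisible by 7.
var closingSignal = sharedSource.Where(i => (i % 7) == 0);
// Each emission from closingSignal closes the current window and opens the next one.
var windows = sharedSource.Window(() => closingSignal);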
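If you want something closer to the overload the question asks for, the same idea can be wrapped in an extension method. This is only a sketch: WindowUntil is a hypothetical name (it is not part of Rx), the code assumes the System.Reactive package, and it uses the Publish(selector) overload rather than Publish().RefCount() so that the window and closing subscriptions are both in place before the source is connected. Observable.Range(1, 20) stands in for the real source.

```csharp
using System;
using System.Reactive.Linq;

public static class WindowExtensions
{
    // Hypothetical helper, not part of Rx: closes the current window
    // whenever an item from the source satisfies closeOn.
    public static IObservable<IObservable<T>> WindowUntil<T>(
        this IObservable<T> source, Func<T, bool> closeOn)
    {
        // Publish(selector) shares one subscription to the source between
        // the windowed sequence and the closing signal, so a cold source
        // is subscribed to only once.
        return source.Publish(shared =>
            shared.Window(() => shared.Where(closeOn)));
    }
}

public static class Program
{
    public static void Main()
    {
        // Stand-in source for illustration: the integers 1..20.
        Observable.Range(1, 20)
            .WindowUntil(i => i % 7 == 0)
            // Collect each window into a list so it can be printed on one line.
            .SelectMany(window => window.ToList())
            .Subscribe(items => Console.WriteLine(string.Join(", ", items)));
    }
}
```

Whether the item that triggers the close (7, 14, ...) ends up as the last item of the closing window or the first item of the next one depends on the order in which Window subscribes to the source and to the closing signal, so check that against the Rx version you are using before relying on it.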