I have two streams. One is a flow of data (could be any type), the other is a boolean stream acting as a gate. I need to combine these into a stream that has the following behaviour:
I am not really sure how to put this together. The inputs I have been testing with are like this:
// a demo data stream that emits every second
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));
// a demo flag stream that toggles every 5 seconds
var toggle = false;
var gateStream = Observable.Interval(TimeSpan.FromSeconds(5))
    .Select(_ => toggle = !toggle);
I would do this as follows:
Window the data stream, using the gate stream as the window boundaries.
Use DistinctUntilChanged on the gate stream to ensure no repeated values.
Use the overload of Select that gives each element an index number. With this we can tell whether to buffer a window or just emit it as is, because we know that even-numbered windows are for buffering (because we made sure the gate stream starts with false).
Use ToList() to buffer each even window until it closes. This is actually the equivalent of a Buffer() that waits until OnCompleted.
Use SelectMany to flatten the buffered windows.
It looks like this:
dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged())
    .Select((w, i) => i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
    .Concat()
    .Subscribe(Console.WriteLine);
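If you want to check which values are held back and which pass straight through, it can help to replace the timers with subjects so that every event is pushed by hand. The following is only a sketch under some assumptions: it needs the System.Reactive NuGet package, the class name GateDemo and the helpers Push and SetGate are made up for illustration, and the two subjects are stand-ins for the demo streams from the question. No output is listed here; run it and compare the order of the "out:" lines against the "gate:" lines to confirm the polarity is what you need.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class GateDemo
{
    static void Main()
    {
        // Hypothetical stand-ins for the timer-based demo streams,
        // so that every event is pushed manually and in a known order.
        var dataStream = new Subject<long>();
        var gateStream = new Subject<bool>();

        // Same pipeline as above; only the subscriber is changed so that
        // emitted values are easy to tell apart from the pushed ones.
        using var subscription = dataStream
            .Window(gateStream.StartWith(false).DistinctUntilChanged())
            .Select((w, i) => i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
            .Concat()
            .Subscribe(x => Console.WriteLine($"  out: {x}"));

        // Illustrative helper: log a data value, then push it.
        void Push(long value)
        {
            Console.WriteLine($"data: {value}");
            dataStream.OnNext(value);
        }

        // Illustrative helper: log a gate change, then push it.
        void SetGate(bool open)
        {
            Console.WriteLine($"gate: {open}");
            gateStream.OnNext(open);
        }

        Push(0);
        Push(1);
        SetGate(true);
        Push(2);
        Push(3);
        SetGate(false);
        Push(4);
        SetGate(true);
        Push(5);
    }
}
```

An "out:" line printed directly after its "data:" line went through an unbuffered window; values that only appear after a later "gate:" line were collected by ToList() and released when that window closed.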