Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to paste relative delay in Observable sequence with Rx (Reactive Extensions)

I'm starting the development with Reactive extensions (version 2.1, just in case) and for my sample application I need a sequence of int values pushed with some interval, i.e. every 1 second.

I know, that I can create a sequence with Observable.Range<int>(0,10) but I can't figure out how to set relative time between pushes. I've tried Delay() but shifts the sequence only once at the start.

I then found Observable.Generate() method that could be adjusted to this task in next way:

var delayed = Observable.
              Generate(0, i => i <= 10, i => i + 1, i => i,
                          i => TimeSpan.FromSeconds(1));

But that seems to be working only for simple 'for-each-like' defined sequences. So, in general, my question is, whether we can to get any source sequence and wrap it with some proxy that will pull messages from source and push it further with time delay?

S--d1--d2--d3--d4--d5-|
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-|

P.S. If this approach contradicts to the concept of ReactiveExtensions, please also note this. I don't want to do it "by all means" and them get some other design problems in future.

P.P.S General Idea is to make sure that output sequence has a specified interval between events in spite of if the input sequence is finite or infinite and how often it pushes events.

like image 294
antonv Avatar asked Oct 05 '13 07:10

antonv


1 Answers

Observable.Interval is what you want to look at. It will generate a 0 based long value incrementing by 1 every interval of time that you specify e.g.:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x));

You can then use a projection (Select) to offset/change this value as needed.

You can also "pace" one stream with another by using the Zip operator - you may want to look at that too. Zip pairs events from two streams together, so it emits at the pace of the currently slowest stream. Zip is quite flexible too, it can zip any number of streams, or even zip an IObservable to an IEnumerable. Here's an example of that:

var pets = new List<string> { "Dog", "Cat", "Elephant" };
var pace = Observable.Interval(TimeSpan.FromSeconds(1))
    .Zip(pets, (n, p) => p)     
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done"));

This writes out pets at an interval of 1 second.

In light of the P.P.S. added above, I will give another answer - I'll leave this for reference as it is a useful technique anyway.

like image 77
James World Avatar answered Oct 07 '22 05:10

James World