
In Rx, how to group latest items after a period of time?

Sorry if the title isn't very clear; I couldn't think of anything better...

I'm receiving user input in the form of an IObservable<char>, and I'd like to transform it to an IObservable<char[]>, by grouping the chars every time the user stops typing for more than 1 second. So, for instance, if the input is as follows:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)

I'd like the output observable to be:

['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']

I suspect the solution is fairly simple, but I can't find the right approach... I tried to use Buffer, GroupByUntil, Throttle and a few others, to no avail.

Any idea would be welcome!


EDIT: I've got something that almost works:

        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

But I need the delay to be reset every time a new character is typed...

Thomas Levesque asked Apr 02 '12 23:04



1 Answer

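Buffer and Throttle are enough if your source is hot: Throttle only emits once the source has been quiet for the given time, so it can serve as the signal that closes each buffer. Because Buffer and Throttle both subscribe to the source, you can use .Publish().RefCount() to make it hot and ensure you only end up with one subscription to the source.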

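// Declare inside a static class; requires System.Reactive (using System.Reactive.Linq).
public static IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                                            TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    // dueTime validation is deferred to Throttle
    // Share one subscription to the source between Buffer and Throttle
    var hot = source.Publish().RefCount();
    // Close the current buffer each time the source has been quiet for dueTime
    return hot.Buffer(() => hot.Throttle(dueTime));
}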
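
As a rough usage sketch (not part of the original answer), the method above could be exercised from a console program like this. It assumes the System.Reactive package is referenced. The class names BufferExtensions and Program, and the Subject<char> standing in for the question's _input, are hypothetical names chosen for illustration. The extension method is repeated inside a static class so the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical container class; extension methods must live in a static class.
public static class BufferExtensions
{
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");
        // Share one subscription between Buffer and Throttle.
        var hot = source.Publish().RefCount();
        // Throttle fires after dueTime of silence, which closes the current buffer.
        return hot.Buffer(() => hot.Throttle(dueTime));
    }
}

public static class Program
{
    public static void Main()
    {
        // Stand-in for the user's keystrokes (assumption; the question uses _input).
        var input = new Subject<char>();

        using (input.BufferWithInactivity(TimeSpan.FromSeconds(1))
                    .Subscribe(chars => Console.WriteLine(string.Join(", ", chars))))
        {
            foreach (var word in new[] { "hello", "world", "!" })
            {
                // "Type" the word quickly, one character at a time.
                foreach (var c in word) input.OnNext(c);
                // Pause for longer than dueTime so the buffer gets closed.
                Thread.Sleep(TimeSpan.FromSeconds(2));
            }
        }
    }
}
```

Assuming the timers fire within each two-second pause, this should print one line per burst of typing: the characters of "hello", then those of "world", then "!". In a UI application, the question's ObserveOnDispatcher() call would still be needed before Subscribe to get back onto the UI thread.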
Gideon Engelberth answered Sep 29 '22 02:09