Sorry if the title isn't very clear, I couldn't think of anything better...
I'm receiving user input in the form of an IObservable<char>, and I'd like to transform it to an IObservable<char[]>, by grouping the chars every time the user stops typing for more than 1 second. So, for instance, if the input is as follows:
h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)
I'd like the output observable to be:
['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']
I suspect the solution is fairly simple, but I can't find the right approach... I tried to use Buffer, GroupByUntil, Throttle and a few others, to no avail.
Any idea would be welcome!
EDIT: I've got something that almost works:
_input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
.ObserveOnDispatcher()
.Subscribe(OnCompleteInput);
But I need the delay to be reset every time a new character is typed...
Buffer and Throttle would be enough, if your source is hot. To make it hot, you can use .Publish().RefCount() to ensure you only end up with one subscription to the source.
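// Extension methods must be declared in a static class.
public static IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                                            TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    // Defer dueTime checking to Throttle.
    // Publish().RefCount() shares a single subscription between Buffer and Throttle.
    var hot = source.Publish().RefCount();
    // Close the current buffer once no item has arrived for dueTime.
    return hot.Buffer(() => hot.Throttle(dueTime));
}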
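To try the operator against the example from the question, here is a minimal console sketch. It assumes the System.Reactive package is referenced. The Subject<char> is only a stand-in for the real _input source, and the class name InactivityExtensions and the 1.5 second sleeps are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical container class; extension methods need a static class.
public static class InactivityExtensions
{
    // Same operator as in the answer above, repeated so the sketch compiles on its own.
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");
        var hot = source.Publish().RefCount();
        return hot.Buffer(() => hot.Throttle(dueTime));
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumption: a Subject<char> stands in for the real keyboard source (_input).
        var input = new Subject<char>();

        // The question asks for IObservable<char[]>, so convert each IList<char>.
        IObservable<char[]> words = input
            .BufferWithInactivity(TimeSpan.FromSeconds(1))
            .Select(chars => chars.ToArray());

        using (words.Subscribe(w => Console.WriteLine(new string(w))))
        {
            foreach (var burst in new[] { "hello", "world", "!" })
            {
                // Push one burst of characters with no delay between them...
                foreach (var c in burst) input.OnNext(c);
                // ...then stay quiet for longer than dueTime so the buffer closes.
                Thread.Sleep(1500);
            }
        }
    }
}
```

Since each pause is longer than dueTime, every burst should arrive as its own array, which is the grouping asked for in the question. In the original UI code you would keep the .ObserveOnDispatcher() call before Subscribe, because Throttle raises its notifications from a timer rather than from the UI thread.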