
Rx.NET GroupByUntil group termination, wait for thread completion

I have an infinite stream of objects. My requirement is that items from the observable stream with the same key must be processed sequentially, while items with different keys may (and ideally should) be processed in parallel. The easiest way to do this (as suggested in most places) is to use the GroupByUntil operator:

var results = observableStream
    .GroupByUntil(item => item.Id, group =>
        group.Throttle(TimeSpan.FromSeconds(30), scheduler))
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Select(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));

The code works well as long as I can guarantee that ProcessItem(item) takes less than 30 seconds. Otherwise group.Throttle(TimeSpan.FromSeconds(30), scheduler) closes the group's stream while an item is still being processed, and there's a very high probability that a new item with the same key arrives and starts processing on a new thread.

So basically I need to know when my thread has finished processing all the items with a specific key, and I need to signal that to the durationSelector parameter of the GroupByUntil operator.

Any ideas on how to achieve this? Thanks in advance.

Azat asked Jun 05 '26 10:06



1 Answer

This is very similar to this question: A way to push buffered events in even intervals.

From the answer to that question, there's an operator called Drain:

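using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableDrainExtensions
{
    // Projects each element to an observable, starting the next one
    // only after the previous one has completed.
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            // Emits one token per free slot; the initial value lets the first element through.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(Unit.Default);

            return source
                // Zip holds each element back until a token is available.
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    // When the inner observable completes, release the next element.
                    .Do(_ => { }, () => queue.OnNext(Unit.Default))
                );
        });
    }
}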

Given that operator, your problem becomes very simple, provided ProcessItem returns an IObservable (Drain's selector requires one):

var results = observableStream
    .GroupBy(item => item.Id)
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Drain(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));

Given a stream that looks like A1, A2, B1, A3, B2, C1, B3, C2, GroupBy separates the streams by IDs:

A: A1, A2, A3
B: B1, B2, B3
C: C1, C2

...and Drain makes sure that the items within a given sub-stream are processed serially, not in parallel.
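To make the behaviour above concrete, here is a minimal console sketch that runs the example stream through GroupBy and Drain. It is an illustration under assumptions, not the asker's actual code: the Item record, the DrainDemo class, the in-flight counter, and the 50 ms sleep are all hypothetical, and ProcessItem is assumed to be a synchronous method, so it is wrapped with Observable.Start to give Drain the IObservable it needs. It assumes the System.Reactive package and a recent .NET SDK (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical item type; only the Id key matters for grouping.
public sealed record Item(string Id, int Seq);

public static class ObservableDrainExtensions
{
    // Same operator as in the answer, repeated so this sketch compiles on its own.
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            var queue = new BehaviorSubject<Unit>(Unit.Default);
            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(Unit.Default)));
        });
    }
}

public static class DrainDemo
{
    private static readonly object Gate = new object();
    private static readonly Dictionary<string, int> InFlight = new Dictionary<string, int>();

    // Highest number of items with the same key seen in ProcessItem at once.
    public static int MaxInFlightPerKey { get; private set; }

    // Hypothetical synchronous worker standing in for the question's ProcessItem.
    private static string ProcessItem(Item item)
    {
        lock (Gate)
        {
            InFlight.TryGetValue(item.Id, out var n);
            InFlight[item.Id] = n + 1;
            MaxInFlightPerKey = Math.Max(MaxInFlightPerKey, n + 1);
        }
        Thread.Sleep(50); // simulate work
        lock (Gate) { InFlight[item.Id]--; }
        return $"{item.Id}{item.Seq}";
    }

    public static IList<string> Run()
    {
        // The finite stream A1, A2, B1, A3, B2, C1, B3, C2 from the answer.
        var stream = new[]
        {
            new Item("A", 1), new Item("A", 2), new Item("B", 1), new Item("A", 3),
            new Item("B", 2), new Item("C", 1), new Item("B", 3), new Item("C", 2),
        }.ToObservable();

        return stream
            .GroupBy(item => item.Id)
            .SelectMany(group => group.Drain(item =>
                // Observable.Start runs the synchronous call on the task pool and
                // completes once it returns, which is what releases the next item.
                Observable.Start(() => ProcessItem(item), TaskPoolScheduler.Default)))
            .ToList()
            .Wait(); // blocks until the finite stream has been fully processed
    }

    public static void Main()
    {
        var results = Run();
        Console.WriteLine(string.Join(", ", results));
        Console.WriteLine($"Max in flight per key: {MaxInFlightPerKey}");
    }
}
```

Because Observable.Start already moves the work onto the task pool, this sketch omits the ObserveOn(scheduler) call from the answer. Results for different keys can interleave in any order, but results for one key should come out in arrival order, and the per-key in-flight counter should never exceed 1.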

Shlomo answered Jun 07 '26 23:06
