Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Buffer group by groups with Reactive Extensions, nested subscribe

I have an event source that generates events that belong to certain groups. I would like to buffer these groups and send the groups (in batches) to storage. So far I have this:

eventSource
    .GroupBy(event => event.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Subscribe(group => group.Events
                            .Buffer(TimeSpan.FromSeconds(60), 100)
                            .Subscribe(list => SendToStorage(list)));

So there's a nested subscribe to the events in a group. Somehow I think there's a better way but I haven't been able to figure it out yet.

like image 690
Ronald Wildenberg Avatar asked Feb 24 '17 07:02

Ronald Wildenberg


Video Answer


2 Answers

Here's the solution:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));

Here's a couple general rules that can help you 'reduce':

1) A nested subscription is normally fixed with Selecting everything before the nested subscription followed by a Merge, then followed by the nested subscription. So applying that, you get this:

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => new { group.Key, Events = group })
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
    .Merge()
    .Subscribe(list => SendToStorage(list));

2) You can obviously combine two consecutive selects (and since you're not doing anything with the anonymous object, can just remove that):

eventSource
    .GroupBy(e => e.GroupingKey)
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge()
    .Subscribe(list => SendToStorage(list));

3) Finally, a Select followed by a Merge can be reduced to a SelectMany:

eventSource
    .GroupBy(e => e.GroupingKey)
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
    .Subscribe(list => SendToStorage(list));
like image 147
Shlomo Avatar answered Sep 28 '22 23:09

Shlomo


Here is one way to do it

(from g in eventSource.GroupByUntil(e => e.GroupingKey,
                                    g => g.Buffer(TimeSpan.FromSeconds(60), 100))
 from b in g.ToList()
 select b).Subscribe(SendToStorage);
like image 31
supertopi Avatar answered Sep 28 '22 22:09

supertopi