 

Make Reactive Extensions Buffer wait for asynchronous operation to complete

I am using Reactive Extensions (Rx) to buffer some data. I'm having an issue though in that I then need to do something asynchronous with this data, yet I don't want the buffer to pass the next group through until the asynchronous operation is complete.

I've tried to structure the code two ways (contrived example):

public async Task processFiles(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f))) // Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}

or

public async Task processFiles(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async group => await Task.WhenAll(group.Select(f => upload(f))));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}

Unfortunately, neither of these approaches has worked, because the buffer pushes the next group before the async operations complete. The intent is to have each buffered group executed asynchronously and, only when that operation is complete, continue with the next buffered group.

Any help is greatly appreciated.

MgSam asked Jun 12 '13 22:06


2 Answers

To make sure I understand you correctly, it sounds like you want to ensure you carry on buffering items while only presenting each buffer when the previous buffer has been processed.

You also need to make the processing of each buffer asynchronous.

It's probably valuable to consider some theoretical points, because I have to confess that I'm a bit confused about the approach. IObservable is often said to be the dual of IEnumerable because it mirrors the latter with the key difference being that data is pushed to the consumer rather than the consumer pulling it as it chooses.

You are trying to use the buffered stream like an IEnumerable instead of an IObservable - you essentially want to pull the buffers rather than have them pushed to you - so I do have to wonder if you have picked the right tool for the job. Are you trying to hold up the buffering operation itself while a buffer is processed? For a consumer that has the data pushed at it, this isn't really a correct approach.

You could consider applying a ToEnumerable() call to the buffer operation, so that you can deal with the buffers when ready. That won't prevent ongoing buffering while you deal with the current buffer, though.

There's little you can do to prevent this - doing the buffer processing synchronously inside a Select() operation applied to the buffer would carry a guarantee that no subsequent OnNext() call would occur until the Select() projection completed. The guarantee comes for free as the Rx library operators enforce the grammar of Rx. But it's only guaranteeing non-overlapping invocations of OnNext() - there's nothing to say a given operator couldn't (and indeed shouldn't) carry on getting the next OnNext() ready to go. That's the nature of a push based API.

It's unclear why you think you need the projection to be asynchronous if you also want to block the buffers. Have a think about this - I suspect using a synchronous Select() in your observer might solve the issue, but it's not entirely clear from your question.

Similar to a synchronous Select() is a synchronous OnNext() handler, which is easier to handle if your processing of items has no results. It's not the same, though, because (depending on the implementation of the Observable) you are only blocking delivery of OnNext() calls to that subscriber rather than to all subscribers. However, with just a single subscriber it's equivalent, so you could do something like:

void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

Which outputs the following (with no guarantee on the order of the Process File lines within a buffer):

Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

If you prefer to use a Select() and your projection returns no results, you can do it like this:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

NB: Sample code written in LINQPad and including Nuget package Rx-Main. This code is for illustrative purposes - don't Thread.Sleep() in production code!
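A different Rx technique avoids the blocking .Wait(). It is not part of the answer above. Each buffer is wrapped in Observable.FromAsync, and the inner observables are joined with Concat(), which subscribes to them one at a time. Buffering carries on upstream, but the work for a buffer only starts once the previous buffer's Task.WhenAll has finished. This is a minimal sketch, assuming the System.Reactive package. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, not from the original answer.
public static class RxGroupProcessing
{
    // Items inside a group run concurrently; groups run strictly one after another.
    public static async Task ProcessInGroupsAsync<T>(
        IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        await source.ToObservable()
            .Buffer(groupSize)
            // FromAsync is cold: the lambda (and so the work for this group)
            // only runs when Concat subscribes to this inner observable.
            .Select(group => Observable.FromAsync(() => Task.WhenAll(group.Select(func))))
            // Concat subscribes to the next inner observable only after the
            // previous one has completed.
            .Concat()
            // Awaiting an empty observable throws, so guard against empty input.
            .DefaultIfEmpty();
    }
}
```

Awaiting an IObservable yields its last element and throws InvalidOperationException if the sequence is empty. That is why DefaultIfEmpty() is applied before the await.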

James World answered Oct 20 '22 14:10


First, I think your requirement to execute the items from each group in parallel, but each group in series, is quite unusual. A more common requirement would be to execute the items in parallel, but at most n of them at the same time. This way, there are no fixed groups, so if a single item takes too long, other items don't have to wait for it.
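This is a minimal sketch of that more common requirement, assuming plain .NET with no Rx or Dataflow. A SemaphoreSlim caps the number of in-flight calls, so there are no fixed groups and a slow item only holds up one slot. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class illustrating throttled (not grouped) execution.
public static class ThrottledProcessing
{
    // Runs func for every item with at most maxConcurrency calls in flight.
    public static async Task ExecuteThrottledAsync<T>(
        IEnumerable<T> source, Func<T, Task> func, int maxConcurrency)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var tasks = source.Select(async item =>
        {
            await throttle.WaitAsync(); // wait for a free slot
            try
            {
                await func(item);
            }
            finally
            {
                throttle.Release(); // hand the slot to the next waiting item
            }
        }).ToList(); // materialize so each item is started exactly once
        await Task.WhenAll(tasks);
    }
}
```

Every item's wrapper task is created up front and waits on WaitAsync(). For very large sources, a bounded alternative may be preferable, such as an ActionBlock with MaxDegreeOfParallelism set.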

To do what you're asking for, I think TPL Dataflow is more suitable than Rx (though some Rx code will still be useful). TPL Dataflow is centered around “blocks” that execute stuff, by default in series, which is exactly what you need.

Your code could look like this:

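using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        // The ActionBlock handles one group at a time and awaits the returned Task
        var block = new ActionBlock<IEnumerable<T>>(
            g => Task.WhenAll(g.Select(func)));
        source.ToObservable()
              .Buffer(groupSize)
              .Subscribe(block.AsObserver());
        return block.Completion;
    }
}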

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

This leaves most of the heavy lifting to the ActionBlock (and some to Rx). Dataflow blocks can act as Rx observers (and observables), so we can take advantage of that to keep using Buffer().

We want to handle the whole group at once, so we use Task.WhenAll() to create a Task that completes when the whole group completes. Dataflow blocks understand Task-returning functions, so the next group won't start executing until the Task returned by the previous group completes.

The final result is the Completion Task, which will complete after the source observable completes and all processing is done.

TPL Dataflow also has BatchBlock, which works like Buffer() and we could directly Post() each item from the collection (without using ToObservable() and AsObserver()), but I think using Rx for this part of the code makes it simpler.
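Here is a rough sketch of that BatchBlock alternative, assuming the System.Threading.Tasks.Dataflow package. The class and method names are illustrative. BatchBlock plays the role of Buffer(). Linking with PropagateCompletion lets the ActionBlock's Completion also cover the final, possibly partial, batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative helper class, not from the original answer.
public static class DataflowGroupProcessing
{
    public static Task ExecuteInGroupsAsync<T>(
        IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        // Collects incoming items into arrays of groupSize, like Rx Buffer().
        var batcher = new BatchBlock<T>(groupSize);

        // By default an ActionBlock processes one message at a time and awaits
        // the returned Task before taking the next batch.
        var runner = new ActionBlock<T[]>(group => Task.WhenAll(group.Select(func)));

        // Forward completion so runner finishes after the last batch is processed.
        batcher.LinkTo(runner, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in source)
        {
            batcher.Post(item);
        }

        // Completing the BatchBlock flushes any remaining partial batch.
        batcher.Complete();
        return runner.Completion;
    }
}
```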

EDIT: Actually you don't need TPL Dataflow here at all. Using ToEnumerable() as James World suggested will be enough:

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

Or even simpler without Rx using Batch() from morelinq:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
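On .NET 6 and later, the built-in Enumerable.Chunk() does the same job as Batch() from morelinq. The last version can therefore be written without any extra package. This is a sketch under that assumption; the class name is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class; requires .NET 6+ for Enumerable.Chunk.
public static class ChunkedProcessing
{
    public static async Task ExecuteInGroupsAsync<T>(
        IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        // Chunk yields arrays of up to groupSize items; the last may be shorter.
        foreach (T[] group in source.Chunk(groupSize))
        {
            // Items in a group run concurrently; the loop waits for the whole group.
            await Task.WhenAll(group.Select(func));
        }
    }
}
```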

svick answered Oct 20 '22 15:10