 

How to limit consuming sequence with Reactive?

We have an application in which we have a materialized array of items that we are going to process through a Reactive pipeline. It looks a little like this:

EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
    new TaskFactory(
        new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

// 1. transform on single thread
IConnectableObservable<byte[]> source = 
    numbers.Select(Transform).ToObservable(eventLoop).Publish();

// 2. naive parallelization, restricts parallelization to Work 
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
    Buffer(10).
    Select(
        batch =>
        batch.
        ToObservable(concurrency).
        Buffer(10).
        Select(
            concurrentBatch =>
            concurrentBatch.
            Select(Work).
            ToArray().
            ToObservable(eventLoop)).
        Merge()).
    Merge();

final.Subscribe();

source.Connect();
Await(final).Wait();

If you are really curious to play with this, the stand-in methods look like this:

private async static Task Await(IObservable<int> final)
{
    await final.LastOrDefaultAsync();
}

private static byte[] Transform(int number)
{
    if (number == itemCount)
    {
        Console.WriteLine("numbers exhausted.");
    }
    // bloat is a static byte[1000000] field used to simulate the
    // enlarged footprint of an enriched item
    byte[] buffer = new byte[1000000];
    Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
    return buffer;
}

private static int Work(byte[] buffer)
{
    Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
    return 1;
}

A little explanation. Range(1, itemCount) simulates raw inputs materialized from a data source. Transform simulates an enrichment process that each input must go through, and which results in a larger memory footprint. Work is a "lengthy" process which operates on the transformed input.

Ideally, we want to minimize the number of transformed inputs held concurrently by the system, while maximizing throughput by parallelizing Work. The number of transformed inputs in memory should be batch size (10 above) times concurrent work threads (threadCount).

So for 5 threads, we should retain at most 50 transformed items at any given time; and if, as here, each transformed item is a 1 MB byte buffer, then we would expect memory consumption to hold at about 50 MB throughout the run.

What I find is quite different. Namely, Reactive eagerly consumes all of the numbers and Transforms them up front (as evidenced by the numbers exhausted. message), resulting in a massive memory spike at the start (~1 GB for an itemCount of 1000).

My basic question is: is there a way to achieve what I need (i.e. minimized consumption, throttled by multi-threaded batching)?

UPDATE: Sorry for the reversal, James; at first, I did not think paulpdaniels' and Enigmativity's composition of Work(Transform) applied (this has to do with the nature of our actual implementation, which is more complex than the simple scenario provided above). However, after some further experimentation, I may be able to apply the same principle: i.e. defer Transform until the batch executes.
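As a rough sketch of that deferral (my own illustration, assuming the same stand-in methods, numbers, eventLoop, and concurrency scheduler defined above), the enrichment can be moved inside the concurrent step so that no 1 MB buffer is materialized before a worker is actually ready for it:

```csharp
// Sketch only: Transform now runs inside the scheduled work item,
// so an enriched buffer exists only while a worker is using it.
IObservable<int> deferred =
    numbers
        .ToObservable(eventLoop)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));
```

This does not by itself throttle how fast the raw numbers are consumed, but it keeps the large transformed buffers from accumulating ahead of the workers.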

johnny g asked Dec 06 '22


1 Answer

You have made a couple of mistakes in your code that throw off all of your conclusions.

First up, you've done this:

IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

You've used Enumerable.Range, which means that when you call numbers.Select(Transform) you are going to burn through all of the numbers as fast as a single thread can take it. Rx hasn't even had a chance to do any work, because up to this point your pipeline is entirely enumerable.
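For illustration (an assumption about intent, not code from the question), the production can at least be moved inside Rx so the range is emitted by a scheduler rather than burned through by the hidden enumeration:

```csharp
// Still push-based and unthrottled, but the numbers are now produced
// by Rx on the given scheduler instead of by an eager foreach loop
// hidden inside ToObservable over an IEnumerable.
IObservable<byte[]> rxSource =
    Observable
        .Range(1, itemCount, eventLoop)
        .Select(Transform);
```

Note that this alone does not add back-pressure; it only makes the production step part of the observable pipeline.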

The next issue is in your subscriptions:

final.Subscribe();

source.Connect();
Await(final).Wait();

Because you call both final.Subscribe() and Await(final).Wait(), you are creating two separate subscriptions to the final observable.

Since there is a source.Connect() in the middle, the second subscription may miss values.
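One way to avoid the double subscription (a sketch, assuming the System.Reactive.Threading.Tasks namespace is available for ToTask) is to await the very subscription that drives the pipeline, and connect only after it exists:

```csharp
// Subscribe exactly once: ToTask() subscribes immediately, and that
// single subscription is both the one doing the work and the one we
// wait on. Connect only after the subscription is in place.
Task<int> completion = final.LastOrDefaultAsync().ToTask();
using (source.Connect())
{
    completion.Wait();
}
```

Disposing the connection in the using block also tears the source down cleanly once the pipeline completes.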

So, let's try to remove all of the cruft that's going on here and see if we can work things out.

If you go down to this:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .Select(bs => Work(bs));

Things work well. The numbers get exhausted right at the end, and processing 20 items on my machine takes about 1 second.

But this is processing everything in sequence. And the Work step provides back-pressure on Transform to slow down the speed at which it consumes the numbers.

Let's add concurrency.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs)));

This processes 20 items in 0.284 seconds, and the numbers exhaust themselves after 5 items are processed. There is no longer any back-pressure on the numbers. Basically, the scheduler hands all of the work off to Observable.Start, so it is ready for the next number immediately.

Let's reduce the concurrency.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs), concurrency));

Now the 20 items get processed in 0.5 seconds. Only two get processed before the numbers are exhausted. This makes sense, as we've limited concurrency to two threads. But there is still no back-pressure on the consumption of the numbers, so they get chewed up pretty quickly.

Having said all of this, I tried to construct a query with the appropriate back-pressure, but I couldn't find a way. The crux comes down to the fact that Transform(...) runs far faster than Work(...), so it completes far more quickly.

So then the obvious move for me was this:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));

This doesn't complete the numbers until the end, and it limits processing to two threads. It appears to do the right thing for what you want, except that I've had to compose Work(Transform(...)) together.
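If composing the two steps is acceptable, one standard Rx way to get genuine back-pressure (my own addition, not part of the answer above) is the Merge overload that takes a maximum-concurrency argument; wrapping each work item in Observable.Defer postpones both Transform and Work until Merge actually subscribes to that inner observable:

```csharp
// At most threadCount inner observables are subscribed at a time, so
// at most threadCount transformed buffers exist at any moment; the
// Defer wrappers themselves are cheap until subscribed.
IObservable<int> throttled =
    Observable
        .Range(1, itemCount)
        .Select(n => Observable.Defer(() =>
            Observable.Start(() => Work(Transform(n)))))
        .Merge(threadCount);
```

Merge then acts as the throttle: a new inner observable is only started when one of the threadCount in-flight items completes.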

Enigmativity answered Dec 08 '22