Reactive Extensions: Split input, process, and concatenate back

Basically, I have an observable of input strings that I want to process individually and then do something with the result. If an input string contains commas (as a delimiter), I want to split it, process each substring individually, and then do something with each resulting sequence of strings. The snippet below illustrates a simplified version of what I am trying to do:

[Fact]
public void UniTest1()
{
    var observable = new ReplaySubject<string>();
    observable.OnNext("a,b");
    observable.OnNext("c,d,e");
    observable.OnCompleted();

    var result = new List<string[]>();
    observable
        .SelectMany(x => x.Split(','))
        .Select(x => x.ToUpper())
        .ToArray() // How to collect an IEnumerable for each item here?
        .Do(s => result.Add(s))
        .Subscribe();

    // Here, result is actually {{"A","B","C","D","E"}}, I need {{"A","B"},{"C","D","E"}}
    Assert.Equal(2, result.Count);

    Assert.Equal("A", result[0][0]);
    Assert.Equal("B", result[0][1]);

    Assert.Equal("C", result[1][0]);
    Assert.Equal("D", result[1][1]);
    Assert.Equal("E", result[1][2]);
}

As explained in the comment, the above does NOT work. The .ToArray() call collects the entire flattened observable into a single array, which it only emits once the source completes.

However, I have solved this by putting the splitting and processing into a single action, like this:

[Fact]
public void UniTest2()
{
    var observable = new ReplaySubject<string>();
    observable.OnNext("a,b");
    observable.OnNext("c,d,e");
    observable.OnCompleted();

    var result = new List<string[]>();
    observable
        .Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
        .Do(s => result.Add(s))
        .Subscribe();

    // Result is as expected: {{"A","B"},{"C","D","E"}}
    Assert.Equal(2, result.Count);
    Assert.Equal("A", result[0][0]);
    Assert.Equal("B", result[0][1]);
    Assert.Equal("C", result[1][0]);
    Assert.Equal("D", result[1][1]);
    Assert.Equal("E", result[1][2]);
}

But is there a way, using Rx, to solve this problem by NOT putting the splitting and processing in the same action? What is the recommended solution for this problem?

I should also mention that the processing, i.e. the ToUpper() call, is in reality a web-service call. I used ToUpper() in my examples to keep the problem easy to explain, but it means I want this processing to be done in parallel and without blocking.

asked Oct 19 '22 16:10 by lenkan


1 Answer

There are a number of things you've raised in your code that are worth mentioning.

To start with, the .ToArray() operator takes an observable that emits zero or more individual values and turns it into an observable that emits a single array of zero or more values. Such an observable must complete before it can emit its one and only value.

With this in mind the results of the first query should make sense.
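To make the .ToArray() behaviour concrete, here is a small console sketch. It assumes the System.Reactive NuGet package is referenced, and the class and method names are made up for illustration. It contrasts flattening with SelectMany before ToArray (one array for the whole source) against building the array inside the per-line Select (one array per input).

```csharp
// Sketch only: assumes the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class Program
{
    // SelectMany flattens every input line into one stream of letters, so
    // ToArray() can emit only once, after the whole source has completed.
    public static string[] Flattened(IEnumerable<string> lines) =>
        lines.ToObservable()
             .SelectMany(x => x.Split(','))
             .Select(x => x.ToUpper())
             .ToArray()
             .Wait();

    // Building the array inside the per-line Select keeps one array per
    // input line; ToList() then gathers those arrays once the source completes.
    public static IList<string[]> PerLine(IEnumerable<string> lines) =>
        lines.ToObservable()
             .Select(x => x.Split(',').Select(s => s.ToUpper()).ToArray())
             .ToList()
             .Wait();

    public static void Main()
    {
        var input = new[] { "a,b", "c,d,e" };
        Console.WriteLine(string.Join(",", Flattened(input)));
        Console.WriteLine(string.Join(" | ", PerLine(input).Select(a => string.Join(",", a))));
    }
}
```

With the question's two inputs, the first line printed is A,B,C,D,E and the second is A,B | C,D,E.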

Your second query, with x.Split(',').Select(s => s.ToUpper()).ToArray(), produces the output you wanted, but you asked "is there a way, using Rx, to solve this problem by NOT putting the splitting and processing in the same action".

Well, trivially, yes:

var result = new List<string[]>();
observable
    .Select(x => x.Split(','))
    .Select(x => x.Select(s => s.ToUpper()))
    .Select(x => x.ToArray())
    .Do(s => result.Add(s))
    .Subscribe();

However, this doesn't process the items in parallel. Rx is designed to work in series unless you invoke an operation that introduces parallelism.

Often, an easy way to do that is to take a long-running select, such as .Select(x => longRunningOperation(x)), and rewrite it like this:

.SelectMany(x => Observable.Start(() => longRunningOperation(x)))
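The Select to SelectMany/Start rewrite can be tried in isolation with a sketch like the one below. It assumes the System.Reactive NuGet package; LongRunningOperation is a hypothetical stand-in that just sleeps for 200 ms. Observable.Start runs each call on Scheduler.Default, so the calls can overlap, at the cost of results arriving in completion order rather than input order.

```csharp
// Sketch only: assumes the System.Reactive NuGet package is referenced.
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    // Hypothetical stand-in for a slow call such as a web service (~200 ms each).
    static int LongRunningOperation(int x)
    {
        Thread.Sleep(200);
        return x * 10;
    }

    // Plain Select: each item is processed in turn on the thread that pushes it.
    public static int[] Sequential(int[] xs) =>
        xs.ToObservable()
          .Select(x => LongRunningOperation(x))
          .ToArray()
          .Wait();

    // SelectMany + Observable.Start: each call is scheduled on Scheduler.Default
    // (the thread pool), so calls can run concurrently; output order may vary.
    public static int[] Concurrent(int[] xs) =>
        xs.ToObservable()
          .SelectMany(x => Observable.Start(() => LongRunningOperation(x)))
          .ToArray()
          .Wait();

    public static void Main()
    {
        var xs = new[] { 1, 2, 3, 4 };

        var sw = Stopwatch.StartNew();
        Sequential(xs);
        Console.WriteLine($"Select:             {sw.ElapsedMilliseconds} ms");

        sw.Restart();
        Concurrent(xs);
        Console.WriteLine($"SelectMany + Start: {sw.ElapsedMilliseconds} ms");
    }
}
```

The exact timings depend on the machine and on how many thread-pool threads are available, so treat the printed numbers as indicative only.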

In your case you could begin by doing this:

observable
    .ObserveOn(Scheduler.Default)
    .SelectMany(x => Observable.Start(() => x.Split(',')))
    .SelectMany(x => Observable.Start(() => x.Select(s => s.ToUpper())))
    .SelectMany(x => Observable.Start(() => x.ToArray()))
    .Do(s => result.Add(s))
    .Subscribe();

But that only parallelizes each original .OnNext call, not the processing within it. To do that, you need to turn the result of x.Split(',') into an observable and process that in parallel.

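observable
    .SelectMany(x => Observable.Start(() => x.Split(',').ToObservable()))
    .SelectMany(x => Observable.Start(() => x.SelectMany(s => Observable.Start(() => s.ToUpper()))))
    .SelectMany(x => Observable.Start(() => x.ToArray()))
    // Flatten IObservable<IObservable<string[]>> first; calling s.Do(...) on the
    // inner observable would only build a new observable that nobody subscribes to.
    .Merge()
    .Do(s => result.Add(s))
    .Subscribe();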

But that's starting to look crazy, and it no longer runs on the current thread, which means your test isn't going to wait for the results.

Let's take another look at this query.

I've started by getting rid of the .Do call. .Do is handy for debugging, but it is a poor place for state changes: it can run at any point in a query and on any thread, so any code inside it must be thread-safe, and calling result.Add(s) on a List<T> is NOT thread-safe.
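One way to avoid mutating a shared list from .Do, while still letting a test wait for the results, is to let Rx build the collection with ToList() and then either block on it with Wait() or await it. The sketch below assumes the System.Reactive NuGet package and uses a trivial squaring step as a hypothetical stand-in for the real work.

```csharp
// Sketch only: assumes the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Program
{
    // Instead of calling result.Add(...) from .Do, let ToList() build the list.
    // Wait() blocks the caller until the query completes, so the list it returns
    // is final even though the squares are computed on thread-pool threads.
    public static IList<int> Squares(int count) =>
        Observable.Range(1, count)
                  .SelectMany(x => Observable.Start(() => x * x))
                  .ToList()
                  .Wait();

    // Non-blocking alternative: System.Reactive.Linq makes observables awaitable;
    // awaiting yields the last value, here the single list produced by ToList().
    public static async Task<IList<int>> SquaresAsync(int count) =>
        await Observable.Range(1, count)
                        .SelectMany(x => Observable.Start(() => x * x))
                        .ToList();

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", Squares(5).OrderBy(v => v)));
        Console.WriteLine(string.Join(",", (await SquaresAsync(5)).OrderBy(v => v)));
    }
}
```

Both lines print 1,4,9,16,25 once sorted; the async version suits test frameworks that support async test methods.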

I've also replaced .ToUpper() with a "webservice" call that adds a one-second processing delay, so that we can see how long the query takes and therefore whether it is running in parallel. If the final query takes about 5 seconds, nothing is running in parallel; if it takes less, we're winning.

So, if I write the query in the most basic way it looks like this:

Func<string, string> webservice = x =>
{
    Thread.Sleep(1000);
    return x.ToUpper();
};

var query =
    observable
        .Select(ls =>
            from p in ls.Split(',')
            select webservice(p))
        .Select(rs => rs.ToArray())
        .ToArray()
        .Select(rss => new List<string[]>(rss));

var sw = Stopwatch.StartNew();
List<string[]> result = query.Wait();
sw.Stop();

When I run this, I get the expected result {{"A","B"},{"C","D","E"}}, but it takes just over 5 seconds to complete. As expected, there is no parallelism here.

Let's now introduce some parallelism:

var query =
    observable
        .Select(ls =>
            from p in ls.Split(',').ToObservable()
            from r in Observable.Start(() => webservice(p))
            select r)
        .Select(rs => rs.ToArray())
        .Merge()
        .ToArray()
        .Select(rss => new List<string[]>(rss));

I've basically applied the "Select to SelectMany/Start" pattern described above. The only tricky part is that the .Select(rs => rs.ToArray()) went from producing an IObservable<string[]> to producing an IObservable<IObservable<string[]>>, so I added a .Merge() to flatten it back out. This is normal when you introduce parallelism into Rx queries.
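Spelling out the intermediate types can make the role of .Merge() easier to see. The sketch below is a self-contained restatement of the parallel query with explicit types; it assumes the System.Reactive NuGet package, takes the web service as a parameter (the same one-second stand-in is used in Main), and uses ToList().Wait() in place of the ToArray()/new List<string[]> step.

```csharp
// Sketch only: assumes the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    public static IList<string[]> Run(IEnumerable<string> lines, Func<string, string> webservice)
    {
        // One inner observable per input line. Each piece is sent to the
        // web service via Observable.Start, and ToArray() collects that
        // line's results once all of its calls have finished.
        IObservable<IObservable<string[]>> nested =
            lines.ToObservable().Select(ls =>
                (from p in ls.Split(',').ToObservable()
                 from r in Observable.Start(() => webservice(p))
                 select r).ToArray());

        // Merge subscribes to every inner observable as it arrives and
        // flattens the results back to IObservable<string[]>.
        IObservable<string[]> flat = nested.Merge();

        // Block until everything has completed (fine for a demo or a test).
        return flat.ToList().Wait();
    }

    public static void Main()
    {
        // Same one-second stand-in for the web service as in the answer.
        Func<string, string> webservice = x => { Thread.Sleep(1000); return x.ToUpper(); };

        var sw = Stopwatch.StartNew();
        var result = Run(new[] { "a,b", "c,d,e" }, webservice);
        sw.Stop();

        foreach (var group in result)
            Console.WriteLine(string.Join(",", group));
        Console.WriteLine($"Elapsed: {sw.ElapsedMilliseconds} ms");
    }
}
```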

Now when I run the query, BOOM, it takes just over one second. All five inputs are processed in parallel. The only problem now is that the order is no longer deterministic, but you can't help that when the work is performed in parallel.

On one such run I got this result:

[Image: results]

If I were running this as a test I would sort the results into a known order and compare that to the expected result.
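Sorting into a known order could look like the sketch below. It is plain LINQ (no Rx needed), and Canonicalize/SameGroups are hypothetical helper names. It sorts the items inside each group and then sorts the groups, so that two runs that differ only in ordering compare as equal.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Program
{
    // Puts a list of groups into a canonical order: items sorted within each
    // group, then groups sorted by their joined text (ordinal comparison).
    public static List<string[]> Canonicalize(IEnumerable<string[]> groups) =>
        groups.Select(g => g.OrderBy(s => s, StringComparer.Ordinal).ToArray())
              .OrderBy(g => string.Join(",", g), StringComparer.Ordinal)
              .ToList();

    // True when both lists contain the same groups, ignoring order.
    public static bool SameGroups(IEnumerable<string[]> actual, IEnumerable<string[]> expected)
    {
        var a = Canonicalize(actual);
        var e = Canonicalize(expected);
        return a.Count == e.Count && a.Zip(e, (x, y) => x.SequenceEqual(y)).All(ok => ok);
    }

    public static void Main()
    {
        // A made-up ordering of the kind a parallel run might produce.
        var actual = new List<string[]> { new[] { "D", "C", "E" }, new[] { "B", "A" } };
        var expected = new List<string[]> { new[] { "A", "B" }, new[] { "C", "D", "E" } };
        Console.WriteLine(SameGroups(actual, expected)); // True
    }
}
```

In an xUnit test you could equally compare the canonical forms with Assert.Equal; the helper just keeps the ordering logic in one place.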

answered Oct 21 '22 05:10 by Enigmativity