Nested Observable hangs on Wait()

In a C# console application using System.Reactive.Linq, I'm trying to build an observable in which each item is the string result of some processing by another observable. I've created a simple repro using strings and characters. Warning: this example is completely contrived; the point is that the nested .Wait() hangs.

using System;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
        IObservable<string> files = fileNames.ToObservable();
        string[] extensions = files.Select(fn =>
        {
            var extension = fn.ToObservable()
                .TakeLast(4)
                .ToArray()
                .Wait(); // <<<<<<<<<<<<< HANG HERE
            return new string(extension);
        })
        .ToArray()
        .Wait();
    }
}

Again, this is not how I would find the suffix of many filenames. The question is how I can produce an Observable of strings, where the strings are computed from a completed observable.

If I pull out this code and run it alone, it completes fine.

    var extension = fn.ToObservable()
        .TakeLast(4)
        .ToArray()
        .Wait();

There is something about the nested Wait() on the async methods that I don't understand.

How can I code the nested async observables, so I can produce a simple array of string?

Thanks

-John

JohnKoz asked Sep 02 '17 17:09


3 Answers

Your code blocks because you are calling ToObservable() without specifying a scheduler. In that case it uses the CurrentThreadScheduler.

So the files observable issues its first OnNext() [A] (sending "file1.doxc") on the current thread, and it can't continue iterating until that OnNext() returns. However, the inner fn observable also uses ToObservable(), and the inner Wait() blocks until fn completes. The inner observable queues its first OnNext() (sending "f") on the current thread scheduler, but that queued work can never run, because the scheduler is still busy with OnNext() [A], which is itself blocked inside Wait(). Neither side can make progress, so the call hangs.
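The queueing behaviour described above can be seen without any observables at all. The following is only a minimal sketch, assuming the System.Reactive package is referenced; the class name `TrampolineDemo` and the `Run` helper are made up for illustration. It schedules work on the current thread scheduler from inside an action that is already running on it, and records the order in which things actually execute.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;

// Hypothetical demo class, not part of the question's code.
static class TrampolineDemo
{
    // Returns the order in which the scheduled actions ran.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Nothing is running on the scheduler yet, so this action starts immediately.
        Scheduler.CurrentThread.Schedule(() =>
        {
            log.Add("outer start");

            // We are already inside a scheduled action, so this one is only
            // queued. It cannot start until the outer action has returned.
            Scheduler.CurrentThread.Schedule(() => log.Add("inner"));

            log.Add("outer end");
        });

        return log;
    }

    static void Main() => Console.WriteLine(string.Join(" -> ", Run()));
    // prints "outer start -> outer end -> inner"
}
```

If the outer action blocked until "inner" had run, as the nested Wait() does in the question, the outer action would never return and the queued work would never start.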

Two simple fixes:

Either change the files observable like this:

IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);

Or, avoid the use of the inner Wait() with a SelectMany (which is definitely more idiomatic Rx):

string[] extensions = files.SelectMany(fn =>
{
    return fn.ToObservable()
             .TakeLast(4)
             .ToArray()
             .Select(x => new string(x));
})
.ToArray()
.Wait();

// display results etc.

Each approach will have quite different execution semantics - the first will run much like a nested loop, with each inner observable completing before the next outer iteration. The second will be much more interleaved since the blocking behaviour of the Wait() is removed. If you use the Spy method I wrote and attach it after both ToObservable() calls, you'll see this behaviour quite clearly.
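One way to watch the ordering for yourself, if you don't have a tracing helper to hand, is a small logging operator built on Do(). This is only a sketch: the `Log` extension and the `LogDemo` class are hypothetical names introduced here (they are not the Spy method mentioned above), and the example applies to the SelectMany variant. The order of the printed lines depends on the scheduler in use, so no expected trace is shown.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper class for illustration only.
static class LogDemo
{
    // Prints every notification that passes through, tagged with a name.
    static IObservable<T> Log<T>(this IObservable<T> source, string name) =>
        source.Do(
            x => Console.WriteLine($"{name}: OnNext({x})"),
            ex => Console.WriteLine($"{name}: OnError({ex.Message})"),
            () => Console.WriteLine($"{name}: OnCompleted"));

    public static string[] Run()
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };

        return fileNames.ToObservable()
            .Log("outer")
            .SelectMany(fn =>
                fn.ToObservable()
                  .Log("inner " + fn)
                  .TakeLast(4)
                  .ToArray()
                  .Select(chars => new string(chars)))
            .ToArray()
            .Wait(); // only the outermost Wait() remains
    }

    static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Note that SelectMany emits results in whatever order the inner observables finish, so code that relies on the results matching the input order should not assume it.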

James World answered Nov 19 '22 23:11


Wait is a blocking call, which doesn't mix well with Rx. I'm not sure why the nested one is failing.

Assuming you are inside an async method, this works:

IObservable<string> files = fileNames.ToObservable();
string[] extensions = await files.SelectMany(async fn =>
{
    var extension = await fn.ToObservable()
        .TakeLast(4)
        .ToArray();
    return new string(extension);
})
.ToArray();

Shlomo answered Nov 19 '22 23:11


James has nailed the issue, but I would suggest that your code boils down to just this:

    string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
    string[] extensions =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Wait();

Now, that still has a .Wait() in it. Ideally you'd do something like this:

    IDisposable subscription =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
        .ToArray()
        .Subscribe(extensions =>
        {
            /* Do something with the `extensions` */
        });

You should avoid all waiting.

Enigmativity answered Nov 20 '22 01:11