In a C# console application, using System.Reactive.Linq, I'm trying to make an observable, where each item is the string result of some processing by another observable. I've created a simple repro using strings and characters. Warning, this example is completely CONTRIVED, and the point is that the nested .Wait() hangs.
using System;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
        IObservable<string> files = fileNames.ToObservable();
        string[] extensions = files.Select(fn =>
            {
                // Treat the file name as a sequence of chars and keep the last four.
                var extension = fn.ToObservable()
                    .TakeLast(4)
                    .ToArray()
                    .Wait(); // <<<<<<<<<<<<< HANG HERE
                return new string(extension);
            })
            .ToArray()
            .Wait();
    }
}
Again, this is not how I would find the suffix of many filenames. The question is how I can produce an Observable of strings, where the strings are computed from a completed observable.
If I pull out this code and run it alone, it completes fine.
var extension = fn.ToObservable()
    .TakeLast(4)
    .ToArray()
    .Wait();
There is something about the nested Wait() on the async methods, which I don't understand.
How can I code the nested async observables, so I can produce a simple array of string?
Thanks
-John
Your code blocks because you are using ToObservable() without specifying a scheduler. In that case it uses the CurrentThreadScheduler.
So the files observable issues its first OnNext() [A] (sending "file1.doxc") on the current thread. It can't continue iterating until that OnNext() returns. However, the inner fn observable also uses ToObservable(), and the Wait() blocks until fn completes. The inner observable queues its first OnNext() (sending "f") on the current thread scheduler, but that item can never be sent because the first OnNext() [A] will now never return.
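The queueing behaviour described above can be seen without any observables at all. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name TrampolineDemo is made up for illustration. It schedules work on the current thread scheduler from inside work that is already running on it:

```csharp
using System;
using System.Reactive.Concurrency;

class TrampolineDemo // hypothetical name, for illustration only
{
    static void Main()
    {
        Scheduler.CurrentThread.Schedule(() =>
        {
            Console.WriteLine("outer start");

            // This nested action is only queued here. It cannot run
            // until the outer action has returned.
            Scheduler.CurrentThread.Schedule(() => Console.WriteLine("inner"));

            // If the outer action blocked at this point until "inner" had run
            // (which is what the nested Wait() does), it would never return.
            Console.WriteLine("outer end");
        });
    }
}
// prints:
// outer start
// outer end
// inner
```

The nested action runs only after the outer one finishes, which is why a blocking wait inside the outer OnNext() can never be satisfied.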
Two simple fixes:
Either change the files observable like this (NewThreadScheduler is in the System.Reactive.Concurrency namespace):
IObservable<string> files = fileNames.ToObservable(NewThreadScheduler.Default);
Or avoid the inner Wait() by using SelectMany (which is definitely more idiomatic Rx):
string[] extensions = files.SelectMany(fn =>
    {
        return fn.ToObservable()
            .TakeLast(4)
            .ToArray()
            .Select(x => new string(x));
    })
    .ToArray()
    .Wait();
// display results etc.
Each approach has quite different execution semantics. The first runs much like a nested loop, with each inner observable completing before the next outer iteration. The second is much more interleaved, since the blocking behaviour of Wait() is removed. If you use the Spy method I wrote and attach it after both ToObservable() calls, you'll see this behaviour quite clearly.
Wait is a blocking call, which doesn't mix well with Rx. I'm not sure why the nested one is failing.
Assuming you are inside an async method, this works:
IObservable<string> files = fileNames.ToObservable();
string[] extensions = await files.SelectMany(async fn =>
    {
        var extension = await fn.ToObservable()
            .TakeLast(4)
            .ToArray();
        return new string(extension);
    })
    .ToArray();
James has nailed the issue, but I would suggest that your code boils down to just this:
string[] fileNames = { "file1.doxc", "file2.xlsx", "file3.pptx" };
string[] extensions =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
    .ToArray()
    .Wait();
Now, that still has a .Wait() in it. Ideally you'd do something like this:
IDisposable subscription =
    (
        from fn in fileNames.ToObservable()
        from extension in fn.ToObservable().TakeLast(4).ToArray()
        select new string(extension)
    )
    .ToArray()
    .Subscribe(extensions =>
    {
        /* Do something with the `extensions` */
    });
You should avoid all waiting.