Can someone explain why the following AsObservable
method creates an infinite loop even though the end of stream is reached?
public static class StreamExt {
public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
return Observable
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
.SelectMany(bytes => bytes);
}
private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
var buf = new byte[bufferSize];
var bytesRead = await stream
.ReadAsync(buf, 0, bufferSize, cancel)
.ConfigureAwait(false);
if (bytesRead < 1) return null; // EndOfStream
var result_size = Math.Min(bytesRead, bufferSize);
Array.Resize(ref buf, result_size);
return buf;
}
}
A quick tests shows that it produces an infinite loop:
class Program {
static void Main(string[] args) {
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
var testResult = stream
.AsObservable(1024)
.ToEnumerable()
.ToArray();
Console.WriteLine(testResult.Length);
}
}
}
Of course I could add an .SubscribeOn(TaskPoolScheduler.Default)
but however, the infinite loop stays alive (blocks a task pool scheduler + infinitely reads from Stream
).
[UPDATE 2017-05-09]
Shlomo posted a better example to reproduce this issue:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
.Repeat()
.TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
For anyone who ends up here and needs an answer, not just an explanation: the issue appears to be the default scheduler of FromAsync
, as indicated by this self-answered question. If you adjust to the "current thread" scheduler Repeat().TakeWhile(...)
behaves more predictably. E.g. (extract from question):
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel),
System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
You can confirm the OnCompleted
is being produced correctly with this:
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
{
var testResult = stream
.AsObservable(1024)
;
testResult.Subscribe(b => Console.WriteLine(b), e => {}, () => Console.WriteLine("OnCompleted"));
}
It looks like there's a problem with the .FromAsync
+ .Repeat
combination. The following code acts similarly:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
.Repeat()
.TakeWhile(l => l < 3)
;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
...whereas this code terminates correctly:
var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
.Repeat()
.TakeWhile(l => l < 3)
;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is printed.");
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With