
Combination of Observable.FromAsync+Repeat+TakeWhile creates infinite loop

Can someone explain why the following AsObservable method creates an infinite loop even though the end of stream is reached?

public static class StreamExt {
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);

        if (bytesRead < 1) return null; // EndOfStream
        var result_size = Math.Min(bytesRead, bufferSize);
        Array.Resize(ref buf, result_size);
        return buf;
    }
}

A quick test shows that it produces an infinite loop:

class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservable(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}

Of course I could add a .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop stays alive: it blocks a task pool thread and keeps reading from the Stream indefinitely.

[UPDATE 2017-05-09]

Shlomo posted a better example to reproduce this issue:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");

asked May 09 '17 08:05 by Daniel Müller




2 Answers

For anyone who ends up here and needs an answer, not just an explanation: the issue appears to be the default scheduler of FromAsync, as indicated by this self-answered question. If you switch to the "current thread" scheduler, Repeat().TakeWhile(...) behaves more predictably. For example (extract from the question):

.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), 
    System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream

answered Sep 21 '22 18:09 by Marc L.


You can confirm that OnCompleted is being produced correctly with this:

using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
{
    var testResult = stream
        .AsObservable(1024)
        ;
    testResult.Subscribe(b => Console.WriteLine(b), e => {}, () => Console.WriteLine("OnCompleted"));
}

It looks like there's a problem with the .FromAsync + .Repeat combination. The following code acts similarly:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");

...whereas this code terminates correctly:

var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is printed.");
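
If the goal is only to read the stream to its end, one option is to sidestep the FromAsync + Repeat combination and drive the read loop from Observable.Create with an async delegate. The following is a minimal sketch, not the approach from the question: the names StreamLoopExt and AsObservableLoop are hypothetical, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;

public static class StreamLoopExt {
    // Hypothetical alternative to AsObservable: an explicit read loop
    // replaces Repeat() + TakeWhile().
    public static IObservable<byte> AsObservableLoop(this Stream stream, int bufferSize) {
        return Observable.Create<byte>(async (observer, cancel) => {
            var buf = new byte[bufferSize];
            while (true) {
                var bytesRead = await stream
                    .ReadAsync(buf, 0, bufferSize, cancel)
                    .ConfigureAwait(false);

                if (bytesRead < 1) break; // EndOfStream

                // Emit only the bytes that were actually read.
                for (var k = 0; k < bytesRead; k++) observer.OnNext(buf[k]);
            }
            // Returning from the delegate completes the sequence (OnCompleted);
            // an exception thrown here is forwarded as OnError.
        });
    }
}

class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservableLoop(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length); // prints 3
        }
    }
}
```

Because the loop ends at end of stream and the delegate then returns, no further reads are issued after completion. The cancel token is cancelled when the subscription is disposed, so an early unsubscribe also stops the reading.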

answered Sep 21 '22 18:09 by Shlomo