Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Fast Repeat TakeWhile causes infinite loop

How can I make the following observable repeat until stream.DataAvailable is false? Currently it looks like it never stops.

AsyncReadChunk and Observable.Return inside the Defer section make OnNext call then OnCompleted call. When Repeat receives the OnNext call it passes it to TakeWhile. When TakeWhile's is not satisfied it completes the observable but I think the OnCompleted that comes right after the OnNext is so fast that it makes Repeat to re-subscribes to the observable and causes the infinite loop.

How can I correct this behaviour?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
    return Observable.Defer(() =>
        {
            try
            {
                return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
            }
            catch (Exception)
            {
                return Observable.Return(new byte[0]);
            }
        })
        .Repeat()
        .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
like image 415
Samet S. Avatar asked Nov 27 '11 07:11

Samet S.


1 Answers


SELF ANSWER: (Below is an answer posted by Samet, the author of the question. However, he posted the answer as part of the question. I'm moving it into a separate answer, marking as community wiki, since the author hasn't moved it himself.)


I discovered by refactoring that it is a problem with schedulers. The Return uses Immediate scheduler while Repeat uses CurrentThread. The fixed code is below.

    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Defer(() =>
                                    {
                                        try
                                        {
                                            return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                        catch (Exception)
                                        {
                                            return Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                    })
            .Repeat()
            .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
    }
like image 191
Judah Gabriel Himango Avatar answered Nov 05 '22 12:11

Judah Gabriel Himango