Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does Rx Observable.Subscribe block my thread?

Hello there' I've tried out one of the 101 Rx examples:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

I don't understand why the line "Press any key to unsubscribe" never shows. My understanding was subscribing is asynchronous, you subscribe and it immedietly returns. What am I missing that causes my main thread to block?

like image 461
Eldar Avatar asked Jan 21 '23 09:01

Eldar


1 Answers

The blocking is caused by a combination of your enumerable looping on while (true) and IEnumerable<T>.ToObservable() extension methods defaulting to CurrentThreadScheduler.

If you supply Scheduler.TaskPool (or Scheduler.ThreadPool in pre-.NET 4) to an overload of ToObservable, you should see the behavior you're expecting (though it won't call your subscriber on the main thread, FYI).

Having said that, I think you'll find your combination of Thread.Sleep and Throttle will work as you expect. You're probably better off creating a custom observable that uses a scheduler to schedule your delays.

like image 128
Richard Szalay Avatar answered Jan 22 '23 23:01

Richard Szalay