Hello there, I've tried out one of the 101 Rx examples:
    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;
        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }
        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
I don't understand why the line "Press any key to unsubscribe" never shows. My understanding was that subscribing is asynchronous: you subscribe and it immediately returns. What am I missing that causes my main thread to block?
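The blocking is caused by a combination of your enumerable looping on while (true) and the IEnumerable<T>.ToObservable() extension method defaulting to CurrentThreadScheduler. With that scheduler the sequence is enumerated on the thread that calls Subscribe, so Subscribe does not return until the enumerable has finished.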
If you supply Scheduler.TaskPool (or Scheduler.ThreadPool in pre-.NET 4) to an overload of ToObservable, you should see the behavior you're expecting (though it won't call your subscriber on the main thread, FYI).
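As a rough sketch of that change, the program below passes a scheduler to the ToObservable overload. It assumes a later System.Reactive release, where the scheduler that used to be exposed as Scheduler.TaskPool is available as TaskPoolScheduler.Default; on the older Rx version the question targets, Scheduler.TaskPool would go in the same place. The loop is written as a for loop only for brevity and yields the same values with the same pauses.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class Program
{
    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        for (int i = 0; i <= 1000; i++)
        {
            yield return i;
            // Still blocks, but now on a pool thread rather than the main thread.
            Thread.Sleep(i % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        // Assumption: TaskPoolScheduler.Default stands in for Scheduler.TaskPool.
        var observable = GenerateAlternatingFastAndSlowEvents()
            .ToObservable(TaskPoolScheduler.Default)
            .Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        // Subscribe now returns immediately; the callback runs off the main thread.
        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}
```

Because the enumeration happens on a pool thread, the Console.WriteLine inside the subscriber may interleave with the prompts written by Main.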
Having said that, I think you'll find your combination of Thread.Sleep and Throttle will work as you expect. You're probably better off creating a custom observable that uses a scheduler to schedule your delays.
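One possible way to do that, offered as a sketch and not necessarily what the answer had in mind, is Observable.Generate with a time selector, which asks a scheduler to wait between values instead of blocking a thread with Thread.Sleep. The method name AlternatingFastAndSlowEvents is hypothetical. The time selector gives the delay before each value, so it is shifted by one to reproduce the pauses that follow each value in the question's loop.

```csharp
using System;
using System.Reactive.Linq;

static class Program
{
    // Hypothetical replacement for GenerateAlternatingFastAndSlowEvents().ToObservable().
    static IObservable<int> AlternatingFastAndSlowEvents()
    {
        return Observable.Generate(
            0,                // initial state
            i => i <= 1000,   // keep going while this holds
            i => i + 1,       // next state
            i => i,           // value emitted for a state
            // Delay before value i, equal to the pause after value i - 1 in the original loop.
            i => i == 0
                ? TimeSpan.Zero
                : TimeSpan.FromMilliseconds((i - 1) % 10 < 5 ? 500 : 1000));
    }

    private static void Main()
    {
        var throttled = AlternatingFastAndSlowEvents()
            .Timestamp()
            .Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}
```

Generate also has an overload that takes an IScheduler as a final argument, which makes it possible to pick the scheduler explicitly instead of relying on the default.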