
How can I see what my reactive extensions query is doing?

I'm writing a complex Reactive Extensions query with lots of operators. How can I see what's going on?

I'm asking and answering this as it comes up a fair bit and is probably of good general use.

James World asked Nov 26 '13 15:11


1 Answer

You can append this function liberally to your Rx operators while you are developing them to see what's happening:

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Threading;

    // Extension methods must live in a static class; the class name is arbitrary.
    public static class ObservableExtensions
    {
        public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
        {
            opName = opName ?? "IObservable";
            // Logged once, when the operator chain is built.
            Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);

            return Observable.Create<T>(obs =>
            {
                // Logged every time an observer subscribes.
                Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                                  opName,
                                  Thread.CurrentThread.ManagedThreadId);

                try
                {
                    // Log each notification, then pass it on to the observer.
                    var subscription = source
                        .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                   opName,
                                                   x,
                                                   Thread.CurrentThread.ManagedThreadId),
                            ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                    opName,
                                                    ex,
                                                    Thread.CurrentThread.ManagedThreadId),
                            () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                    opName,
                                                    Thread.CurrentThread.ManagedThreadId))
                        .Subscribe(obs);

                    // Log when the subscription is disposed.
                    return new CompositeDisposable(
                        subscription,
                        Disposable.Create(() => Console.WriteLine(
                            "{0}: Cleaned up on Thread: {1}",
                            opName,
                            Thread.CurrentThread.ManagedThreadId)));
                }
                finally
                {
                    // Logged when the Subscribe call itself returns.
                    Console.WriteLine("{0}: Subscription completed.", opName);
                }
            });
        }
    }

Here's an example usage that shows a subtle difference in the behaviour of Range:

    Observable.Range(0, 1).Spy("Range").Subscribe();

Gives the output:

    Range: Observable obtained on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: Subscription completed.
    Range: OnNext(0) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Range: Cleaned up on Thread: 7

But this:

    Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

Gives the output:

    Range: Observable obtained on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: OnNext(0) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Range: Subscription completed.
    Range: Cleaned up on Thread: 7

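Spot the difference? In the first output, "Subscription completed." appears before OnNext: by default Range uses the current-thread scheduler, which queues the emissions until the Subscribe call has returned. With Scheduler.Immediate the values are emitted inside the Subscribe call, so OnNext and OnCompleted are logged first.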

You can alter this to write to a log or to Debug, or use preprocessor directives to fall back to a lean pass-through subscription in a Release build.
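As a rough sketch of that idea, the variant below takes the log target as a delegate and compiles down to a plain pass-through when DEBUG is not defined. It is not part of the original answer: the names SpyTo, ActionObserver and ArraySource and the `log` parameter are hypothetical. It uses only the IObservable<T>/IObserver<T> interfaces from the base class library so that it compiles without the System.Reactive package; with Rx you would more likely build it on Do, as Spy does.

```csharp
using System;

public static class SpyLoggingExtensions
{
    // Hypothetical variant of Spy: the name SpyTo and the 'log' parameter are
    // assumptions for illustration, not part of the original answer.
    public static IObservable<T> SpyTo<T>(
        this IObservable<T> source, string opName, Action<string> log)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (log == null) throw new ArgumentNullException(nameof(log));
#if DEBUG
        // Debug build: wrap the source so that every notification is logged.
        return new LoggingObservable<T>(source, opName ?? "IObservable", log);
#else
        // Release build: lean pass-through with no wrapping and no logging.
        return source;
#endif
    }

    private sealed class LoggingObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly string _opName;
        private readonly Action<string> _log;

        public LoggingObservable(IObservable<T> source, string opName, Action<string> log)
        {
            _source = source;
            _opName = opName;
            _log = log;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _log(_opName + ": Subscribed to");
            // Log each notification, then forward it unchanged.
            return _source.Subscribe(new ActionObserver<T>(
                x => { _log(_opName + ": OnNext(" + x + ")"); observer.OnNext(x); },
                ex => { _log(_opName + ": OnError(" + ex + ")"); observer.OnError(ex); },
                () => { _log(_opName + ": OnCompleted()"); observer.OnCompleted(); }));
        }
    }
}

// Hypothetical helper: an observer built from three delegates.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public ActionObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

// Hypothetical stand-in for a source such as Observable.Range: it pushes its
// items synchronously inside Subscribe and then completes.
public sealed class ArraySource<T> : IObservable<T>, IDisposable
{
    private readonly T[] _items;

    public ArraySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return this; // nothing to clean up, so Dispose is a no-op
    }

    public void Dispose() { }
}
```

With Rx referenced, something like `Observable.Range(0, 1).SpyTo("Range", s => Debug.WriteLine(s))` would send the trace to the debugger output instead of the console. Taking the target as a delegate keeps the choice of sink at the call site rather than inside the operator.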

You can apply Spy throughout a chain of operators, e.g.:

    Observable.Range(0, 3).Spy("Range")
              .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

Gives the output:

    Range: Observable obtained on Thread: 7
    Scan: Observable obtained on Thread: 7
    Scan: Subscribed to on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: Subscription completed.
    Scan: Subscription completed.
    Range: OnNext(0) on Thread: 7
    Scan: OnNext(0) on Thread: 7
    Range: OnNext(1) on Thread: 7
    Scan: OnNext(1) on Thread: 7
    Range: OnNext(2) on Thread: 7
    Scan: OnNext(3) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Scan: OnCompleted() on Thread: 7
    Range: Cleaned up on Thread: 7
    Scan: Cleaned up on Thread: 7

I'm sure you can find ways of enriching this to suit your purposes.

James World answered Oct 19 '22 07:10