
Can an observer safely listen on multiple observables with Rx?

I am trying to see how events from multiple observables can be streamed into a single set of events with Rx, but I get an exception when I run the code below. Does this mean that an observer subscribed to multiple observables is always prone to exceptions because of a violation of the Rx grammar? I mean, if two of those observables generate an event at the same time by chance (any two observables have some probability of doing so), it should give an exception.

    DateTimeOffset start;
    object sync = new object();
    var subject = new Subject<long>();
    var observer = Observer.Create<long>(c =>
    {
        lock (sync)
        {
            Console.WriteLine(c);
        }
    });

    var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
    var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
    var observable3 = Observable.Never<long>().Timeout(
        start = DateTimeOffset.Now.AddSeconds(15),
        (new long[] { 1 }).ToObservable());
    var observable4 = Observable.Never<long>().Timeout(start);
    observable1.Subscribe(observer);
    observable2.Subscribe(observer);
    observable3.Subscribe(observer);
    observable4.Subscribe(observer);
    Thread.Sleep(20000);

Thanks, Gideon, for the explanation. This is the exception that I am getting. You are right that it is a TimeoutException. This was a coding mistake. Thanks.

System.TimeoutException: The operation has timed out.
   at System.Reactive.Observer.<Create>b__8[T](Exception e)
   at System.Reactive.AnonymousObserver`1.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Subjects.Subject`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28e.<Throw>b__28b()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action action)
   at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObserver`1 observer)
   at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action action)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
   at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass545.<Timeout>b__53f()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action action)
   at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Schedule>b__6(Object _)
   at System.Threading._TimerCallback.TimerCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading._TimerCallback.PerformTimerCallback(Object state)
ada asked Dec 13 '11 17:12


2 Answers

Yes, an observer can listen to multiple observables. The best example of this is the Merge operator. The built-in operators all follow the Rx grammar and will often enforce it on sources that do not.
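As a rough illustration of Merge feeding one observer from several sources, here is a minimal sketch. It assumes the System.Reactive NuGet package; the class name MergeSketch and the use of Subject<long> as hand-driven sources (instead of the timers from the question) are choices made only to keep the result deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of Rx.
public static class MergeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = new Subject<long>();   // stand-ins for the question's observables
        var second = new Subject<long>();

        // One outer subscription; Merge subscribes to each source itself.
        using (first.Merge(second).Subscribe(
            value => log.Add("OnNext " + value),
            error => log.Add("OnError " + error.GetType().Name),
            () => log.Add("OnCompleted")))
        {
            first.OnNext(1);
            second.OnNext(2);
            first.OnCompleted();                    // not forwarded: second is still active
            second.OnNext(3);                       // still delivered
            second.OnError(new TimeoutException()); // errors terminate the merged stream
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

With these inputs the log should contain OnNext 1, OnNext 2, OnNext 3 and then OnError TimeoutException. Passing an explicit error handler to Subscribe is what keeps the exception from being thrown.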

The IObserver you get from Observer.Create is one such case. It ignores any further calls to OnNext once OnError or OnCompleted has been called. This means that subscribing the same observer to one observable and then to another will not work as expected: the termination message from the first observable causes the observer to ignore messages from the second. To get around this, operators like Merge, Concat, and OnErrorResumeNext (among others) use multiple observers internally and do not pass completion messages (OnError and/or OnCompleted, depending on the semantics of the operator) from any but the last observable to the outer observer.
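The behaviour of a shared observer can be seen in a small sketch. This assumes the System.Reactive NuGet package; SharedObserverSketch and the two Subject<long> sources are hypothetical stand-ins for the observables in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of Rx.
public static class SharedObserverSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = new Subject<long>();
        var second = new Subject<long>();

        // An explicit error handler is supplied so the error is logged
        // instead of being thrown by the default handler.
        var observer = Observer.Create<long>(
            value => log.Add("OnNext " + value),
            error => log.Add("OnError " + error.GetType().Name));

        // The same observer instance is subscribed to both sources.
        first.Subscribe(observer);
        second.Subscribe(observer);

        first.OnNext(1);
        first.OnError(new TimeoutException()); // the observer is now stopped
        second.OnNext(2);                      // ignored by the stopped observer
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The value 2 should never reach the log, even though the second source is still alive and still has the observer attached.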

You did not mention what exception you are getting, but I would guess that it is the error produced by the Timeout in observable4. If you do not provide another observable for Timeout to switch to, the OnError of the observer is called, and the default OnError for the Subscribe and Observer.Create overloads that do not take an error handler simply throws the exception.

While this is clearly example/testing code, I do want to point out that even though you are no longer getting messages passed to your OnNext, all the other observables keep running after this exception. Either use Merge to track this for you, or keep track of all the disposables returned by the subscriptions and dispose of them yourself when a completion message comes through. CompositeDisposable (in System.Reactive.Disposables) is good for this.
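One possible way to do the manual bookkeeping with CompositeDisposable is sketched below. It assumes the System.Reactive NuGet package; CleanupSketch, the stopAll delegate and the Subject<long> sources are illustrative names, and each source gets its own observer so that one source terminating does not silence the handler for the other.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of Rx.
public static class CleanupSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = new Subject<long>();
        var second = new Subject<long>();
        var subscriptions = new CompositeDisposable();

        // On any error, log it and tear down every tracked subscription.
        Action<Exception> stopAll = error =>
        {
            log.Add("OnError " + error.GetType().Name);
            subscriptions.Dispose();
        };

        subscriptions.Add(first.Subscribe(v => log.Add("first " + v), stopAll));
        subscriptions.Add(second.Subscribe(v => log.Add("second " + v), stopAll));

        first.OnNext(1);
        second.OnNext(2);
        first.OnError(new TimeoutException()); // triggers stopAll
        second.OnNext(3);                      // nobody is subscribed any more
        log.Add("second has observers: " + second.HasObservers);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

After the error, the second source should report that it has no observers, which is the cleanup that Merge would otherwise perform for you.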

Gideon Engelberth answered Nov 14 '22 22:11


You really shouldn't be using locks here, but if you really want this to work, you can do:

var x = Observable.Create<T>(subj => { /* Fill it in*/ })
    .Multicast(new Subject<T>());

// Set up your subscriptions Here!

// When you call the Connect, whatever is in the Observable.Create will be called
x.Connect();

If you wanted to be even safer, you could have the result of the Create "replayed" to future subscriptions by using a ReplaySubject instead of a Subject (with a Subject, subscribers that arrive after the Connect get nothing).
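The difference between the two subjects for late subscribers can be sketched as follows. This assumes the System.Reactive NuGet package; ReplaySketch and the two-value source passed to Observable.Create are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of Rx.
public static class ReplaySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // A source that emits two values synchronously and then completes.
        var source = Observable.Create<long>(observer =>
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        var plain = source.Multicast(new Subject<long>());
        var replayed = source.Multicast(new ReplaySubject<long>());

        // Connect before anyone has subscribed: the source runs right away.
        plain.Connect();
        replayed.Connect();

        // Late subscribers: only the ReplaySubject still has the values.
        plain.Subscribe(v => log.Add("plain " + v));
        replayed.Subscribe(v => log.Add("replay " + v));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Only the "replay" entries should appear in the log, since the plain Subject had already finished by the time its subscriber arrived.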

Ana Betts answered Nov 14 '22 22:11