Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it possible to invoke subscribers's OnNexts on different threads in Rx?

I am new to Rx. I want to know if it is possible to dispatch a message to different subscribers such that they run on different thread? How can an IObserable control it? The plain Subject implementation, as I understand it calls the subscribers one after the other on a single thread.


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}

If I use Subject as 'SomeClass', then sub2's OnNext() will not be called until sub1's OnNext() is completed. If sub1 is takes a lot of time, I don't want it to delay sub2's reception. Can someone tell me how Rx allows this kind of implementation for SomeClass.

like image 622
ada Avatar asked Oct 19 '11 12:10

ada


2 Answers

The code you have written is almost there to run the observable in parallel. If you write your observer as this:

public class Subscriber : IObserver<int>
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e)
    { }
    public void OnCompleted()
    { }
} 

Then running this code:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);

Will produce the following:

0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53

It's already running the subscriptions in parallel on different threads.

The important thing that I used was the .ObserveOn extension method - that's what made this work.

You should keep in mind that observers don't generally share the same instance of observables. Subscribing to an observable effectively wires up a unique "chain" of observable operators from the source of the observable to the observer. This is much the same as calling GetEnumerator twice on an enumerable, you will not share the same enumerator instance, you will get two unique instances.

Now, I want to describe what I mean by a chain. I'm going to give the Reflector.NET extracted code from Observable.Generate & Observable.Where to illustrate the point.

Take this code for example:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });

Under the hood both Generate & Where each create a new instance of the internal Rx class AnonymousObservable<T>. The constructor for AnonymousObservable<T> takes a Func<IObserver<T>, IDisposable> delegate which it uses whenever it receives a call to Subscribe.

The slightly cleaned up code for Observable.Generate<T>(...) from Reflector.NET is:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =>
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

The Action self parameter is a recursive call that iterates output values. You'll notice that nowhere in this code does the observer get stored or that the values get pasted to more than one observer. This code runs once for each new observer.

The slightly cleaned up code for Observable.Where<T>(...) from Reflector.NET is:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return new AnonymousObservable<TSource>(observer =>
        source.Subscribe(x =>
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex => observer.OnError(ex), () => observer.OnCompleted));
}

Again this code doesn't track multiple observers. It calls Subscribe effectively passing its own code as the observer to the underlying source observable.

You should see that, in my example code above, subscribing to Where creates a subscription to Generate and hence this is a chain of observables. In fact it's chaining subscribe calls on a series of AnonymousObservable objects.

If you have two subscriptions you have two chains. If you have 1,000 subscriptions you have 1,000 chains.

Now, just as a side note - even though there are IObservable<T> and IObserver<T> interfaces - you should very very rarely actually implement these in your own classes. The built-in classes and operators handle 99.99% of all cases. It's a bit like IEnumerable<T> - how often do you need to implement this interface yourself?

Let me know if this helps and if you need any further explanation.

like image 200
Enigmativity Avatar answered Oct 11 '22 18:10

Enigmativity


If you have a IObservable and you need to force the subscription to run on a different thread, then you can use the ObserveOn function.

If you run the below code, it will force the number generator to run in a different thread contexts. You can also use the EventLoopScheduler and specify the System.Thread you want to use, set priority, set name, etc...

void Main()
{
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));

    var disposable = new CompositeDisposable()
    {
       numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
    };

    Thread.Sleep(1000);
    disposable.Dispose();
}

Output

Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10

Note how I used the CompositeDisposable to dispose all subscriptions at the end. If you don't do this in LinqPad for example. The Observable.Interval will continue to run in memory untill you kill the process.

like image 20
WeSam Abdallah Avatar answered Oct 11 '22 19:10

WeSam Abdallah