Reactive Extensions seem very slow - am I doing something wrong?

I'm evaluating Rx for a trading platform project that will need to process thousands of messages a second. The existing platform has a complex event routing system (multicast delegates) that responds to these messages and performs a lot of subsequent processing.
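For readers unfamiliar with the baseline being compared against, here is a minimal sketch of multicast-delegate event routing. The `MessageRouter` and `Tick` names are hypothetical and are not taken from the original platform. A single `Action<Tick>` field holds every registered handler, and one invocation calls them all in order on the calling thread.

```csharp
using System;

// Hypothetical message type; the real platform's messages are not shown in the question.
public sealed class Tick
{
    public string Symbol { get; }
    public decimal Price { get; }
    public Tick(string symbol, decimal price) { Symbol = symbol; Price = price; }
}

// Hypothetical router: one multicast delegate fans each message out to every handler.
// Registration is not thread-safe in this sketch.
public sealed class MessageRouter
{
    private Action<Tick> handlers; // null until the first handler is registered

    public void Register(Action<Tick> handler) => handlers += handler;
    public void Unregister(Action<Tick> handler) => handlers -= handler;

    // Invoking a multicast delegate calls each handler in registration order,
    // synchronously, with no scheduling or queuing involved.
    public void Publish(Tick tick) => handlers?.Invoke(tick);
}

public static class Program
{
    public static void Main()
    {
        var router = new MessageRouter();
        int received = 0;
        router.Register(t => received++);
        router.Register(t => Console.WriteLine(t.Symbol + " @ " + t.Price));
        router.Publish(new Tick("MSFT", 25.50m));
        Console.WriteLine(received); // 1
    }
}
```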

I've looked at Reactive Extensions for the obvious benefits, but noticed it's considerably slower: usually around 100 times slower.

I've created a unit test to demonstrate this. It runs a simple increment 1 million times using various Rx flavours, alongside a straight-out-of-the-box delegate as a "control" test.

Here are the results:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range()                       - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread          - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate          - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher         - (1000000) - 00:00:03.0360000

As you can see, all the Rx methods are ~100 times slower than a delegate equivalent. Obviously Rx is doing a lot under the covers that will be of use in a more complex example, but this just seems incredibly slow.
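Putting the table above in per-message terms is simple arithmetic on the reported numbers. The rate of 10,000 messages per second in the last line is an assumed figure standing in for "thousands of messages a second":

```latex
\begin{aligned}
t_{\text{delegate}} &= \frac{0.041\ \text{s}}{10^{6}} = 41\ \text{ns per call} \\
t_{\text{Subject}} &\approx \frac{3.0\ \text{s}}{10^{6}} = 3.0\ \mu\text{s per call} \\
\frac{t_{\text{Subject}}}{t_{\text{delegate}}} &\approx \frac{3.0\ \mu\text{s}}{41\ \text{ns}} \approx 73 \\
10^{4}\ \text{msg/s} \times 3.0\ \mu\text{s} &= 30\ \text{ms of CPU per second} \approx 3\%\ \text{of one core}
\end{aligned}
```

Across the rows, the Subject tests come to roughly 67 to 74 times the delegate cost and Observable.Range() to about 119 times, which is where the "~100 times" figure comes from.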

Is this normal, or are my testing assumptions invalid? The NUnit code for the above is below:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
    [TestFixture]
    class ReactiveExtensionsBenchmark_Tests
    {
        private int counter = 0;

        [Test]
        public void ReactiveExtensionsPerformanceComparisons()
        {
            int iterations = 1000000;

            Action<int> a = (i) => { counter++; };

            DelegateSmokeTest(iterations, a);
            ObservableRangeTest(iterations, a);
            SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
            SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
            SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
            SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
            SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
        }

        public void ObservableRangeTest(int iterations, Action<int> action)
        {
            counter = 0;

            long start = DateTime.Now.Ticks;

            Observable.Range(0, iterations).Subscribe(action);

            OutputTestDuration("Observable.Range()", start);
        }


        public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
        {
            counter = 0;

            var eventSubject = new Subject<int>();
            var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
            events.Subscribe(action);

            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => eventSubject.OnNext(1)
                );

            OutputTestDuration("Subject.Subscribe() - " + mode, start);
        }

        public void DelegateSmokeTest(int iterations, Action<int> action)
        {
            counter = 0;
            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => action(1)
                );

            OutputTestDuration("Delegate", start);
        }


        /// <summary>
        /// Output helper
        /// </summary>
        /// <param name="test"></param>
        /// <param name="duration"></param>
        public void OutputTestDuration(string test, long duration)
        {
            Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
        }

        /// <summary>
        /// Test timing helper
        /// </summary>
        /// <param name="elapsedTicks"></param>
        /// <returns></returns>
        public string ElapsedDuration(long elapsedTicks)
        {
            return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
        }

    }
}
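On the question of testing assumptions: DateTime.Now is typically coarse (often on the order of 10 to 15 ms), and the timed regions above also include allocating a 1,000,000-element list via ToList(). Neither would explain a 70x gap, but both distort small timings like the 0.041 s delegate figure. Below is a sketch of a tighter harness using System.Diagnostics.Stopwatch, a plain loop and a warm-up call. `Bench`, `Measure` and `Report` are hypothetical names, and only plain delegates are timed here; the Rx cases would plug in the same way.

```csharp
using System;
using System.Diagnostics;

// Hypothetical harness (Bench, Measure and Report are illustrative names).
public static class Bench
{
    private static int counter;

    // Times 'iterations' calls of 'body' with Stopwatch, which uses a
    // high-resolution counter where the hardware provides one.
    // A plain for loop avoids allocating a 1,000,000-element List<int>
    // inside the timed region, unlike Enumerable.Range(...).ToList().ForEach.
    public static TimeSpan Measure(int iterations, Action<int> body)
    {
        var sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            body(i);
        }
        sw.Stop();
        return sw.Elapsed;
    }

    // Same output layout as OutputTestDuration in the question.
    private static void Report(string test, TimeSpan elapsed)
    {
        Console.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, elapsed));
    }

    public static void Main()
    {
        const int iterations = 1000000;
        Action<int> increment = i => counter++;

        // Two targets, closer to the multicast-delegate routing in the question.
        Action<int> multicast = increment;
        multicast += i => { };

        // Warm-up: invoke each delegate once so JIT compilation is not timed.
        increment(0);
        multicast(0);

        counter = 0;
        Report("Delegate", Measure(iterations, increment));

        counter = 0;
        Report("Multicast delegate", Measure(iterations, multicast));
    }
}
```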
Chris Webb, asked Nov 24 '10 at 23:11


2 Answers

My guess is that the Rx team focuses on building the functionality first and doesn't care about performance optimization yet.

Use a profiler to determine bottlenecks and replace slow Rx classes with your own optimized versions.

Below are two examples.

Results:

Delegate                                 - (1000000) - 00:00:00.0368748

Simple - NewThread                       - (1000000) - 00:00:00.0207676
Simple - CurrentThread                   - (1000000) - 00:00:00.0214599
Simple - Immediate                       - (1000000) - 00:00:00.0162026
Simple - ThreadPool                      - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:

private IObservable<int> CreateFastObservable(int iterations)
{
    return Observable.Create<int>(observer =>
    {
        new Thread(_ =>
        {
            for (int i = 0; i < iterations; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }).Start();
        return () => { };
    });
}

Test:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = CreateFastObservable(iterations);

    observable.SubscribeOn(scheduler).Run(action);

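    // Assumes an OutputTestDuration overload that accepts the Stopwatch (the question's version takes ticks)
    OutputTestDuration("Simple - " + mode, start);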
}
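The same idea can be expressed without Rx at all, since `IObservable<T>` and `IObserver<T>` are part of the BCL from .NET 4 onward. This is a minimal sketch under that assumption; `SyncRangeObservable`, `NoOpDisposable` and `ActionObserver` are hypothetical names. Unlike `CreateFastObservable` above, it pushes values synchronously on the subscribing thread instead of starting a new thread, but like it, the returned disposable does nothing, so the sequence cannot be cancelled once started.

```csharp
using System;

// Hypothetical: a disposable that does nothing, mirroring 'return () => { };' above.
public sealed class NoOpDisposable : IDisposable
{
    public static readonly NoOpDisposable Instance = new NoOpDisposable();
    public void Dispose() { }
}

// Hypothetical: pushes 0..count-1 synchronously to the subscriber, then completes.
// There is no cancellation check in the loop, so disposing has no effect.
public sealed class SyncRangeObservable : IObservable<int>
{
    private readonly int count;
    public SyncRangeObservable(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < count; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return NoOpDisposable.Instance;
    }
}

// Hypothetical adapter so a plain Action<T> can act as an IObserver<T>
// (Rx offers Subscribe(Action<T>) for this; here it is hand-written).
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public bool Completed { get; private set; }
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}

public static class Program
{
    public static void Main()
    {
        int counter = 0;
        var observer = new ActionObserver<int>(i => counter++);
        new SyncRangeObservable(1000000).Subscribe(observer);
        Console.WriteLine(counter);            // 1000000
        Console.WriteLine(observer.Completed); // True
    }
}
```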

Subjects add a lot of overhead. Here's a subject that is stripped of much of the functionality expected from a subject, but it's fast:

class FastSubject<T> : ISubject<T>
{
    private event Action onCompleted;
    private event Action<Exception> onError;
    private event Action<T> onNext;

    public FastSubject()
    {
        onCompleted += () => { };
        onError += error => { };
        onNext += value => { };
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(T value)
    {
        this.onNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.onCompleted += observer.OnCompleted;
        this.onError += observer.OnError;
        this.onNext += observer.OnNext;

        return Disposable.Create(() =>
        {
            this.onCompleted -= observer.OnCompleted;
            this.onError -= observer.OnError;
            this.onNext -= observer.OnNext;
        });
    }
}

Test:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

    observable.SubscribeOn(scheduler).Run(action);

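    // Assumes an OutputTestDuration overload that accepts the Stopwatch (the question's version takes ticks)
    OutputTestDuration("FastSubject.Subscribe() - " + mode, start);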
}
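If a dependency on Rx's `ISubject<T>` and `Disposable.Create` is not wanted, the FastSubject design can be sketched against the BCL interfaces alone. `LightSubject<T>`, `Unsubscriber` and `ListObserver<T>` are hypothetical names. Like FastSubject, it omits things a full Rx subject handles, such as stopping delivery once the sequence has completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical: a disposable that runs an action at most once when disposed.
public sealed class Unsubscriber : IDisposable
{
    private Action onDispose;
    public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
    public void Dispose()
    {
        // Swap the action out so a second Dispose() call is a no-op.
        var action = Interlocked.Exchange(ref onDispose, null);
        action?.Invoke();
    }
}

// Hypothetical: the FastSubject design (three multicast events) without Rx types.
public sealed class LightSubject<T> : IObservable<T>, IObserver<T>
{
    // Field-like events; since C# 4 the compiler-generated add/remove are thread-safe.
    // The no-op initial handlers mean invocation never sees null.
    private event Action onCompleted = () => { };
    private event Action<Exception> onError = _ => { };
    private event Action<T> onNext = _ => { };

    public void OnCompleted() { onCompleted(); }
    public void OnError(Exception error) { onError(error); }
    public void OnNext(T value) { onNext(value); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        onCompleted += observer.OnCompleted;
        onError += observer.OnError;
        onNext += observer.OnNext;
        return new Unsubscriber(() =>
        {
            onCompleted -= observer.OnCompleted;
            onError -= observer.OnError;
            onNext -= observer.OnNext;
        });
    }
}

// Hypothetical observer that records what it receives (errors ignored for brevity).
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class Program
{
    public static void Main()
    {
        var subject = new LightSubject<int>();
        var a = new ListObserver<int>();
        var b = new ListObserver<int>();
        IDisposable subA = subject.Subscribe(a);
        subject.Subscribe(b);

        subject.OnNext(1);
        subA.Dispose(); // 'a' stops receiving from here on
        subject.OnNext(2);
        subject.OnCompleted();

        Console.WriteLine(a.Values.Count); // 1
        Console.WriteLine(b.Values.Count); // 2
    }
}
```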
dtb, answered Oct 22 '22 at 02:10


Update for Rx 2.0: I took the code from the original post and ran it in (almost) the latest LINQPad beta, 4.42.04 (there's a 4.42.06, but anyway), referencing the Rx Main assemblies

... and adjusted it slightly to use the new Rx v2 scheduler syntax:

public void ReactiveExtensionsPerformanceComparisons()
{
    int iterations = 1000000;

    Action<int> a = (i) => { counter++; };

    DelegateSmokeTest(iterations, a);
    ObservableRangeTest(iterations, a);
    SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
    SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
    SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
    SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
    // I *think* this is the same as the ThreadPool scheduler in my case
    SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");
    // Doesn't work, as LINQPad has no Dispatcher attached to the GUI thread; maybe there's a workaround. The Instance property on it is obsolete.
    //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "Dispatcher");
}

Note: results vary wildly. In rare cases ThreadPool beats NewThread, but in most cases NewThread has a slight edge over the schedulers listed below it:

Delegate                                 - (1000000) - 00:00:00.0440025
Observable.Range()                       - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread          - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate          - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default            - (1000000) - 00:00:00.0480028

So it seems they worked pretty hard on performance.

hko, answered Oct 22 '22 at 03:10