
Observable LINQ inconsistent exceptions thrown

While writing a stock market trader IObserver, I've encountered three errors, mostly thrown from within the Reactive Extensions library.

I have the following CompanyInfo class:

public class CompanyInfo
{
    public string Name { get; set; }

    public double Value { get; set; }
}

And an IObservable<CompanyInfo> called StockMarket:

public class StockMarket : IObservable<CompanyInfo>

My Observer looks like the following:

public class StockTrader : IObserver<CompanyInfo>
{
    public void OnCompleted()
    {
        Console.WriteLine("Market Closed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnNext(CompanyInfo value)
    {
        WriteStock(value);
    }

    private void WriteStock(CompanyInfo value) { ... }
}

I run the following code:

StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      //[F, +100%]; [S, -20%]
    );

using (IDisposable subscription = differential.Subscribe(trader))
{
    Observable.Wait(market);
}

One of three errors occurs:

  • The following ArgumentException is thrown from within the Reactive Extensions:

    System.ArgumentException: An item with the same key has already been added.
       at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
       at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
       at System.Reactive.Linq.Observable.GroupBy`3._.OnNext(TSource value)

  • The following ArgumentOutOfRangeException:

    Parameter name: index
       at System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument argument, ExceptionResource resource)
       at System.Collections.Generic.List`1.get_Item(Int32 index)
       at StockMarketTests.<>c__DisplayClass0_0.b__2(IList`1 y)
       at System.Reactive.Linq.Observable.Select`2._.OnNext(TSource value)

  • The console text is sporadically garbled (the color should be consistent).

What could cause these bizarre symptoms?

Tamir Vered asked Feb 01 '16 22:02


2 Answers

One of the greatest things about Reactive Extensions is the ability to subscribe to an 'occurrence' (an IObservable) that happens 'somewhere' and to apply object-oriented concepts to it, without having to know where that 'somewhere' is.

This way, Reactive Extensions greatly simplifies event-oriented programming and producer-consumer problems.

Great Power

The ability to subscribe to an IObservable without knowing the source of the observed data forces the subscriber to assume notifications are unpredictable. In other words, when observing an IObservable you should assume notifications can be delivered concurrently.

Under the behavioural contract of Reactive Extensions, an IObservable should produce one item at a time. Usually that is what happens, but some external implementations don't follow the contract.

Let's take a look at each of the three problems:

GroupBy is not thread-safe


GroupBy returns an IObservable<IGroupedObservable<TKey, T>>. When a notification arrives with a key it has not seen before, its OnNext method pushes a new IGroupedObservable<TKey, T> to the outer observer; every notification is then forwarded to the group that matches its key. It does so by keeping one IGroupedObservable<TKey, T> (more accurately, one Subject<T>) per key inside a Dictionary, which, unsurprisingly, is not a ConcurrentDictionary. That means two notifications arriving at nearly the same time can both try to insert the same key.
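The check-then-add pattern behind that failure can be sketched as follows. This is a simplified stand-in, not the Rx source: NaiveGrouper and its members are hypothetical names, and a List stands in for the per-key Subject<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of per-key bookkeeping in a GroupBy-like operator.
// It is only safe when OnNext calls are serialized.
public sealed class NaiveGrouper<TKey, TValue>
{
    private readonly Dictionary<TKey, List<TValue>> _groups =
        new Dictionary<TKey, List<TValue>>();

    public int GroupCount => _groups.Count;

    public void OnNext(TKey key, TValue value)
    {
        // Check-then-act: two threads can both observe "key is missing" here...
        if (!_groups.TryGetValue(key, out var group))
        {
            group = new List<TValue>();
            // ...and both reach Add. The second Add throws ArgumentException
            // ("An item with the same key has already been added").
            _groups.Add(key, group);
        }

        group.Add(value);
    }
}
```

Called from a single thread, the sketch behaves as expected. Called concurrently, Dictionary gives no guarantees at all, so a duplicate-key exception is only one of the possible outcomes.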

Select is not alone


Select's thread-safety is determined by the delegate it is given. In the case above, the delegate relies on Buffer(2, 1) providing a list of size 2. Buffer contains a Queue, which is not thread-safe, so when it is accessed from multiple threads it can produce unexpected results, such as a list with fewer than two items.

Other exceptions that could be thrown for the same reason are a NullReferenceException, if y were null, or an InvalidOperationException, because the Queue could be modified while it is being iterated.

Even the basic observation is not safe


Last but not least, even with only the basic observation, the StockTrader's OnNext method modifies the console in a non-atomic operation, which causes the weird text layout.
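If the console output itself needs protecting, one option is to put the color change and the write under a single lock. The sketch below is an assumption about what WriteStock might do, since its body is not shown in the question; StockConsole, PickColor and the green/red choice are hypothetical.

```csharp
using System;

public static class StockConsole
{
    // One gate shared by every writer, so "set color, write, restore color"
    // runs as a single unit.
    private static readonly object ConsoleGate = new object();

    // Hypothetical rule: green for a rise (or no change), red for a fall.
    public static ConsoleColor PickColor(double change) =>
        change >= 0 ? ConsoleColor.Green : ConsoleColor.Red;

    public static void WriteStock(string name, double change)
    {
        lock (ConsoleGate)
        {
            ConsoleColor previous = Console.ForegroundColor;
            Console.ForegroundColor = PickColor(change);
            try
            {
                Console.WriteLine($"{name} = {change}");
            }
            finally
            {
                // Restore the color even if the write throws.
                Console.ForegroundColor = previous;
            }
        }
    }
}
```

This only fixes the console symptom. It does nothing for operators upstream of the observer, which still see concurrent notifications.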

So what can you do?



The Synchronize method lets you ensure that everything downstream of it observes a serialized IObservable<T>, meaning no more than one invocation of OnNext can be in progress at a time.

Since even the GroupBy extension method is not thread-safe, Synchronize needs to be invoked at the beginning of the chain:

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .Synchronize()
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      
    );                                          //[F, +100%]; [S, -20%]

Note that Synchronize adds another proxy observable to your query, which makes the query a little slower, so you should avoid using it when it is not required.
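To make the idea of a serializing proxy concrete, here is a minimal hand-written sketch of an observer wrapper that funnels every notification through one lock. It illustrates the concept only and is not how Rx implements Synchronize; SerializingObserver and CountingObserver are hypothetical names, and the sketch does not enforce the rule that nothing follows OnError or OnCompleted.

```csharp
using System;

// Hypothetical proxy: forwards notifications to the inner observer one at a time.
public sealed class SerializingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();

    public SerializingObserver(IObserver<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public void OnNext(T value)
    {
        lock (_gate) { _inner.OnNext(value); }
    }

    public void OnError(Exception error)
    {
        lock (_gate) { _inner.OnError(error); }
    }

    public void OnCompleted()
    {
        lock (_gate) { _inner.OnCompleted(); }
    }
}

// Deliberately not thread-safe: Count++ is a read-modify-write.
public sealed class CountingObserver : IObserver<int>
{
    public int Count { get; private set; }

    public void OnNext(int value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Wrapping a CountingObserver in a SerializingObserver<int> keeps Count accurate even when OnNext is called from several threads. Every call pays for the lock, which is the overhead the note above refers to.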

Tamir Vered answered Oct 05 '22 23:10


The problem with your code is not with the query, nor with Rx per se. It most likely comes from your actual StockMarket or StockTrader implementation.

Most likely, the issue arises because you are creating two subscriptions to your market observable.

When you write this:

using (IDisposable subscription = differential.Subscribe(trader))
{
    Observable.Wait(market);
}

...you are getting two subscriptions to market: one from differential.Subscribe(trader) and the other from Observable.Wait(market), which subscribes to its argument in order to wait for it.

I suspect that two concurrent subscriptions are causing your issues, but without seeing the implementation of StockMarket we can't tell why it's throwing.
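One way to wait for the end of the stream without a second subscription is to let the single observer signal its own completion. The following is a sketch under that assumption, using only base-library types; CompletionObserver and RecordingObserver are hypothetical helpers, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper: forwards to the real observer and exposes a Task
// that finishes when the sequence terminates.
public sealed class CompletionObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly TaskCompletionSource<bool> _done = new TaskCompletionSource<bool>();

    public CompletionObserver(IObserver<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // Completes after OnCompleted, faults after OnError.
    public Task Completion => _done.Task;

    public void OnNext(T value) => _inner.OnNext(value);

    public void OnError(Exception error)
    {
        _inner.OnError(error);
        _done.TrySetException(error);
    }

    public void OnCompleted()
    {
        _inner.OnCompleted();
        _done.TrySetResult(true);
    }
}

// Small helper used only to exercise the wrapper.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}
```

With the code from the question, this would be used roughly as `var observer = new CompletionObserver<CompanyInfo>(trader); using (differential.Subscribe(observer)) { await observer.Completion; }`, so market is subscribed to only once and nothing blocks a thread.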

This is the danger of implementing your own observable and observer implementations. You should avoid doing that. It would be better to have a property IObservable<CompanyInfo> CompanyValues { get; } hanging off of CompanyInfo that is built using the standard Rx operators.

And you should always avoid blocking operations like .Wait(...).

As a quick test I would replace your current Observable.Wait(market); with a Thread.Sleep(?) with a sufficiently long sleep period to see if your code behaves. Of course you'll need to make sure that you are producing values on a background scheduler (like Scheduler.Default).

I ran this code to test your query:

public class CompanyInfo
{
    public string Name { get; set; }

    public double Value { get; set; }
}

public class StockTrader : IObserver<CompanyInfo>
{
    public void OnCompleted()
    {
        Console.WriteLine("Market Closed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnNext(CompanyInfo value)
    {
        WriteStock(value);
    }

    private void WriteStock(CompanyInfo value) { Console.WriteLine($"{value.Name} = {value.Value}"); }
}

public class StockMarket : IObservable<CompanyInfo>
{
    private CompanyInfo[] _values = new CompanyInfo[]
    {
        new CompanyInfo() { Name = "F", Value = 1 },
        new CompanyInfo() { Name = "S", Value = 5 },
        new CompanyInfo() { Name = "S", Value = 4 },
        new CompanyInfo() { Name = "F", Value = 2 },
    };

    public IDisposable Subscribe(IObserver<CompanyInfo> observer)
    {
        return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observer);
    }
}

...with this:

StockMarket market = new StockMarket();
StockTrader trader = new StockTrader();

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      //[F, +100%]; [S, -20%]
    );

IDisposable subscription = differential.Subscribe(trader);

Thread.Sleep(10000);

Not once did I get it to crash or cause any exceptions.

Enigmativity answered Oct 05 '22 22:10