In my quest to write a stock market trader IObserver
I've encountered three errors mostly thrown from within the Reactive Extensions
library.
I have the following CompanyInfo
class:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
And an IObservable<CompanyInfo>
called StockMarket
:
public class StockMarket : IObservable<CompanyInfo>
My Observer
looks like the following:
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { ... }
}
I run the following code:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
One of three errors occur:
The following ArgumentException
is thrown from within the Reactive Extensions
:
System.ArgumentException: An item with the same key has already been added. at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource) at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add) at System.Reactive.Linq.Observable.GroupBy'3._.OnNext(TSource value)
The following IndexOutOfRangeException
:
Parameter name: index at System.ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument argument, ExceptionResource resource) at System.Collections.Generic.List'1.get_Item(Int32 index) at StockMarketTests.<>c__DisplayClass0_0.b__2(IList'1 y) at System.Reactive.Linq.Observable.Select'2._.OnNext(TSource value)
The text of the Console
tweaks sporadically (The color should be consistance):
What could cause those bizzare symptoms?
One of the greatest things about the concept of Reactive Extensions
is the ability to subscribe to an 'occurrence' (IObservable
) which happened 'somewhere' and applying object oriented concepts on this 'occurrence' - this without having to know where that 'somewhere' is.
This way Reactive Extensions
simplifies the event
oriented programming and producer-consumer
problems a lot.
The ability to subscribe to an IObservable
without knowing the source of the observed data forces the subscriber to assume notifications are unpredictable. In other words, when observing an IObservable
you should assume notifications can be delivered concurrently.
Due to a behavioural contract of Reactive Externsions
, IObservables
should produce one item at a time. Usually, that's what happen, but sometimes external implementations don't follow that contract.
Lets take a look on each of the three problems:
GroupBy
is not thread-safeGroupBy
works by returning an IObservable<IGroupedObservable<T>>
, its OnNext
method calls the outer IObservable
's OnNext
with the IGroupedObservable<T>
which matches the current notification. it does so by keeping one IGroupedObservable<T>
(more accurately one Subject<T>
) for each key inside a Dictionary
- which is unsurprisingly - not a ConcurrentDictionary
. That means two proximate notifications can cause a double insert.
Select
is not aloneSelect
's thread-safety is determined by its provided delegate. In the case above, the delegate provided to the Select
method relies on the fact that Buffer(2, 1)
will provide a list with the size of 2. Buffer
contains a Queue
, which is not concurrent, therefore when iterated from multiple threads - Buffer
's Queue
can provide us some unexpected results.
another Exception
which could be thrown from the same reason is NullReferenceException
if y
would be provided null
, or an InvalidOperationException
for the Queue
could be modified while it's being iterated.
Last but not least, even when you do only the basic observation, the StockTrader
's OnNext
method is modifying the console in a non-atomic operation which causes the wierd text layout.
The Synchronize
method exists to make you able to validate that you are subscribing to a linear IObservable<T>
which means no more than one invocation on OnNext
method can occur concurrently.
Since even the GroupBy
extension method is not thread-safe, the Synchronize
method needs to be invoked in the beginning of the chain:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.Synchronize()
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
})
); //[F, +100%]; [S, -20%]
Note that Synchronize
adds another proxy Observable
to your query, hence it will make the query a little bit slower, therefore you should avoid using it when it is not required.
The problem with your code is not with the query, nor with Rx per se. The problem is likely coming from either of your actual StockMarket
or StockTrader
implementations.
Now, it's likely that the issue is being caused because you are creating two subscriptions to your market
observable.
When you write this:
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
...you are getting two subscriptions to market
. One in differential.Subscribe(trader)
and the other because of Observable.Wait(market);
.
I suspect that two concurrent subscriptions are causing your issues, but without seeing the implementation of StockMarket
we can't tell why it's throwing.
This is the danger of implementing your own observable and observer implementations. You should avoid doing that. It would be better to have a property IObservable<CompanyInfo> CompanyValues { get; }
hanging off of CompanyInfo
that is built using the standard Rx operators.
And you should always avoid blocking operations like .Wait(...)
.
As a quick test I would replace your current Observable.Wait(market);
with a Thread.Sleep(?)
with a sufficiently long sleep period to see if your code behaves. Of course you'll need to make sure that you are producing values on a background scheduler (like Scheduler.Default
).
I ran this code to test your query:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { Console.WriteLine($"{value.Name} = {value.Value}"); }
}
public class StockMarket : IObservable<CompanyInfo>
{
private CompanyInfo[] _values = new CompanyInfo[]
{
new CompanyInfo() { Name = "F", Value = 1 },
new CompanyInfo() { Name = "S", Value = 5 },
new CompanyInfo() { Name = "S", Value = 4 },
new CompanyInfo() { Name = "F", Value = 2 },
};
public IDisposable Subscribe(IObserver<CompanyInfo> observable)
{
return _values.ToObservable().ObserveOn(Scheduler.Default).Subscribe(observable);
}
}
...with this:
StockMarket market = new StockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
IDisposable subscription = differential.Subscribe(trader);
Thread.Sleep(10000);
Not once did I get it to crash or cause any exceptions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With