Merging historical and live stock price data with Rx

I'm trying out Rx because it seems like a good fit for our domain, but the learning curve has taken me by surprise.

I need to knit together historical price data with live price data.

I'm trying to adapt the usual approach to doing this into the language of Rx:

  1. Subscribe to the live prices immediately and start buffering the values I get back
  2. Initiate a request for historical price data (this needs to happen after the subscription to live prices so we don't have any gaps in our data)
  3. Publish historical prices as they come back
  4. Once we've received all historical data, publish the buffered live data, removing any values that overlap with our historical data at the beginning
  5. Continue replaying data from the live price feed
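Before translating the five steps into Rx, it can help to watch them run in a small single-threaded model built on plain callbacks. The sketch below uses a hypothetical `Tick` with a `long` timestamp and a hypothetical `HistoryThenLiveFeed` class; neither comes from the question. The model buffers live ticks until history completes, then drops buffered ticks that overlap the last historical timestamp, discards the buffer, and finally passes later live ticks straight through. It is not thread-safe and it is not Rx code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical tick type for this sketch: a timestamp (e.g. milliseconds) and a price.
public sealed class Tick
{
    public Tick(long timestamp, decimal price)
    {
        Timestamp = timestamp;
        Price = price;
    }

    public long Timestamp { get; }
    public decimal Price { get; }
}

// Single-threaded, callback-based model of steps 1-5 (not Rx, not thread-safe).
public sealed class HistoryThenLiveFeed
{
    private readonly Action<Tick> publish;
    private List<Tick> liveBuffer = new List<Tick>(); // set to null once history completes
    private long? lastHistorical;
    private bool pastOverlap;

    public HistoryThenLiveFeed(Action<Tick> publish)
    {
        this.publish = publish;
    }

    // Step 1: live ticks can arrive from the moment we subscribe.
    public void OnLive(Tick tick)
    {
        if (liveBuffer != null)
            liveBuffer.Add(tick); // history still loading: buffer the tick
        else
            PublishLive(tick);    // step 5: pass straight through
    }

    // Steps 2-3: historical ticks are published as soon as they arrive.
    public void OnHistorical(Tick tick)
    {
        lastHistorical = tick.Timestamp;
        publish(tick);
    }

    // Step 4: flush the buffer, then drop it because it is no longer needed.
    public void OnHistoryCompleted()
    {
        foreach (var tick in liveBuffer)
            PublishLive(tick);
        liveBuffer = null;
    }

    // Drops live ticks that overlap history, but (like SkipWhile) stops
    // comparing timestamps after the first live tick that is newer.
    private void PublishLive(Tick tick)
    {
        if (!pastOverlap && lastHistorical.HasValue && tick.Timestamp <= lastHistorical.Value)
            return;
        pastOverlap = true;
        publish(tick);
    }
}
```

If it is called in the order live 3, live 4, historical 1, 2, 3, history completed, live 5, it publishes 1, 2, 3, 4, 5. In a real feed the live and historical callbacks would arrive on different threads, so this state would need a lock around it.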

I have this disgusting and incorrect straw man code which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

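private static Func<Tick, bool> TicksAreInChronologicalOrder()
{
    // Stateful predicate: accept a tick only if its timestamp is later than the
    // last tick we accepted (assumes Tick.Timestamp is a DateTime).
    // The state is shared by every subscriber to the filtered sequence.
    DateTime? lastTimestamp = null;
    return tick =>
    {
        if (lastTimestamp.HasValue && tick.Timestamp <= lastTimestamp.Value)
            return false;
        lastTimestamp = tick.Timestamp;
        return true;
    };
}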

This has a few drawbacks:

  1. The appropriate replay buffer size is not known. Setting an unlimited buffer isn't possible, because this is a long-running sequence. Really we want some kind of one-time buffer that flushes on the first call to Subscribe. If this exists in Rx, I can't find it.
  2. The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at this point.
  3. Similarly, the predicate to filter out overlapping ticks isn't necessary once we've skipped the initial overlap between historical and live prices. I really want to do something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Is Wait(this IObservable<TSource>) useful here?
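The lazy cutoff asked about in point 3 can be illustrated with LINQ to Objects. Its `Select`, `SkipWhile` and `Concat` operators have Rx counterparts with the same names. In this sketch the `OverlapFilter` name is hypothetical and timestamps are plain `long` values. The whole pipeline is built before any history is read. The last historical timestamp is recorded as history flows through `Select`, and `SkipWhile` only reads it after `Concat` has exhausted the history. Because `SkipWhile` stops testing after its first `false`, the comparison drops out once the live data has moved past the overlap. Treat this as a model of the idea rather than tested Rx code. In Rx, the recording step would more naturally be `Do`, and the captured variable would need to be created per subscription (for example inside `Observable.Defer`).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OverlapFilter
{
    // Builds the pipeline up front, before any history has been read.
    // The cutoff (last historical timestamp) lives in a closure and is only
    // read by SkipWhile once Concat has exhausted the historical sequence.
    public static IEnumerable<long> HistoryThenLive(IEnumerable<long> historical, IEnumerable<long> live)
    {
        long? lastHistorical = null;

        IEnumerable<long> history = historical.Select(t =>
        {
            lastHistorical = t; // record the latest historical timestamp as it passes
            return t;
        });

        // SkipWhile stops testing after the first tick that fails the predicate,
        // so only the initial overlap with history is ever filtered.
        IEnumerable<long> liveAfterHistory = live.SkipWhile(
            t => lastHistorical.HasValue && t <= lastHistorical.Value);

        return history.Concat(liveAfterHistory);
    }
}
```

For history 1, 2, 3, 4 and live 3, 4, 5, 6, the result is 1 to 6. A later out-of-order live tick is not filtered, because the predicate is no longer consulted by then.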

There must be a better way to do this, but I'm still waiting for my brain to grok Rx like it does FP.

Another option I've considered for solving problem 1 is writing my own Rx extension: an ISubject that queues messages until it gets its first subscriber (and perhaps refuses subscribers after that). Maybe that's the way to go?

Asked Feb 11 '13 at 14:02 by Matt Savage



3 Answers

If your historical and live data are both time- or scheduler-based, that is, the event streams look like this over time:

|---------------------------------------------------->  time
    h   h   h   h  h  h                                 historical
                l  l  l  l  l  l                        live

You can use a simple TakeUntil construct:

var historicalStream = <fetch historical data>;
var liveStream = <fetch live data>;

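var mergedWithoutOverlap = 
     // pull from historical
     historicalStream
       // until we start overlapping with live
       .TakeUntil(liveStream)
       // then continue with live data
       // (liveStream is subscribed twice here; if it is hot, the live tick that
       // ends TakeUntil is typically not seen by Concat's later subscription)
       .Concat(liveStream);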

If you get all your historical data at once, as an IEnumerable<T>, you can combine StartWith with your other filtering logic:

var historicalData = <get IEnumerable of tick data>;
var liveData = <get IObservable of tick data>;

var mergedWithOverlap = 
    // the observable is the "long running" feed
    liveData
    // But we'll inject the historical data in front of it
    .StartWith(historicalData)
    // Perform filtering based on your needs
    .Where( .... );
Answered Apr 29 '23 at 01:04 by JerKimball



How about something like:

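public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector)
{
    // Record live ticks from now on, before history is requested.
    // Note: an unbounded ReplaySubject keeps every live tick, and this subscription is never disposed.
    var replaySubject = new ReplaySubject<T>();
    live.Subscribe(replaySubject);
    // History first, then recorded and new live ticks; drop any tick whose key has already been seen.
    return history.Concat(replaySubject).Distinct(selector);
}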

This uses a sequence id and Distinct to filter out the duplicates.
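One consequence is worth knowing. `Distinct` has to remember every key it has seen for as long as the subscription lives. On a long-running feed, that raises the same unbounded-growth concern the question raises about the replay buffer. Suppose sequence ids only ever increase (an assumption; the answer does not state it). Then keeping a tick only when its id is greater than the last id emitted gives the same result while remembering a single id. The sketch below shows that filter over `IEnumerable<T>`. `SequenceIdFilter` is a hypothetical name, and `PriceTick` is reduced to the `PriceId` property used in the tests.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick shaped like the PriceTick used in the tests.
public class PriceTick
{
    public long PriceId { get; set; }
}

public static class SequenceIdFilter
{
    // Matches Distinct(t => t.PriceId) only if ids arrive in increasing order
    // apart from the replayed overlap: a tick is kept when its id is greater
    // than the last id emitted. One id is remembered, not every id ever seen.
    public static IEnumerable<PriceTick> NewerThanLastEmitted(IEnumerable<PriceTick> ticks)
    {
        long? lastId = null;
        foreach (var tick in ticks)
        {
            if (lastId.HasValue && tick.PriceId <= lastId.Value)
                continue; // already emitted (or older): drop it
            lastId = tick.PriceId;
            yield return tick;
        }
    }
}
```

Given ids in the order 1, 2, 3, 4, 3, 4, 5, 6, 7, 8 (history followed by the replayed live ticks), it keeps 1 to 8.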

And the corresponding tests:

var testScheduler = new TestScheduler();

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }),
    OnNext(2L, new PriceTick { PriceId = 2 }),
    OnNext(3L, new PriceTick { PriceId = 3 }),
    OnNext(4L, new PriceTick { PriceId = 4 }),
    OnCompleted(5L, new PriceTick()));

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }),
    OnNext(2L, new PriceTick { PriceId = 4 }),
    OnNext(3L, new PriceTick { PriceId = 5 }),
    OnNext(4L, new PriceTick { PriceId = 6 }),
    OnNext(5L, new PriceTick { PriceId = 7 }),
    OnNext(6L, new PriceTick { PriceId = 8 }),
    OnNext(7L, new PriceTick { PriceId = 9 })
    );


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId));
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId), () => Console.WriteLine("C"));

var combined = live.CombineWithHistory(history, t => t.PriceId);

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId));

testScheduler.AdvanceTo(6L);

If you execute this test, combined emits price ticks with ids 1 to 8.

Answered Apr 29 '23 at 03:04 by Dave Hillier



For the record, here's what I did in the end. I'm still very much an Rx learner, and I'm returning to .NET having last used it at version 2.0. All feedback is gratefully received.

The Ticks object used below may contain one or more tick values. The historical data service returns its data as several Ticks objects.
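The answer doesn't show the `Ticks` type. Judging only from the method names in the code below, historical batches are trimmed with `RemoveAtOrAfter(cutoff + 1)`. Live batches are trimmed with `SkipWhile(... LastTimestamp <= cutoff)` followed by `RemoveBefore(cutoff + 1)`. On that reading, history keeps ticks at or before the cutoff and live keeps ticks after it. The sketch below is a guess at those semantics, not the author's class. It assumes integral timestamps (for example milliseconds as a `long`), which is what makes `cutoff + 1` meaningful.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Ticks batch type; the real class is not shown.
// Timestamps are assumed to be integral (e.g. milliseconds as a long).
public sealed class Ticks
{
    private readonly long[] timestamps;

    public Ticks(params long[] timestamps)
    {
        this.timestamps = timestamps;
    }

    public IReadOnlyList<long> Timestamps => timestamps;

    // Throws on an empty batch; good enough for a sketch.
    public long LastTimestamp => timestamps[timestamps.Length - 1];

    // Keeps only ticks at or after `from`.
    public Ticks RemoveBefore(long from) =>
        new Ticks(timestamps.Where(t => t >= from).ToArray());

    // Keeps only ticks strictly before `until`.
    public Ticks RemoveAtOrAfter(long until) =>
        new Ticks(timestamps.Where(t => t < until).ToArray());
}
```

Take a cutoff of 100, a historical batch of 98 to 101, and live batches [97, 98], [100, 101, 102], [103]. Trimmed this way, history contributes 98, 99, 100 and live contributes 101, 102, 103, so there is no gap and no duplicate.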

public class HistoricalAndLivePriceFeed : IPriceFeed
{
    private readonly IPriceFeed history;
    private readonly IPriceFeed live;
    private readonly IClock clock;

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
        : this(history, live, new RealClock())
    {
    }

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
    {
        this.history = history;
        this.live = live;
        this.clock = clock;
    }

    public IObservable<Ticks> For(DateTime since, ISymbol symbol)
    {
        return Observable.Create<Ticks>(observer =>
        {
            var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));

            var definitelyInHistoricalTicks = clock.Now;
            // Sleep to make sure that historical data overlaps our live data
            // If we ever use a data provider with less fresh historical data, we may need to rethink this
            clock.Wait(TimeSpan.FromSeconds(1));

            var liveStreamAfterEndOfHistoricalTicks = liveStream
               .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
               .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));

            var subscription = history.For(since, symbol)
               .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
               .Concat(liveStreamAfterEndOfHistoricalTicks)
               .Subscribe(observer);

            return liveStream.And(subscription);
        });
    }
}
public static class CompositeDisposableExtensions
{
    public static CompositeDisposable And(this IDisposable disposable, Action action)
    {
        return And(disposable, Disposable.Create(action));
    }

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
    {
        return new CompositeDisposable(disposable, other);
    }
}

This uses the following Rx code, which I still don't quite trust:

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace My.Rx
{
    /// <summary>
    /// Buffers values from an underlying observable when no observers are subscribed.
    /// 
    /// On Subscription, any buffered values will be replayed.
    /// 
    /// Only supports one observer for now.
    /// 
    /// Buffer is an ISubject for convenience of implementation, but its IObserver methods
    /// are implemented explicitly. It is not intended that Buffer should be used as an IObserver,
    /// except through StartBuffering(), and it is dangerous to do so because none of
    /// the IObserver methods check whether Buffer has been disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the buffered values.</typeparam>
    public class Buffer<TSource> : ISubject<TSource>, IDisposable
    {
        private readonly object gate = new object();
        private readonly Queue<TSource> queue = new Queue<TSource>();

        private bool isDisposed;
        private Exception error;
        private bool stopped;
        private IObserver<TSource> observer = null;
        private IDisposable subscription;

        public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
        {
            return new Buffer<TSource>(observable);
        }

        private Buffer(IObservable<TSource> observable)
        {
            subscription = observable.Subscribe(this);
        }

        void IObserver<TSource>.OnNext(TSource value)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    queue.Enqueue(value);
                else
                    observer.OnNext(value);
            }
        }

        void IObserver<TSource>.OnError(Exception error)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    this.error = error;
                else
                    observer.OnError(error);
                stopped = true;
            }
        }

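        void IObserver<TSource>.OnCompleted()
        {
            lock (gate)
            {
                if (stopped) return;
                // Forward completion if an observer is attached; otherwise it is
                // replayed later by Subscribe via the stopped flag.
                if (!IsBuffering)
                    observer.OnCompleted();
                stopped = true;
            }
        }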

        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            lock (gate)
            {
                if (isDisposed)
                    throw new ObjectDisposedException(string.Empty);

                if (this.observer != null)
                    throw new NotImplementedException("A Buffer can currently only support one observer at a time");

                while (queue.Count > 0)
                {
                    observer.OnNext(queue.Dequeue());
                }

                if (error != null)
                    observer.OnError(error);
                else if (stopped)
                    observer.OnCompleted();

                this.observer = observer;
                return Disposable.Create(() =>
                {
                    lock (gate)
                    {
                        // Go back to buffering
                        this.observer = null;
                    }
                });
            }
        }

        private bool IsBuffering
        {
            get { return observer == null; }
        }


        public void Dispose()
        {
            lock (gate)
            {
                // Make Dispose safe to call more than once
                if (isDisposed) return;

                subscription.Dispose();
                isDisposed = true;
                subscription = null;
                observer = null;
            }
        }
    }
}

It passes these tests (I haven't checked thread safety yet):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");

[Test]
public void ReplaysBufferedValuesToFirstSubscriber()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnNext(2);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add));

    Assert.That(observed, Is.EquivalentTo(new []{1,2}));
}

[Test]
public void PassesNewValuesToObserver()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));

    underlying.OnNext(1);
    underlying.OnNext(2);

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
}


[Test]
public void DisposesOfSubscriptions()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add))
        .Dispose();

    underlying.OnNext(1);

    Assert.That(observed, Is.Empty);
}

[Test]
public void StartsBufferingAgainWhenSubscriptionIsDisposed()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    // These should be buffered
    underlying.OnNext(1);
    underlying.OnNext(2);

    var firstSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
    {
        // Should be passed through to first subscription
        underlying.OnNext(3);
    }
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 }));

    // First subscription has been disposed-
    // we should be back to buffering again
    underlying.OnNext(4);
    underlying.OnNext(5);

    var secondSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
    {
        // Should be passed through to second subscription
        underlying.OnNext(6);
    }
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5, 6 }));
}

[Test]
public void DoesNotSupportTwoConcurrentObservers()
{
    // Use .Publish() if you need to do this

    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    buffer.Subscribe(Observer.Create<int>(i => { }));

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void CannotBeUsedAfterDisposal()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void ReplaysBufferedError()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    var observed = new List<int>();
    Exception foundException = null;
    buffer.Subscribe(
        observed.Add, 
        e => foundException = e);

    Assert.That(observed, Is.EquivalentTo(new []{1}));
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletion()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(
        observed.Add,
        () => completed=true);

    Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
    Assert.True(completed);
}

[Test]
public void ReplaysBufferedErrorToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    // Drain value queue
    buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose();

    var observed = new List<int>();
    Exception exceptionEncountered = null;
    buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose();

    Assert.That(observed, Is.Empty);
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletionToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    // Drain value queue
    buffer.Subscribe(Observer.Create<int>(i => { })).Dispose();

    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose();

    Assert.That(observed, Is.Empty);
    Assert.True(completed);
}



[Test]
public void DisposingOfBufferDisposesUnderlyingSubscription()
{
    var underlyingSubscriptionWasDisposed = false;
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed = true));

    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.True(underlyingSubscriptionWasDisposed);
}
Answered Apr 29 '23 at 03:04 by Matt Savage
