Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

store retrieve IObservable subscription state in Rx

[ this question is in the realm of Reactive Extensions (Rx) ]

A subscription that needs to continue on application restart

int nValuesBeforeOutput = 123;

myStream.Buffer(nValuesBeforeOutput).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Now I need to serialize and deserialize the state of this subscription so that next time the application is started the buffer count does NOT start from zero, but from whatever the buffer count got to before application exit.

  • How could you persist the state of IObservable.Subscribe() in this case and later load it?
  • Is there a general solution to saving observer state in Rx?



From Answer to Solution

Based on Paul Betts approach, here's a semi-generalizable implementation that worked in my initial testing

Use

int nValuesBeforeOutput = 123;

var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Extension methods

    private static bool _alreadyRecording;

    public static IObservable<T> Record<T>(this IObservable<T> input,
                                           IRepositor repositor) 
    {
        IObservable<T> output = input;
        List<T> records = null;
        if (repositor.Deserialize(ref records))
        {
            ISubject<T> history = new ReplaySubject<T>();
            records.ForEach(history.OnNext);
            output = input.Merge(history);
        }
        if (!_alreadyRecording)
        {
            _alreadyRecording = true;
            input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
        }
        return output;
    }

    public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
                                                 IRepositor repositor)
    {
        input.Subscribe(i => repositor.Clear());
        return input;
    }

Notes

  • This will not work for storing states that depend on time-intervals between the values produced
  • You need a serializer implementation that supports serializing T
  • _alreadyRecording is needed if you subscribe to myRecordableStream more than once
  • _alreadyRecording is a static boolean, very ugly, and prevents the extension methods from being used in more than one place if needing parallel subscriptions - needs to be re-implemented for future use
like image 850
Cel Avatar asked Apr 10 '12 12:04

Cel


1 Answers

There is no general solution for this, and making one would be NonTrivial™. The closest thing you can do is make myStream some sort of replay Observable (i.e. instead of serializing the state, serialize the state of myStream and redo the work to get you back to where you were).

like image 79
Ana Betts Avatar answered Nov 14 '22 00:11

Ana Betts