
Rx, dynamically merging sources

I am looking for a way to dynamically merge data sources without interruption. A real-world scenario would be pulling similar data from multiple sources without regard for redundant information.

To simplify the code, I have replaced the more complex code with a simple number generator that continuously produces data. This could be compared to reading a continual stream of data from multiple external servers.

I want to be able to merge the two sources and print the results (when appropriate) to the console; this part works great. Things stop working as expected when we terminate those two sources and then merge in another source. In this case, we could just as easily resubscribe mergedStreamObserver; however, in a much larger app, we would have to worry about gaps in the data as well as keep track of which observers are subscribed.

Is there a way around this?

// imports
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static void Main(string[] args) {
    // base "stream of results", as we will want to randomly add (and terminate) other sources
    IObservable<int> merged = Observable.Empty<int>();

    // source 1
    var tokenSource1 = new CancellationTokenSource();
    IObservable<int> xs = Generate(tokenSource1, "A");

    // to avoid generating the same numbers, which does happen,
    // sleep some amount of time before calling generate again
    Thread.Sleep(100);

    // source 2
    var tokenSource2 = new CancellationTokenSource();
    IObservable<int> xt = Generate(tokenSource2, "B");

    // odd queries
    var seq1 = from n in xs where n % 2 == 1 select n;

    // even queries
    var seq2 = from n in xt where n % 2 == 0 select n;

    // merge everything together
    merged = merged.Merge<int>(seq1);
    merged = merged.Merge<int>(seq2);

    // observer for the merged "streams"
    // NOTE: while this does not appear to be working correctly,
    // remember you have 2 streams and 2 queries at work.  It
    // really is doing what it's expected to here.
    IDisposable mergedStreamObserver = merged.Subscribe(str => { Console.WriteLine(str); });

    // kill both sources
    Console.ReadKey();

    tokenSource1.Cancel();
    tokenSource2.Cancel();

    // start source and query for evens
    // try to merge it
    Console.ReadKey();

    tokenSource2 = new CancellationTokenSource();
    xt = Generate(tokenSource2, "B");

    seq2 = from n in xt where n % 2 == 0 select n;

    merged = merged.Merge(seq2);

    // Nothing is happening because the merged stream was modified.
    // How do we create a composite Observable from multiple sources
    // and dynamically add/terminate those sources?

    Console.ReadKey();

    tokenSource2.Cancel();
    mergedStreamObserver.Dispose();
    Console.ReadKey();
}

static IObservable<int> Generate(CancellationTokenSource tokenSource, string name) {
    Random random = new Random();

    Action<int> observer = _ => { };    /* We could use null, but then at every invocation
                                            * we'd have to copy to a local and check for null.
                                            */

    Task.Factory.StartNew(() => {
            while(!tokenSource.IsCancellationRequested) {
                var t = random.Next(0, 100);
                Console.WriteLine("From Generator {0}: {1}", name, t);

                observer(t);

                Thread.Sleep(1000);
            }

            Console.WriteLine("Random Generator Stopped");
        }, tokenSource.Token);

    return Observable.FromEvent<int>(
        eh => observer += eh,
        eh => observer -= eh);
}
codeape asked Jul 09 '13 16:07



1 Answer

Use a subject, and subscribe to the merged stream before creating the streams:

// Subject<T> lives in System.Reactive.Subjects
var streams = new Subject<IObservable<int>>();
var mergedStreams = streams.Merge();
var mergedObserver = mergedStreams.Subscribe(...);

// now create your streams
...

// add them to the streams subject
streams.OnNext(seq1);
streams.OnNext(seq2);
...

streams.OnNext(seq3);
streams.OnNext(seq4);

...
// If we know there will be no more streams, tell the Subject...
streams.OnCompleted();
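
For a self-contained illustration of the same idea, here is a minimal sketch. It assumes the System.Reactive package is referenced. To keep the output deterministic, it swaps the question's Generate method for hand-driven Subject<string> sources; the names DynamicMergeSketch, Run, a, b and c are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class DynamicMergeSketch
{
    // Hypothetical helper: returns everything the merged subscription received.
    public static List<string> Run()
    {
        var received = new List<string>();

        // The "stream of streams": each OnNext adds one more source to the merge.
        var streams = new Subject<IObservable<string>>();

        // Subscribe once, before any source exists.
        using (streams.Merge().Subscribe(x => received.Add(x)))
        {
            // Stand-ins for the question's generators (assumption: simple subjects).
            var a = new Subject<string>();
            var b = new Subject<string>();
            streams.OnNext(a);
            streams.OnNext(b);

            a.OnNext("A1");
            b.OnNext("B1");

            // Terminating both sources does not end the merged stream,
            // because the outer subject has not completed yet.
            a.OnCompleted();
            b.OnCompleted();

            // A source added later flows through the same subscription.
            var c = new Subject<string>();
            streams.OnNext(c);
            c.OnNext("C1");

            // No more sources will be added; sources already merged keep flowing.
            streams.OnCompleted();
            c.OnNext("C2");
            c.OnCompleted(); // now the merged stream completes as well
        }

        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "A1, B1, C1, C2"
    }
}
```

The point of the design is that the subscriber is attached once and never has to be reconnected: sources come and go through streams.OnNext, and the merged stream only completes once the outer subject and every source already added have completed.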
Brandon answered Sep 20 '22 02:09
