Recursive / fan-out in Reactive Extensions

I'm attempting to piece together an Rx pipeline that works as follows:

  1. I have written a function that takes an IObservable of profiles, each containing information about a company.
  2. I query a variety of data sources in parallel to find potentially related company profiles, and I merge the results into a single IObservable of company profiles.
  3. When I get back these potentially related profiles, I compare them to the profiles I have already observed. If a profile has a relevance > 80% and is not the same as any profile I've already observed, I consider it a match.
  4. I want to feed the matching companies back into step 1 so that I can search for data related to these new matching profiles.

I bootstrap the process with some known good profiles.

Eventually, there are no more matching profiles that haven't already been seen, and so the process ends.

I'm having trouble programming this. If I use a Subject to let the tail end of the pipeline send its profiles back to the beginning of the workflow, then nothing ever calls OnCompleted and I never find out that the process has ended. If I use recursion instead, I always seem to end up with a stack overflow, since I'm trying to call a function with its own return value.

How can I accomplish this task in a way that lets me determine when the process has ended?

David Pfeffer asked Feb 17 '23 06:02


1 Answer

It sounds like you want a dataflow like this:

seed profiles --> source --> get related --> output
                     ^                    |
                     |                    v
                     -<--- transform <-----

This seems like a case where solving the general problem is as easy as, or easier than, solving the specific one, so I'll propose a generic "feedback" function that should give you the building blocks you need:

Edit: fixed the function so that it completes.

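using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableFeedbackExtensions
{
    // Extension methods must be static members of a static class.
    public static IObservable<TResult> Feedback<T, TResult>(this IObservable<T> seed,
                                                          Func<T, IObservable<TResult>> produce,
                                                          Func<TResult, IObservable<T>> feed)
    {
        return Observable.Create<TResult>(
            observer =>
            {
                // Inner streams may come from parallel sources, so serialize calls to the observer.
                var obs = Observer.Synchronize(observer);

                // Holds every live inner subscription. When the last one completes,
                // nothing can produce further values, so the output completes.
                var ret = new CompositeDisposable();
                Action<IDisposable> partComplete =
                    d =>
                    {
                        ret.Remove(d); // Remove also disposes d
                        if (ret.Count == 0) obs.OnCompleted();
                    };

                // Subscribes to a stream of T (the seed or a fed-back stream) and tracks it in ret.
                Action<IObservable<T>, Action<T>> ssub =
                    (o, n) =>
                    {
                        var disp = new SingleAssignmentDisposable();
                        ret.Add(disp);
                        disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                    };

                // Same as ssub, for the stream of TResult produced from one T.
                Action<IObservable<TResult>, Action<TResult>> rsub =
                    (o, n) =>
                    {
                        var disp = new SingleAssignmentDisposable();
                        ret.Add(disp);
                        disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                    };

                // For each T: produce results, emit each one, then feed it back in.
                Action<T> recurse = null;
                recurse = s =>
                {
                    rsub(produce(s),
                         r =>
                         {
                             obs.OnNext(r);
                             ssub(feed(r), recurse);
                         });
                };

                ssub(seed, recurse);
                return ret;
            });
    }
}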

In your case, T and TResult appear to be the same, so feed can simply re-emit each result (i => Observable.Return(i)), which acts as the identity function. produce will be the function that implements steps 2 and 3.
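As a rough illustration of what `produce` might look like for the company-profile case (steps 2 and 3 of the question), here is a sketch. It assumes the System.Reactive package, and everything in it except `Feedback` is hypothetical: the `CompanyProfile` record, the `sources` delegates standing in for the real data-source queries, the `relevance` scorer, and `MakeProduce` itself. It also assumes relevance is scored against the profile a candidate was found from; if you need to compare against every observed profile, fold that into the scorer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical profile type: the question does not show what a profile contains.
public sealed record CompanyProfile(string Id, string Name);

public static class ProfileSearch
{
    // Builds a hypothetical `produce` function for Feedback (steps 2 and 3).
    // sources:   one query per data source (hypothetical stand-ins)
    // relevance: hypothetical scorer returning 0.0 to 1.0
    // seenIds:   IDs of every profile observed so far (pre-fill with the seeds)
    public static Func<CompanyProfile, IObservable<CompanyProfile>> MakeProduce(
        IReadOnlyList<Func<CompanyProfile, IObservable<CompanyProfile>>> sources,
        Func<CompanyProfile, CompanyProfile, double> relevance,
        ISet<string> seenIds)
    {
        return profile =>
            sources.Select(query => query(profile))          // one stream per data source
                   .Merge()                                   // step 2: merge them into one stream
                   .Where(c => relevance(profile, c) > 0.8)   // step 3: relevance above 80%
                   .Where(c =>
                   {
                       // Feedback may run several produce streams concurrently, so guard
                       // the shared set. Add returns false for IDs that were already seen.
                       lock (seenIds) { return seenIds.Add(c.Id); }
                   });
    }
}
```

To wire it up, pre-fill the seen set with the seed profiles' IDs and call `seedProfiles.ToObservable().Feedback(ProfileSearch.MakeProduce(sources, relevance, seen), p => Observable.Return(p))`. Because the seen check rejects repeats, each `produce` stream eventually yields nothing new, and `Feedback` completes once every inner stream has completed.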

Some sample code I tested this function with:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class Program
{
    static void Main()
    {
        var seed = new int[] { 1, 2, 3, 4, 5, 6 };
        var found = new HashSet<int>(); // every value emitted so far
        var mults = seed.ToObservable()
                        .Feedback(i =>
                                  {
                                      // 0*i .. 4*i, stopping at 100 and skipping values already found
                                      return Observable.Range(0, 5)
                                             .Select(r => r * i)
                                             .TakeWhile(v => v < 100)
                                             .Where(v => found.Add(v));
                                  },
                                  i => Observable.Return(i)); // feed each result back unchanged

        using (var disp = mults.Dump())
        {
            Console.WriteLine("Press any key to stop");
            Console.ReadKey();
        }
        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

    // Prints every item, any error, and the completion notification to the console.
    static IDisposable Dump<T>(this IObservable<T> source)
    {
        return source.Subscribe(item => Console.WriteLine(item),
                                ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()),
                                () => Console.WriteLine("Dump completed"));
    }
}
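To check what the sample should print, a plain synchronous worklist version of the same rule can compute the expected set of values without Rx. This is a sketch for cross-checking only; `FeedbackCrossCheck` and `ReachableMultiples` are hypothetical names, and the queue becoming empty plays the role of the last inner subscription completing. Only the set is comparable, since the Rx version may emit values in a different order.

```csharp
using System;
using System.Collections.Generic;

static class FeedbackCrossCheck
{
    // Synchronous analogue of the sample: expand each value into 0*i .. 4*i below 100,
    // keep only values not seen before, and feed each new value back in
    // until no new values appear (the queue runs dry).
    public static SortedSet<int> ReachableMultiples(IEnumerable<int> seed)
    {
        var found = new SortedSet<int>();
        var pending = new Queue<int>(seed);
        while (pending.Count > 0)
        {
            int i = pending.Dequeue();
            for (int r = 0; r < 5; r++)
            {
                int v = r * i;
                if (v >= 100) break;                   // mirrors TakeWhile(v => v < 100)
                if (found.Add(v)) pending.Enqueue(v);  // mirrors Where(found.Add) plus feeding back
            }
        }
        return found;
    }
}
```

For the seeds 1 through 6 this yields 31 values: 0, every number of the form 2^a·3^b below 100, and five times each of those below 20.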
Gideon Engelberth answered Feb 26 '23 21:02