
Switch to a different IObservable if the first is empty

I'm writing a function that retrieves news about a subject and feeds this news back via an IObservable return value.

However, I have several news sources, and I don't want to use Merge to combine them into one stream. Instead, I'd like to query them in priority order:

  1. When my function gets called, the first news source is queried (which produces an IObservable representing that source).
  2. If that news source's IObservable completes without returning any results, the next news source is queried.
  3. If that second source completes without returning results, the final news source is queried.
  4. This whole behavior is wrapped up into an observable I can return to the user.

Is this sort of behavior something I can accomplish using built-in Rx extension methods, or do I need to implement a custom class to handle this? How would I approach doing either?

David Pfeffer asked Mar 04 '13 19:03



2 Answers

The accepted answer is undesirable in my opinion because it uses Subject and Do, and it still subscribes to the second sequence even when the first isn't empty. That can be a big problem if subscribing to the second observable triggers anything nontrivial. I came up with the following solution instead:

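// Requires: using System; using System.Reactive.Disposables; using System.Reactive.Linq;
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
    if (@this == null) throw new ArgumentNullException(nameof(@this));
    if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
    return Observable.Create<T>(obs =>
    {
        // Replay the last item so Any() can inspect the source after it has completed.
        var source = @this.Replay(1);
        // Subscribe to switchTo only if the source completed without emitting anything.
        var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
        // Subscribe downstream first, then connect, so no items are missed.
        return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
    });
}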
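
One caveat for the question's scenario: this operator only delays the subscription to switchTo, so if merely calling the function that builds a source already fires the query, each fallback should be wrapped in Observable.Defer. Below is a minimal usage sketch under that assumption. GetSource and its `empty` flag are hypothetical stand-ins for the real news sources, and the operator is repeated only so the sketch compiles on its own (it needs the System.Reactive package).

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Same operator as above, repeated so this sketch is self-contained.
    public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
    {
        if (@this == null) throw new ArgumentNullException(nameof(@this));
        if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
        return Observable.Create<T>(obs =>
        {
            var source = @this.Replay(1);
            var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
            return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
        });
    }
}

public static class Program
{
    // Hypothetical news source: logs when it is queried and is either
    // empty or yields a single article, depending on the flag.
    static IObservable<string> GetSource(string name, bool empty)
    {
        Console.WriteLine("Source {0} invoked", name);
        return empty
            ? Observable.Empty<string>()
            : Observable.Return("Article from " + name);
    }

    public static void Main()
    {
        // Defer postpones each GetSource call until that stage is subscribed to,
        // so a lower-priority source is only queried when it is actually needed.
        var query = Observable.Defer(() => GetSource("A", empty: true))
            .SwitchIfEmpty(Observable.Defer(() => GetSource("B", empty: false)))
            .SwitchIfEmpty(Observable.Defer(() => GetSource("C", empty: false)));

        // ToEnumerable blocks until the sequence completes.
        foreach (var article in query.ToEnumerable())
        {
            Console.WriteLine(article);
        }
    }
}
```

With these hard-coded flags, source B produces a result, so source C should never be queried.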

The name SwitchIfEmpty falls in line with the existing RxJava implementation. There is an ongoing discussion about incorporating some of the RxJava operators into Rx.NET.

I'm sure a custom IObservable implementation would be much more efficient than mine. ReactiveX member akarnokd has written one, and it's also available on NuGet.

Taylor Buchanan answered Sep 20 '22 13:09



It sounds like you can just use a plain-old Amb query.

EDIT: Based on the comment, Amb won't do it. Give this a whack:

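public static IObservable<T> SwitchIfEmpty<T>(
     this IObservable<T> first,
     Func<IObservable<T>> second)
{
    // FirstOrDefault() blocks until `first` emits or completes. `second` is
    // only invoked when `first` turned out to be empty; otherwise `first` is
    // returned and will be subscribed to again by the caller.
    return first.IsEmpty().FirstOrDefault() ? second() : first;
}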
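
If blocking while the query is being built is a concern, the same lazy-factory idea can be sketched without it. This is only one possible variant, not part of the operator above: the name SwitchIfEmptyDeferred is made up for illustration, and it uses Do plus a per-subscription flag, so it subscribes to `first` exactly once and calls `second` only after `first` has completed empty.

```csharp
using System;
using System.Reactive.Linq;

public static class DeferredSwitchExtensions
{
    // Hypothetical non-blocking variant: nothing runs until someone subscribes.
    public static IObservable<T> SwitchIfEmptyDeferred<T>(
        this IObservable<T> first,
        Func<IObservable<T>> second)
    {
        // Defer gives every subscriber its own hasValue flag.
        return Observable.Defer(() =>
        {
            var hasValue = false;
            return first
                .Do(_ => hasValue = true)
                // The inner Defer runs only after `first` completes, so the
                // flag is final by the time it is read.
                .Concat(Observable.Defer(() =>
                    hasValue ? Observable.Empty<T>() : second()));
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var fallbackCalls = 0;

        var fromFallback = Observable.Empty<int>()
            .SwitchIfEmptyDeferred(() => { fallbackCalls++; return Observable.Return(42); });

        var fromFirst = Observable.Return(1)
            .SwitchIfEmptyDeferred(() => { fallbackCalls++; return Observable.Return(42); });

        // ToEnumerable blocks until each sequence completes.
        foreach (var x in fromFallback.ToEnumerable()) Console.WriteLine(x);
        foreach (var x in fromFirst.ToEnumerable()) Console.WriteLine(x);

        Console.WriteLine("Fallback factory calls: {0}", fallbackCalls);
    }
}
```

The trade-off is that the decision moves from assembly time to subscription time, so the fallback factory runs once per subscriber whose `first` turns out to be empty.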

Test rig:

static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
    Console.WriteLine("Source {0} invoked", sourceName);
    return r.Next(0, 10) < 5 
        ? Observable.Empty<string>() 
        : Observable.Return("Article from " + sourceName);
}

void Main()
{
    var query = GetSource("A")
        .SwitchIfEmpty(() => GetSource("B"))
        .SwitchIfEmpty(() => GetSource("C"));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }           
}

Some example runs:

Source A invoked
Article from A

Source A invoked
Source B invoked
Article from B

Source A invoked
Source B invoked
Source C invoked
Article from C

EDITEDIT:

You could also generalize it to this, I suppose:

public static IObservable<T> SwitchIf<T>(
    this IObservable<T> first, 
    Func<IObservable<T>, IObservable<bool>> predicate, 
    Func<IObservable<T>> second)
{
    return predicate(first).FirstOrDefault() 
        ? second() 
        : first;
}
JerKimball answered Sep 19 '22 13:09
