I'm writing a function that retrieves news about a subject and feeds this news back via an IObservable return value.
However, I have several news sources. I don't want to use Merge
to combine together these sources into one. Instead, what I'd like to do is order them by priority --
Is this sort of behavior something I can accomplish using built-in Rx extension methods, or do I need to implement a custom class to handle this? How would I approach doing either?
The accepted answer is undesirable in my opinion because it uses Subject
, Do
, and still subscribes to the second sequence when the first isn't empty. The latter can be a big problem if the second observable invokes anything nontrivial. I came up with the following solution instead:
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
if (@this == null) throw new ArgumentNullException(nameof(@this));
if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
return Observable.Create<T>(obs =>
{
var source = @this.Replay(1);
var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
});
}
The name SwitchIfEmpty
falls in line with the existing RxJava implementation. Here is an ongoing discussion about incorporating some of the RxJava operators into RxNET.
I'm sure a custom IObservable
implementation would be much more efficient than mine. You can find one here written by ReactiveX member akarnokd. It's also available on NuGet.
It sounds like you can just use a plain-old Amb
query.
EDIT: based on comment, Amb
won't do it - give this a whack:
public static IObservable<T> SwitchIfEmpty<T>(
this IObservable<T> first,
Func<IObservable<T>> second)
{
return first.IsEmpty().FirstOrDefault() ? second() : first;
}
Test rig:
static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
Console.WriteLine("Source {0} invoked", sourceName);
return r.Next(0, 10) < 5
? Observable.Empty<string>()
: Observable.Return("Article from " + sourceName);
}
void Main()
{
var query = GetSource("A")
.SwitchIfEmpty(() => GetSource("B"))
.SwitchIfEmpty(() => GetSource("C"));
using(query.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
}
Some example runs:
Source A invoked
Article from A
Source A invoked
Source B invoked
Article from B
Source A invoked
Source B invoked
Source C invoked
Article from C
EDITEDIT:
You could also generalize it to this, I suppose:
public static IObservable<T> SwitchIf<T>(
this IObservable<T> first,
Func<IObservable<T>, IObservable<bool>> predicate,
Func<IObservable<T>> second)
{
return predicate(first).FirstOrDefault()
? second()
: first;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With