Cyclic dependencies between streams in reactive programming

Dabbling in reactive programming, I often encounter situations where two streams depend on each other. What is an idiomatic way to solve these cases?

A minimal example: There are buttons A and B, both display a value. Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.

First solution I could come up with (example in F#, but answers in any language are welcome):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)
    a.OnNext 0
    b.OnNext 1

This solution uses mutable state and subjects; it is not very readable and does not look idiomatic.

The second solution I tried involves creating a method which links two dependent streams together:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)

This seems a bit better, but since the reactive library has no method like dependency, I believe a more idiomatic solution exists. It is also easy to introduce infinite recursion with the second approach.

What is the recommended way to approach problems involving cyclic dependencies between streams, such as in the example above, in reactive programming?

Steve asked Feb 10 '17 14:02


3 Answers

EDIT:

Here's an F# solution:

type DU = 
    | A 
    | B 

type State = { AValue : int; BValue : int }

let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 

    let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))

    let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
        | A -> { state with AValue = state.AValue + state.BValue }
        | B -> { state with BValue = state.AValue }
    )

    result

F# is actually a great language for this, thanks to the built-in discriminated unions and records. Here's an answer written in C#, with a custom Discriminated Union; my F# is rather rusty.

The trick is to turn your two observables into one observable using a discriminated union, so that a and b are merged into a single observable of union values:

a : *---*---*---**
b : -*-*--*---*---
du: ab-ba-b-a-b-aa

Once that is done, you can react according to whether the item is an 'A' push or a 'B' push.

Just to confirm, I assume there's no way to explicitly set the value embedded in ButtonA/ButtonB. If there is, those changes should be modeled as observables, and also worked into the discriminated union.

var a = new Subject<Unit>();
var b = new Subject<Unit>();
var observable = a.DiscriminatedUnion(b)
    .Scan(new State(0, 1), (state, du) => du.Unify(
        /* A clicked case */_ => new State(state.A + state.B, state.B), 
        /* B clicked case */_ => new State(state.A, state.A)
    )
);

observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);

Here are the classes this relies on in C#. Most of this translates easily to built-in F# types.

public class State /*easily replaced with an F# record */
{
    public State(int a, int b)
    {
        A = a;
        B = b;
    }

    public int A { get; }
    public int B { get; }
}

/* easily replaced with built-in discriminated unions and pattern matching */
public static class DiscriminatedUnionExtensions
{
    public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return Observable.Merge(
            a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
            b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
        );
    }

    public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
        Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return source.Select(union => Unify(union, f1, f2));
    }

    public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return union.Item == 1
            ? f1(union.Item1)
            : f2(union.Item2)
        ;
    }
}

public class DiscriminatedUnionClass<T1, T2>
{
    private readonly T1 _t1;
    private readonly T2 _t2;
    private readonly int _item;
    private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
    {
        _t1 = t1;
        _t2 = t2;
        _item = item;
    }

    public int Item
    {
        get { return _item; }
    }

    public T1 Item1
    {
        get { return _t1; }
    }

    public T2 Item2
    {
        get { return _t2; }
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
    {
        return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
    {
        return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
    }
}
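
The merge-then-scan idea above does not depend on any particular Rx library. As a rough, library-free sketch in Java: the names `MergeScanSketch`, `Click`, `State`, `step` and `run` are made up for illustration, and a plain list of recorded clicks stands in for the merged observable.

```java
import java.util.List;

class MergeScanSketch {
    // Which button produced the event; stands in for the discriminated union.
    enum Click { A, B }

    // Immutable snapshot of both button values.
    static final class State {
        final int a;
        final int b;
        State(int a, int b) { this.a = a; this.b = b; }
    }

    // Pure reducer: plays the role of the function passed to Scan.
    static State step(State s, Click c) {
        switch (c) {
            case A:  return new State(s.a + s.b, s.b); // A is incremented by B
            default: return new State(s.a, s.a);       // B is set to A
        }
    }

    // Folds a recorded click sequence, as Scan would over the merged stream.
    static State run(State seed, List<Click> clicks) {
        State s = seed;
        for (Click c : clicks) {
            s = step(s, c);
        }
        return s;
    }

    public static void main(String[] args) {
        State end = run(new State(0, 1),
                List.of(Click.A, Click.A, Click.B, Click.A));
        System.out.println("a = " + end.a + ", b = " + end.b); // prints "a = 4, b = 2"
    }
}
```

Because both values live in one state object and every click goes through one reducer, there is no cycle left between two streams; the dependency is expressed inside `step`.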

Shlomo answered Oct 20 '22 22:10


Here's a very simple solution using Gjallarhorn:

#r @"..\packages\Gjallarhorn\lib\portable-net45+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\Gjallarhorn.dll"

open Gjallarhorn

(*
    Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.
*)
let a = Mutable.create 3
let b = Mutable.create 4

let clickA() = a.Value <- a.Value + b.Value
let clickB() = b.Value <- a.Value

let d1 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked A: " + x.ToString()) a
let d2 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked B: " + x.ToString()) b

clickA()
clickB()  

It's actually quite similar to your initial solution, so it does use mutable state, but it makes binding to a UI quite easy. For more idiomatic usage, see this blog post.
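
For readers who don't know Gjallarhorn, the underlying pattern is a mutable cell that notifies subscribers whenever it is written. Here is a minimal Java sketch of that pattern. The `Cell` class and its methods are hypothetical and hand-rolled for illustration; this is not Gjallarhorn's API or implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CellSketch {
    // Hypothetical minimal observable cell (not part of any library).
    static final class Cell<T> {
        private T value;
        private final List<Consumer<T>> listeners = new ArrayList<>();

        Cell(T initial) { this.value = initial; }

        T get() { return value; }

        // Stores the new value, then notifies every subscriber.
        void set(T next) {
            value = next;
            for (Consumer<T> listener : listeners) {
                listener.accept(next);
            }
        }

        void subscribe(Consumer<T> listener) { listeners.add(listener); }
    }

    public static void main(String[] args) {
        Cell<Integer> a = new Cell<>(3);
        Cell<Integer> b = new Cell<>(4);

        a.subscribe(x -> System.out.println("Clicked A: " + x));
        b.subscribe(x -> System.out.println("Clicked B: " + x));

        a.set(a.get() + b.get()); // click A: prints "Clicked A: 7"
        b.set(a.get());           // click B: prints "Clicked B: 7"
    }
}
```

Each click handler reads the current values directly, so the two cells never subscribe to each other and no feedback loop can form.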

s952163 answered Oct 21 '22 00:10


Assuming that the output eventually gets sent back to the source, you can do this with basic operators. All you have to do is call withLatestFrom twice for each button/signal observable. My solution is in Java, but it should be easy enough to follow!

private static Pair<Observable<Integer>, Observable<Integer>> test(
    final Observable<Integer> aValues,
    final Observable<Integer> bValues,
    final Observable<Void> aButton,
    final Observable<Void> bButton,
    final Func2<Integer, Integer, Integer> aFunction,
    final Func2<Integer, Integer, Integer> bFunction
) {
    return new Pair<>(
        aButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, aFunction),
        bButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, bFunction)
    );
}

Here's the test code I used:

final TestScheduler scheduler = new TestScheduler();

final TestSubject<Integer> aSubject = TestSubject.create(scheduler);
final TestSubject<Integer> bSubject = TestSubject.create(scheduler);
aSubject.onNext(1);
bSubject.onNext(1);

final TestSubject<Void> aButton = TestSubject.create(scheduler);
final TestSubject<Void> bButton = TestSubject.create(scheduler);

final Pair<Observable<Integer>, Observable<Integer>> pair = test(
    aSubject, bSubject, aButton, bButton, (a, b) -> a + b, (a, b) -> a
);

pair.component1().subscribe(aSubject::onNext);
pair.component2().subscribe(bSubject::onNext);
pair.component1().map(a -> "A: " + a).subscribe(System.out::println);
pair.component2().map(b -> "B: " + b).subscribe(System.out::println);

aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();

This prints:

A: 2
B: 2
A: 4
A: 6
B: 6

flakes answered Oct 21 '22 00:10