Here is a picture of what I am attempting to accomplish.
--a-b-c-a--bbb--a
split into
--a-----a-------a --> a stream
----b------bbb--- --> b stream
------c---------- --> c stream
Then, be able to call
    a.subscribe()
    b.subscribe()
    c.subscribe()
So far, everything I have found splits the stream using groupBy(), but then collapses everything back into a single stream and processes it all in the same function. What I want to do is process each derived stream in a different way.
The way I'm doing it right now is with a bunch of filters. Is there a better way to do this?
Easy as pie: just use filter.
An example in Scala:
    import rx.lang.scala.Observable

    val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
    val hotO: Observable[String] = o.share

    val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
    val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
    val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

    aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
    bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
    cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
You just need to make sure that the source observable is hot. The easiest way is to share it.
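To make the "hot" requirement concrete, here is a minimal sketch in plain Java. It does not use RxJava at all: MiniSubject and subscribeFiltered are hypothetical names invented for this illustration, standing in for a hot source with a filter in front of each subscriber. It only shows that a hot source pushes each item to whoever is listening at that moment, so every filtered subscriber should be attached before the items start flowing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class HotSplit {
    // Hypothetical stand-in for a hot source (not an RxJava class):
    // each emitted item goes to every listener registered at that moment.
    static final class MiniSubject<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        // Registers a listener that only sees items matching the predicate,
        // which is the role filter(...) plays in the Scala example.
        void subscribeFiltered(Predicate<T> keep, Consumer<T> onNext) {
            listeners.add(item -> {
                if (keep.test(item)) {
                    onNext.accept(item);
                }
            });
        }

        void emit(T item) {
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject<String> source = new MiniSubject<>();

        // The a and b streams are attached before anything is emitted.
        source.subscribeFiltered("a"::equals, s -> log.add("A: " + s));
        source.subscribeFiltered("b"::equals, s -> log.add("B: " + s));

        for (String s : new String[] {"a", "b", "c", "a"}) {
            source.emit(s);
        }

        // The c stream is attached late, so it misses the earlier "c".
        source.subscribeFiltered("c"::equals, s -> log.add("C: " + s));
        source.emit("c");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The late c subscriber only sees the final item. With a real library the same concern applies to the order in which you subscribe and start the source, so it is worth checking how share behaves for your particular source.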
You don't have to collapse the Observables from groupBy. You can instead subscribe to them.
Something like this:
    import rx.Observable;
    import rx.functions.Action1;

    String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};

    // One handler per derived stream.
    Action1<String> a = s -> System.out.print("-a-");
    Action1<String> b = s -> System.out.print("-b-");
    Action1<String> c = s -> System.out.print("-c-");

    Observable
        .from(inputs)
        .groupBy(s -> s)
        .subscribe(g -> {
            // g is the grouped Observable for one key; subscribe to it directly.
            if ("a".equals(g.getKey())) {
                g.subscribe(a);
            }
            if ("b".equals(g.getKey())) {
                g.subscribe(b);
            }
            if ("c".equals(g.getKey())) {
                g.subscribe(c);
            }
        });
If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.
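One possible way to avoid the if statements is to keep the handlers in a map keyed by the group key and look the handler up instead of branching. The sketch below shows only that lookup idea in plain Java, without RxJava; KeyedDispatch, dispatch and handlers are names made up for this illustration, and the plain loop stands in for the stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class KeyedDispatch {
    // Hypothetical helper: routes each item to the handler registered for its key
    // and returns what the handlers recorded, in order.
    static List<String> dispatch(String[] inputs) {
        List<String> log = new ArrayList<>();

        // One handler per key replaces the chain of if statements.
        Map<String, Consumer<String>> handlers = new HashMap<>();
        handlers.put("a", s -> log.add("-a-"));
        handlers.put("b", s -> log.add("-b-"));
        handlers.put("c", s -> log.add("-c-"));

        for (String s : inputs) {
            Consumer<String> handler = handlers.get(s);
            if (handler != null) { // keys without a handler are skipped
                handler.accept(s);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};
        System.out.println(String.join("", dispatch(inputs)));
    }
}
```

In the groupBy version, the same idea would presumably mean a map from key to Action1<String> and a single g.subscribe(handlers.get(g.getKey())) call in the outer subscriber, with a check for keys that have no handler. I have not run that variant, so treat it as a suggestion.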