Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Split Rx Observable into multiple streams and process individually

Here is a picture of what I am attempting to accomplish.

--a-b-c-a--bbb--a

split into

--a-----a-------a --> a stream

----b------bbb--- --> b stream

------c---------- --> c stream

Then, be able to

a.subscribe() b.subscribe() c.subscribe() 

So far, everything I have found has split the stream using a groupBy(), but then collapsed everything back into a single stream and process them all in the same function. What I want to do is process each derived stream in a different way.

The way I'm doing it right now is doing a bunch of filters. Is there a better way to do this?

like image 315
Brandon Bil Avatar asked Mar 04 '15 12:03

Brandon Bil


2 Answers

Easy as pie, just use filter

An example in scala

import rx.lang.scala.Observable  val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a") val hotO: Observable[String] = o.share val aSource: Observable[String] = hotO.filter(x ⇒ x == "a") val bSource: Observable[String] = hotO.filter(x ⇒ x == "b") val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")  aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))  bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))  cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed")) 

You just need to make sure that the source observable is hot. The easiest way is to share it.

like image 157
Tomáš Dvořák Avatar answered Sep 16 '22 23:09

Tomáš Dvořák


You don't have to collapse Observables from groupBy. You can instead subscribe to them.

Something like this:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};  Action1<String> a = s -> System.out.print("-a-");  Action1<String> b = s -> System.out.print("-b-");  Action1<String> c = s -> System.out.print("-c-");  Observable     .from(inputs)     .groupBy(s -> s)     .subscribe((g) -> {         if ("a".equals(g.getKey())) {             g.subscribe(a);         }          if ("b".equals(g.getKey())) {             g.subscribe(b);         }          if ("c".equals(g.getKey())) {             g.subscribe(c);         }     }); 

If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.

like image 33
ihuk Avatar answered Sep 19 '22 23:09

ihuk