
RxJava share with last value for new subscribers

Tags: rx-java

I have created an Observable by applying a long chain of transformations to a BehaviorSubject. Now I want to share the values of that Observable, so that the chain of functions is not re-executed for every new subscriber. I also want the shared Observable to behave like the original, i.e. a newly arrived subscriber should receive the last emitted value immediately after subscribing.

In 0.20.x it was possible to use multicast(subjectFactory).refCount() with a factory of BehaviorSubjects, or simply to use share(initialValue), which in turn used a BehaviorSubject instead of a PublishSubject.

How do I achieve the same behaviour in 1.0.x?

asked Nov 20 '14 14:11 by avakhrenev


1 Answer

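I think you can replace multicast(behaviorSubjectFactory).refCount() with replay(1).refCount(). Here replay(1) caches the most recently emitted item and hands it to each new subscriber, while refCount() keeps a single shared subscription to the source for as long as at least one subscriber is present.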

To make the discussion a bit more concrete, here's a complete example in Scala (using the RxScala bindings):

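import rx.lang.scala.Observable
import scala.concurrent.duration._

// Reference point for the timestamps printed below
@volatile var startTime: Long = 0
def printTimestamped(s: String): Unit = {
  println(s"[t=${System.currentTimeMillis - startTime}] $s")
}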

// Suppose for simplicity that the UI Events are just ticks of a
// hot timer observable.
val uiEvents = Observable.timer(1000.millis, 1000.millis)
        .doOnEach(i => printTimestamped("producing " + i))
        .publish

// Now apply all the transformations
val transformed = uiEvents.map(i => i + 101)
        .doOnEach(i => printTimestamped("transformed to " + i))

// And set a default start value
val o1 = transformed.startWith(100)

// Share and make sure new subscribers get the last element replayed
// immediately after they subscribe:
val o2 = o1.replay(1).refCount

// startTime is just before we start the uiEvents observable
startTime = System.currentTimeMillis
val subscriptionUiEvents = uiEvents.connect

Thread.sleep(500)

printTimestamped("subscribing A")
val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i))

Thread.sleep(2000)

printTimestamped("subscribing B")
val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i))

Thread.sleep(2000)

printTimestamped("unsubscribing B")
subscriptionB.unsubscribe()

Thread.sleep(2000)

printTimestamped("unsubscribing A")
subscriptionA.unsubscribe()

// Now the transformations will stop being executed, but the UI
// events will still be produced

Thread.sleep(2000)

// Finally, also stop the UI events:
subscriptionUiEvents.unsubscribe()

Output:

[t=505] subscribing A
[t=519] A got 100
[t=1002] producing 0
[t=1003] transformed to 101
[t=1003] A got 101
[t=2002] producing 1
[t=2002] transformed to 102
[t=2002] A got 102
[t=2520] subscribing B
[t=2521] B got 102
[t=3003] producing 2
[t=3003] transformed to 103
[t=3003] A got 103
[t=3003] B got 103
[t=4002] producing 3
[t=4002] transformed to 104
[t=4002] A got 104
[t=4002] B got 104
[t=4521] unsubscribing B
[t=5003] producing 4
[t=5003] transformed to 105
[t=5003] A got 105
[t=6002] producing 5
[t=6002] transformed to 106
[t=6002] A got 106
[t=6522] unsubscribing A
[t=7003] producing 6
[t=8002] producing 7
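
The timer-based example above depends on wall-clock timing. As a smaller, deterministic illustration of the same replay(1).refCount idea, here is a minimal sketch that drives a BehaviorSubject by hand. It assumes the RxScala bindings are on the classpath; the names source, shared, transformCalls, gotA and gotB are made up for this illustration, and the multiplication by 10 merely stands in for the real transformation chain.

```scala
import rx.lang.scala.subjects.BehaviorSubject
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the real source: a BehaviorSubject seeded with 0.
val source = BehaviorSubject[Int](0)

// Counts how often the (stand-in) transformation actually runs.
var transformCalls = 0
val shared = source
  .map { i => transformCalls += 1; i * 10 }
  .replay(1)   // cache the most recent transformed item
  .refCount    // stay connected while at least one subscriber exists

val gotA = ListBuffer.empty[Int]
val gotB = ListBuffer.empty[Int]

// The first subscriber causes refCount to connect to the source.
val subA = shared.subscribe(i => { gotA += i; () })
source.onNext(1)

// A late subscriber should first receive the cached item, then live items.
val subB = shared.subscribe(i => { gotB += i; () })
source.onNext(2)

println(s"A got $gotA, B got $gotB, transformation ran $transformCalls times")

subB.unsubscribe()
subA.unsubscribe()
```

If the sharing works as intended, the first item B receives is the cached transformed value rather than a recomputed one, and transformCalls grows with the number of source emissions, not with the number of subscribers.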

Original answer:

Quoting the release notes for 1.0.0:

Removed any method overload that took an initial value since the startWith operator already allows that generically.

So instead of share(initialValue), just use share().startWith(initialValue).

answered Jan 03 '23 11:01 by Samuel Gruetter