I have created an Observable by transforming a BehaviorSubject through a long chain of functions. Now I want to share the values of that Observable, so that the chain of functions is not re-executed for every new subscriber. I also want the shared copy to behave the same as the original, i.e. a newly arrived subscriber should get the last emitted value right after subscribing.
In 0.20.x it was possible to use multicast(subjectFactory).refCount() with a factory of BehaviorSubjects, or simply use share(initialValue), which in turn used a BehaviorSubject instead of a PublishSubject.
How do I achieve the same behaviour in 1.0.x?
I think you can replace multicast(behaviorSubjectFactory).refCount() with replay(1).refCount().
To make the discussion a bit more concrete, here's a complete example (in Scala):
import rx.lang.scala.Observable
import scala.concurrent.duration._

@volatile var startTime: Long = 0
def printTimestamped(s: String): Unit = {
  println(s"[t=${System.currentTimeMillis - startTime}] $s")
}
// Suppose for simplicity that the UI Events are just ticks of a
// hot timer observable.
val uiEvents = Observable.timer(1000 millis, 1000 millis)
  .doOnEach(i => printTimestamped("producing " + i))
  .publish
// Now apply all the transformations
val transformed = uiEvents.map(i => i + 101)
  .doOnEach(i => printTimestamped("transformed to " + i))
// And set a default start value
val o1 = transformed.startWith(100)
// Share and make sure new subscribers get the last element replayed
// immediately after they subscribe:
val o2 = o1.replay(1).refCount
// startTime is just before we start the uiEvents observable
startTime = System.currentTimeMillis
val subscriptionUiEvents = uiEvents.connect
Thread.sleep(500)
printTimestamped("subscribing A")
val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i))
Thread.sleep(2000)
printTimestamped("subscribing B")
val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i))
Thread.sleep(2000)
printTimestamped("unsubscribing B")
subscriptionB.unsubscribe()
Thread.sleep(2000)
printTimestamped("unsubscribing A")
subscriptionA.unsubscribe()
// Now the transformations will stop being executed, but the UI
// events will still be produced
Thread.sleep(2000)
// Finally, also stop the UI events:
subscriptionUiEvents.unsubscribe()
Output:
[t=505] subscribing A
[t=519] A got 100
[t=1002] producing 0
[t=1003] transformed to 101
[t=1003] A got 101
[t=2002] producing 1
[t=2002] transformed to 102
[t=2002] A got 102
[t=2520] subscribing B
[t=2521] B got 102
[t=3003] producing 2
[t=3003] transformed to 103
[t=3003] A got 103
[t=3003] B got 103
[t=4002] producing 3
[t=4002] transformed to 104
[t=4002] A got 104
[t=4002] B got 104
[t=4521] unsubscribing B
[t=5003] producing 4
[t=5003] transformed to 105
[t=5003] A got 105
[t=6002] producing 5
[t=6002] transformed to 106
[t=6002] A got 106
[t=6522] unsubscribing A
[t=7003] producing 6
[t=8002] producing 7
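The timer-based run above depends on wall-clock timing. As a smaller cross-check, here is a synchronous sketch of the same replay(1).refCount idea, assuming RxScala is on the classpath. The names ReplayRefCountSketch, run, log and evaluations are made up for this illustration, and the multiplication by 10 merely stands in for the real chain of transformations.

```scala
import rx.lang.scala.subjects.BehaviorSubject
import scala.collection.mutable.ListBuffer

// Hypothetical helper object, not part of RxScala.
object ReplayRefCountSketch {

  // Returns the delivery log and how many times the mapping function ran.
  def run(): (List[String], Int) = {
    val log = ListBuffer.empty[String]
    var evaluations = 0

    // Source with an initial value, as in the question.
    val source = BehaviorSubject(0)

    // Stand-in for the expensive chain; shared, with the last value replayed.
    val shared = source
      .map { i => evaluations += 1; i * 10 }
      .replay(1)
      .refCount

    val subA = shared.subscribe(i => log += s"A got $i")
    source.onNext(1)
    source.onNext(2)

    // B subscribes late and should get the most recent mapped value (20)
    // from the replay buffer, without the mapping function running again.
    val subB = shared.subscribe(i => log += s"B got $i")
    source.onNext(3)

    subA.unsubscribe()
    subB.unsubscribe()
    (log.toList, evaluations)
  }

  def main(args: Array[String]): Unit = {
    val (log, evaluations) = run()
    log.foreach(println)
    println(s"mapping function ran $evaluations times")
  }
}
```

If the sharing works as intended, the mapping function runs once per source value (four times here: the initial 0 plus three onNext calls) rather than once per value per subscriber.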
Original answer:
Quoting the release notes for 1.0.0:
Removed any method overload that took an initial value since the startWith operator already allows that generically.
So instead of share(initialValue), just use share().startWith(initialValue).