I know subclassing Observable is something you should avoid at all costs, but what if I have a valid use case for a subclassed Observable in RxJava? Is it possible? How could I do it?
In this specific case, I have a "repository" class which currently returns Requests:
abstract class Request<T> {
    public abstract Object key();
    public abstract Observable<T> asObservable();
    [...]
    public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
        Request<T> self = this;
        return new Request<T>() {
            @Override public Object key() { return self.key(); }
            @Override public Observable<T> asObservable() { return transformation.call(self); }
        };
    }
}
I then use the transform method to modify the response observable (asObservable) in a context where I need the request key (like caching):
service.getItemList() // <- returns a Request<List<Item>>
.transform(r -> r.asObservable()
// The activity is the current Activity in Android
.compose(Operators.ensureThereIsAnAccount(activity))
// The cache comes last because we don't need auth for cached responses
.compose(cache.cacheTransformation(r.key())))
.asObservable()
[... your common RxJava code ...]
Now, it would be pretty convenient if my Request class was an Observable subclass, since I could then eliminate all the .asObservable() calls and clients wouldn't even need to know about my Request class.
It is possible to subclass Observable (we do this for Subjects and ConnectableObservables), but it requires extra consideration, because you need to pass in an OnSubscribe callback to handle your incoming Subscribers. It is not clear to me what your Request should do when someone subscribes to it, so I'll give you two examples of extending an Observable:
If you don't have mutable state to be shared between subscribers, you can just extend Observable and pass your action to super:
import rx.Observable;
import rx.Subscriber;

public final class MyObservable extends Observable<Long> {
    public MyObservable() {
        // Stateless: everything the callback needs is inside the callback itself.
        super(new OnSubscribe<Long>() {
            @Override public void call(Subscriber<? super Long> child) {
                child.onNext(System.currentTimeMillis());
                child.onCompleted();
            }
        });
    }
}
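To see what passing the action to super implies at subscription time, here is a minimal sketch that runs without RxJava on the classpath. MiniSubscriber, MiniOnSubscribe, MiniObservable and ClockObservable are hypothetical stand-ins made up for illustration, not the real rx types; the only behaviour they mimic is that the OnSubscribe callback is invoked once for every subscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for RxJava 1.x's Subscriber, OnSubscribe and Observable.
// They are NOT the real rx.* types, only the minimum this sketch needs.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> child);
}

class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    protected MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Every subscribe() call runs the callback again for the new subscriber.
    public void subscribe(MiniSubscriber<? super T> child) {
        onSubscribe.call(child);
    }
}

// Stateless subclass: the callback passed to super(...) needs no instance fields.
final class ClockObservable extends MiniObservable<Long> {
    ClockObservable() {
        super(child -> {
            child.onNext(System.nanoTime());
            child.onCompleted();
        });
    }
}

final class ClockDemo {
    public static void main(String[] args) {
        ClockObservable clock = new ClockObservable();
        List<Long> received = new ArrayList<>();
        MiniSubscriber<Long> collector = new MiniSubscriber<Long>() {
            @Override public void onNext(Long value) { received.add(value); }
            @Override public void onCompleted() { }
        };
        clock.subscribe(collector);
        clock.subscribe(collector);
        System.out.println(received.size()); // prints 2: one emission per subscription
    }
}
```

Because nothing is shared, each subscriber simply gets its own run of the callback.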
Sharing state between subscribers is usually trickier, because you need to access the shared state from both the OnSubscribe callback and your Observable's methods, but Java won't let you touch instance fields from the OnSubscribe inner class before super has completed. The solution is to factor the shared state and the OnSubscribe out of the constructor and use a static factory method to set up both:
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;

public final class MySharedObservable extends Observable<Long> {
    public static MySharedObservable create() {
        // Create the shared state first so the callback can capture it.
        final AtomicLong counter = new AtomicLong();
        OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> child) {
                child.onNext(counter.incrementAndGet());
                child.onCompleted();
            }
        };
        return new MySharedObservable(onSubscribe, counter);
    }

    private final AtomicLong counter;

    private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
        super(onSubscribe);
        this.counter = counter;
    }

    // Number of subscriptions seen so far.
    public long getCounter() {
        return counter.get();
    }
}
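Applied to the Request from the question, the same static-factory idea could look roughly like the sketch below. The Mini* types are hypothetical stand-ins so that it compiles without RxJava, and KeyedRequest, create and the key-prefixing transformation are names invented here for illustration. With the real library you would extend rx.Observable and take a Func1 instead of java.util.function.Function.

```java
import java.util.function.Function;

// Hypothetical stand-ins for RxJava 1.x's Subscriber, OnSubscribe and Observable.
// They are NOT the real rx.* types, only the minimum this sketch needs.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> child);
}

class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    protected MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public void subscribe(MiniSubscriber<? super T> child) {
        onSubscribe.call(child);
    }
}

// A request that is itself observable and still exposes its key.
final class KeyedRequest<T> extends MiniObservable<T> {
    private final Object key;

    private KeyedRequest(Object key, MiniOnSubscribe<T> onSubscribe) {
        super(onSubscribe);
        this.key = key;
    }

    // Static factory: key and callback both exist before the constructor runs,
    // so nothing has to read instance fields ahead of super(...).
    public static <T> KeyedRequest<T> create(Object key, MiniOnSubscribe<T> onSubscribe) {
        return new KeyedRequest<>(key, onSubscribe);
    }

    public Object key() {
        return key;
    }

    // Same idea as the question's transform(): rewrite the stream, keep the key.
    public KeyedRequest<T> transform(Function<KeyedRequest<T>, MiniObservable<T>> transformation) {
        KeyedRequest<T> self = this;
        return KeyedRequest.<T>create(key, child -> transformation.apply(self).subscribe(child));
    }
}

final class KeyedRequestDemo {
    public static void main(String[] args) {
        KeyedRequest<String> request = KeyedRequest.create("items", child -> {
            child.onNext("a");
            child.onCompleted();
        });
        // Prefix every value with the request key, much as a cache might use it.
        KeyedRequest<String> tagged = request.transform(r -> new MiniObservable<String>(child ->
            r.subscribe(new MiniSubscriber<String>() {
                @Override public void onNext(String value) { child.onNext(r.key() + ":" + value); }
                @Override public void onCompleted() { child.onCompleted(); }
            })));
        tagged.subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String value) { System.out.println(value); } // prints items:a
            @Override public void onCompleted() { System.out.println("done"); }
        });
    }
}
```

Clients can then subscribe to the request directly, while code that needs the key still has key() available, so the explicit asObservable() step is no longer needed.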