
How to subclass Observable in RxJava?

Tags: rx-java

I know subclassing Observable is something you should avoid at all costs, but what if I have a valid use case for it in RxJava? Is it possible, and how would I do it?

In this specific case, I have a "repository" class which currently returns Requests:

abstract class Request<T> {
    public abstract Object key();
    public abstract Observable<T> asObservable();

    [...]

    // Returns a new Request with the same key whose observable is produced by the transformation
    public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
        Request<T> self = this;
        return new Request<T>() {
            @Override public Object key() { return self.key(); }
            @Override public Observable<T> asObservable() { return transformation.call(self); }
        };
    }
}

I then use the transform method to modify the response observable (asObservable) in a context where I need the request key (like caching):

 service.getItemList() // <- returns a Request<List<Item>>
     .transform(r -> r.asObservable()
             // The activity is the current Activity in Android
             .compose(Operators.ensureThereIsAnAccount(activity))
             // The cache comes last because we don't need auth for cached responses
             .compose(cache.cacheTransformation(r.key())))
     .asObservable()
     [...  your common RxJava code ...]

Now, it would be pretty convenient if my Request class was an Observable subclass, since I could then eliminate all the .asObservable() calls and clients wouldn't even need to know about my Request class.

dirleyrls asked Mar 08 '15 02:03



1 Answer

It is possible to subclass Observable (we do this for Subjects and ConnectableObservables), but it requires extra care, because you need to pass an OnSubscribe callback to the superclass constructor to handle your incoming Subscribers. It is not clear to me what your Request should do when someone subscribes to it, so I'll give you two examples of extending Observable:

Observable without shared mutable state

If you don't have mutable state to be shared between subscribers, you can just extend Observable and pass your OnSubscribe action to super:

import rx.Observable;
import rx.Subscriber;

public final class MyObservable extends Observable<Long> {
    public MyObservable() {
        super(new OnSubscribe<Long>() {
            // Runs once per subscriber: emit the current time, then complete
            @Override public void call(Subscriber<? super Long> child) {
                child.onNext(System.currentTimeMillis());
                child.onCompleted();
            }
        });
    }
}

Observable with shared mutable state

This one is usually trickier because you need to access the shared state from both the OnSubscribe callback and your Observable's methods, but Java won't let you touch instance fields from the OnSubscribe inner class before super has completed. The solution is to move the shared state and the OnSubscribe out of the constructor and use a static factory method to set up both:

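import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Subscriber;

public final class MySharedObservable extends Observable<Long> {
    // State shared between the OnSubscribe callback and this class's methods
    private final AtomicLong counter;

    public static MySharedObservable create() {
        // Create the state first so the callback can capture it as a local variable
        final AtomicLong counter = new AtomicLong();
        OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> child) {
                child.onNext(counter.incrementAndGet());
                child.onCompleted();
            }
        };
        return new MySharedObservable(onSubscribe, counter);
    }

    private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
        super(onSubscribe);
        this.counter = counter;
    }

    // Number of subscriptions handled so far
    public long getCounter() {
        return counter.get();
    }
}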
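
The constructor restriction behind the factory method is a plain Java rule, so it can be tried without RxJava on the classpath. The sketch below is only an illustration: Source and CountingSource are hypothetical stand-ins (not RxJava types), with Source playing the role of a base class that, like Observable, wants its callback in the constructor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for a base class that takes its callback in the constructor
class Source<T> {
    private final Supplier<T> onSubscribe;

    protected Source(Supplier<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Simplified "subscribe": runs the callback and returns the single value it produces
    public T subscribe() {
        return onSubscribe.get();
    }
}

final class CountingSource extends Source<Long> {
    private final AtomicLong counter;

    // This would not compile, because the lambda reads an instance field
    // before the superclass constructor has been called:
    // CountingSource() { super(() -> counter.incrementAndGet()); }

    private CountingSource(Supplier<Long> onSubscribe, AtomicLong counter) {
        super(onSubscribe);
        this.counter = counter;
    }

    // The factory creates the state first, so the callback captures a local variable
    static CountingSource create() {
        AtomicLong counter = new AtomicLong();
        return new CountingSource(counter::incrementAndGet, counter);
    }

    long getCounter() {
        return counter.get();
    }
}
```

Each call to subscribe() on an instance from CountingSource.create() increments the same counter that getCounter() reads, which is the same sharing that MySharedObservable sets up between its OnSubscribe and its getCounter() method.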
akarnokd answered Sep 29 '22 13:09