How can I intercept an observable object and modify it in RxJava before returning to Subscriber?

I'm calling a service that returns a list of objects. Before that list reaches the subscriber, I want to make a second service call for each object in the list to fill in a missing field. All of the calls are being made, but in the list the subscriber receives, the field I need is still null. Here is an example of my code:

Example Service:

rx.Observable<List<ExampleObject>> getExampleObject();
rx.Observable<MissingObject> getMissingObjectByFoo(@Path("foo") String foo);

Example Class:

public class ExampleObject {
    String foo;
    MissingObject bar;

    public String getFoo() {
        return this.foo;
    }

    public void setFoo(String value) {
        this.foo = value;
    }

    public MissingObject getBar() {
        return this.bar;
    }

    public void setBar(MissingObject value) {
        this.bar = value;
    }
}

Example Implementation:

mService.getExampleObject().flatMap(new Func1<List<ExampleObject>, Observable<?>>() {
            @Override
            public Observable<List<ExampleObject>> call(List<ExampleObject> exampleObjects) {
                for (ExampleObject entry : exampleObjects) {
                    String foo = entry.getFoo();
                    mService.getMissingObjectByFoo(foo)
                            .subscribeOn(mScheduler.backgroundThread())
                            .observeOn(mScheduler.mainThread())
                            .subscribe(new Subscriber<MissingObject>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(MissingObject missingObject) {
                            entry.setBar(missingObject);
                        }
                    });
                }
                return Observable.just(exampleObjects);
            };
Bill Zangardi asked Nov 17 '16 19:11


2 Answers

Because your intermediary call to update the entry is asynchronous, I don't think you can stick to using a List<ExampleObject>, but should instead manipulate ExampleObject directly from the Observable:

mService.getExampleObject()
    // Spread the list into individual items
    .flatMap(list -> Observable.from(list))
    // Update your object
    // Here we zip the object with the missing object,
    // so that when the missing object is obtained,
    // we update the entry and emit it.
    .flatMap(entry -> Observable.zip(
           Observable.just(entry),
           mService.getMissingObjectByFoo(entry.getFoo()),
           // The zip lambda cannot reuse the enclosing parameter name `entry`
           (item, missingObject) -> {
               item.setBar(missingObject);
               return item;
           })
    )
    // if you really want a list after all
    .toList();

Side note:

You can skip the zip if you are fine with having the function in the map depend on an external variable (the entry). That's something I try to avoid, but here it is anyway:

    .flatMap(entry -> mService.getMissingObjectByFoo(entry.getFoo())
                              .map(missingObject -> {
                                  entry.setBar(missingObject);
                                  return entry;
                              })
    )
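
To illustrate why the version in the question hands back null fields, here is a rough JDK-only analogy using CompletableFuture instead of RxJava (a swapped-in technique, not the Rx API). Item, lookup, fireAndForget and enrichAll are hypothetical names made up for this sketch; lookup stands in for getMissingObjectByFoo, and the latch only exists to hold the simulated network call open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// JDK-only analogy; Item and lookup() are hypothetical stand-ins for
// ExampleObject and getMissingObjectByFoo.
final class EnrichDemo {
    static final class Item {
        final String foo;
        volatile String bar; // stays null until a lookup result is applied
        Item(String foo) { this.foo = foo; }
    }

    // Simulated async lookup that cannot finish before the gate opens.
    static CompletableFuture<String> lookup(String foo, CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "bar-for-" + foo;
        });
    }

    // Mirrors the question: start the lookups, then return the list right away.
    static List<Item> fireAndForget(List<Item> items, CountDownLatch gate) {
        for (Item item : items) {
            lookup(item.foo, gate).thenAccept(bar -> item.bar = bar);
        }
        return items; // nothing waited for the lookups
    }

    // Mirrors the answer: the result completes only after every item is updated.
    static CompletableFuture<List<Item>> enrichAll(List<Item> items, CountDownLatch gate) {
        List<CompletableFuture<Item>> pending = new ArrayList<>();
        for (Item item : items) {
            pending.add(lookup(item.foo, gate).thenApply(bar -> {
                item.bar = bar;
                return item;
            }));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(done -> {
                    List<Item> result = new ArrayList<>();
                    for (CompletableFuture<Item> f : pending) {
                        result.add(f.join()); // already complete at this point
                    }
                    return result;
                });
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        List<Item> early = fireAndForget(Arrays.asList(new Item("a"), new Item("b")), gate);
        System.out.println("fire-and-forget, first bar: " + early.get(0).bar);

        CompletableFuture<List<Item>> all =
                enrichAll(Arrays.asList(new Item("c"), new Item("d")), gate);
        gate.countDown(); // let the simulated lookups finish
        System.out.println("composed, first bar: " + all.join().get(0).bar);
    }
}
```

The point of the sketch is only the shape: in fireAndForget the list is returned while the lookups are still in flight, whereas in enrichAll the update is part of the chain that produces the result, which is what the flatMap versions above do.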
njzk2 answered Oct 26 '22 02:10


You're looking for the zip operator, as described in the Zip operator documentation. I think you want to flatMap to a zip of all of your calls, so something like this:

    mService.getExampleObject().flatMap(new Func1<List<ExampleObject>, Observable<List<ExampleObject>>>() {
        @Override
        public Observable<List<ExampleObject>> call(List<ExampleObject> exampleObjects) {
            List<Observable<ExampleObject>> allTheObservables = new ArrayList<Observable<ExampleObject>>();
            // `entry` must be final to be captured by the anonymous class below
            for (final ExampleObject entry : exampleObjects) {
                allTheObservables.add(mService.getMissingObjectByFoo(entry.getFoo()).map(new Func1<MissingObject, ExampleObject>() {
                    @Override
                    public ExampleObject call(MissingObject missingObject) {
                        // setBar returns void, so set the field and then return the entry
                        entry.setBar(missingObject);
                        return entry;
                    }
                }));
            }
            return Observable.zip(allTheObservables, new FuncN<List<ExampleObject>>() {
                @Override
                public List<ExampleObject> call(Object... args) {
                    // FuncN receives Object..., so cast each element individually
                    List<ExampleObject> result = new ArrayList<ExampleObject>(args.length);
                    for (Object arg : args) {
                        result.add((ExampleObject) arg);
                    }
                    return result;
                }
            });
        }
    });

In case that doesn't work, or there are syntax issues, here's a concrete example using the GitHub API:

    service.getContributorsObservable("square", "dagger")
        .flatMap(new Func1<List<Contributor>, Observable<List<String>>>() {
            @Override
            public Observable<List<String>> call(List<Contributor> contributors) {
                List<Observable<String>> allTheObservables = new ArrayList<>(contributors.size());
                for (final Contributor contributor : contributors) {
                    allTheObservables.add(service.getContributorsObservable(contributor.login).map(new Func1<User, String>() {

                        @Override
                        public String call(User user) {
                            return contributor.login + " is " + user.name;
                        }
                    }));
                }
                return Observable.zip(allTheObservables, new FuncN<List<String>>() {
                    @Override
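                    public List<String> call(Object... args) {
                        // args is an Object[], so cast each element rather than the whole array
                        List<String> result = new ArrayList<>(args.length);
                        for (Object arg : args) {
                            result.add((String) arg);
                        }
                        return result;
                    }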
                });
            }
        });
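
One thing to watch with FuncN: its call method takes Object... args, and as far as I can tell the array zip passes in is a plain Object[]. Casting that array wholesale to String[] compiles but fails at runtime, so the elements need to be cast one at a time. Here is a small JDK-only sketch of the difference; VarargsCastDemo, unsafe and safe are hypothetical names, and the methods only imitate the shape of FuncN.call, they are not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; the methods only imitate the signature of FuncN.call.
final class VarargsCastDemo {
    // Casting the whole array: compiles, but the runtime type is Object[].
    static List<String> unsafe(Object... args) {
        return Arrays.asList((String[]) args);
    }

    // Casting element by element works for an Object[] that holds Strings.
    static List<String> safe(Object... args) {
        List<String> out = new ArrayList<>(args.length);
        for (Object arg : args) {
            out.add((String) arg);
        }
        return out;
    }

    public static void main(String[] argv) {
        System.out.println(safe("a", "b"));
        try {
            unsafe("a", "b");
        } catch (ClassCastException e) {
            System.out.println("whole-array cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```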

Keep in mind that this will make n+1 network calls, 1 for the list of ExampleObjects, and then 1 per ExampleObject in that list. If it is at all possible, I strongly suggest that you speak with the maintainer of the API to get the information lookup taken care of on the API side. Just know that this is going to use some bandwidth!
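
To make the n+1 count concrete, here is a tiny JDK-only sketch with a fake in-memory service that just counts how often it is hit. CallCountDemo, FakeService, getFoos and getBarByFoo are all hypothetical names for illustration; nothing here touches a real API or RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class CallCountDemo {
    // Hypothetical stand-in for the remote service; every method call counts as one request.
    static final class FakeService {
        final AtomicInteger calls = new AtomicInteger();

        List<String> getFoos() {
            calls.incrementAndGet();
            return Arrays.asList("a", "b", "c");
        }

        String getBarByFoo(String foo) {
            calls.incrementAndGet();
            return "bar-" + foo;
        }
    }

    // One request for the list, then one more per element in it.
    static List<String> enrich(FakeService service) {
        List<String> out = new ArrayList<>();
        for (String foo : service.getFoos()) {
            out.add(foo + "=" + service.getBarByFoo(foo));
        }
        return out;
    }

    public static void main(String[] args) {
        FakeService service = new FakeService();
        List<String> enriched = enrich(service);
        System.out.println(enriched.size() + " items took " + service.calls.get() + " calls");
    }
}
```

With three items in the list the counter ends at four, and it grows linearly with the list size, which is why a single endpoint that returns the complete objects is preferable when the API owner can provide one.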

Travis answered Oct 26 '22 02:10