Error handling for zipped observables

Tags: rx-java

My use case is: I get a list of permalinks and need to issue two REST requests per permalink, each returning part of that permalink's data. When both requests are back, I want to merge their results and do something with them (here, print them out). I want to do this with the zip operator. Here is my current code (together with mocks for the library I'm using):

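// RxJava 1.x and JDK imports used by the classes below (each public type goes in its own file)
import rx.Observable;
import rx.Subscriber;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Main {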

    public static void main(String[] args) {
        ContentManager cm = new ContentManager();

        Observable
                .from(cm.getPermalinks(10))
                .flatMap(permalink -> Observable.zip(
                        Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        (dataContent, streamUrlContent) -> {
                            if (dataContent == null || streamUrlContent == null) {
                                System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                                return Observable.empty();
                            }

                            return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                        }))
                .subscribe(System.out::println);
    }
}

class SubscribingRestCallback implements RestCallback {

    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        System.err.println(message);
        subscriber.onNext(null);
        subscriber.onCompleted();
    }
}

public class Content {

    public final String permalink;

    public final String logoUrl;

    public final String streamUrl;

    public Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }

    @Override
    public String toString() {
        return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
    }
}

public interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public List<String> getPermalinks(int n) {
        List<String> permalinks = new ArrayList<>(n);
        for (int i = 1; i <= n; ++i) {
            permalinks.add("perma_" + i);
        }

        return permalinks;
    }

    public void getDataByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, false);
    }

    public void getStreamByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, true);
    }

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
        // simulate network latency and unordered results
        new Thread(() -> {
            try {
                Thread.sleep(random.nextInt(1000) + 200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (random.nextInt(100) < 95) {
                String logoUrl;
                String streamUrl;
                if (stream) {
                    logoUrl = null;
                    streamUrl = "http://" + permalink + "/stream";
                } else {
                    logoUrl = "http://" + permalink + "/logo.png";
                    streamUrl = null;
                }
                callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
            } else {
                callback.onFailure(-1, permalink + " data failure");
            }
        }).start();
    }
}

In general it works, but I don't like the error handling in this implementation. The REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that zip always has something to work with (one request may fail while the other succeeds, and I don't know in advance which). Then, in the zip function, I need an if that checks that neither value is null (my code will crash if either of the partial Contents is null).

I would like to filter out the nulls with the filter operator somewhere, if possible. Or is there a better way to handle the failure case than emitting null, one that still works with the zip function?

wujek asked Jun 13 '15 19:06


1 Answer

First of all, the right way to notify a Subscriber about an error is to call the subscriber.onError method:

class SubscribingRestCallback implements RestCallback {
    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        subscriber.onError(new Exception(message));
    }
}
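
Note that wrapping only the message in a plain Exception drops the numeric code passed to onFailure. If downstream code needs that code, one option is a small exception type that carries both values. This is a minimal sketch; the name RestException and its getCode accessor are hypothetical and not part of RxJava or the REST library from the question:

```java
// Hypothetical exception type (an assumption, not part of RxJava or the
// REST library in the question) that keeps the numeric code passed to
// RestCallback.onFailure alongside the message.
class RestException extends Exception {

    private final int code;

    RestException(int code, String message) {
        super(message);
        this.code = code;
    }

    int getCode() {
        return code;
    }
}

class RestExceptionDemo {

    public static void main(String[] args) {
        // Same arguments the mock ContentManager passes on failure.
        RestException e = new RestException(-1, "perma_3 data failure");
        System.out.println(e.getCode() + ": " + e.getMessage());
    }
}
```

With such a type, onFailure could call subscriber.onError(new RestException(code, message)) instead of subscriber.onError(new Exception(message)).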

Even if you don't want the whole stream to fail, you should still call subscriber.onError(). There are several ways to swallow the error downstream. One of them is the onErrorResumeNext operator, applied here to each zipped pair so that a failed permalink is skipped while the others continue:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

EDIT

I have one last question: in my zipper function, I return Observable.empty() if the two objects cannot be zipped, and a Content otherwise. This seems wrong. How should I handle such error conditions in the zipper function?

Yes, returning Observable.empty() from the zip function is wrong: whatever the function returns is emitted as an item, so the empty Observable itself would be passed downstream instead of being skipped. Throwing an exception from the zip function seems like the best solution:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    if (!isDataValid(dataContent, streamUrlContent)) {
                        throw new RuntimeException("Something went wrong.");
                    }
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);
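
The isDataValid helper used above is not defined in the answer. What counts as valid depends on the application, so the following is only one possible sketch: it assumes a pair is valid when both parts are present, refer to the same permalink, and each carries its own URL. The class name ContentChecks and the rule itself are assumptions; Content is a trimmed copy of the class from the question so that the example compiles on its own.

```java
// Trimmed copy of the Content class from the question.
final class Content {

    final String permalink;
    final String logoUrl;
    final String streamUrl;

    Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }
}

// Hypothetical holder for the validity check; the rule is an assumption.
final class ContentChecks {

    // Valid when both parts exist, describe the same permalink, and each
    // part carries the URL it is responsible for.
    static boolean isDataValid(Content dataContent, Content streamUrlContent) {
        return dataContent != null
                && streamUrlContent != null
                && dataContent.permalink != null
                && dataContent.permalink.equals(streamUrlContent.permalink)
                && dataContent.logoUrl != null
                && streamUrlContent.streamUrl != null;
    }

    public static void main(String[] args) {
        Content data = new Content("perma_1", "http://perma_1/logo.png", null);
        Content stream = new Content("perma_1", null, "http://perma_1/stream");
        Content other = new Content("perma_2", null, "http://perma_2/stream");

        System.out.println(isDataValid(data, stream)); // matching pair
        System.out.println(isDataValid(data, other));  // permalinks differ
    }
}
```

Once failures are reported through onError, null values should no longer reach the zip function, so the null checks here are purely defensive.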

Vladimir Mironov answered Oct 21 '22 09:10