My use case is: I get a list of permalinks and need to issue two REST requests per permalink to fetch its data in two parts. When both requests are back, I want to merge their results and do something with the merged object (here, print it out). I want to do this with the zip operator. Here is my current code, together with mocks for the library I'm using:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// RxJava 1.x
import rx.Observable;
import rx.Subscriber;

public class Main {
    public static void main(String[] args) {
        ContentManager cm = new ContentManager();
        Observable
            .from(cm.getPermalinks(10))
            .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    if (dataContent == null || streamUrlContent == null) {
                        System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                        return Observable.empty();
                    }
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }))
            .subscribe(System.out::println);
    }
}
class SubscribingRestCallback implements RestCallback {
    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        System.err.println(message);
        subscriber.onNext(null);
        subscriber.onCompleted();
    }
}
public class Content {
    public final String permalink;
    public final String logoUrl;
    public final String streamUrl;

    public Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }

    @Override
    public String toString() {
        return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
    }
}
public interface RestCallback {
    void onSuccess(Content content);
    void onFailure(int code, String message);
}
class ContentManager {
    private final Random random = new Random();

    public List<String> getPermalinks(int n) {
        List<String> permalinks = new ArrayList<>(n);
        for (int i = 1; i <= n; ++i) {
            permalinks.add("perma_" + i);
        }
        return permalinks;
    }

    public void getDataByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, false);
    }

    public void getStreamByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, true);
    }

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
        // simulate network latency and unordered results
        new Thread(() -> {
            try {
                Thread.sleep(random.nextInt(1000) + 200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (random.nextInt(100) < 95) {
                String logoUrl;
                String streamUrl;
                if (stream) {
                    logoUrl = null;
                    streamUrl = "http://" + permalink + "/stream";
                } else {
                    logoUrl = "http://" + permalink + "/logo.png";
                    streamUrl = null;
                }
                callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
            } else {
                callback.onFailure(-1, permalink + " data failure");
            }
        }).start();
    }
}
In general, it works, but I don't like the error handling in this implementation. Basically, the REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that zip always has something to work with (one request may have failed while the other succeeded, and I don't know which one failed). Then, in the zip function, I need an if that checks that neither value is null (my code will crash if either of the partial Content objects is null).
I would like to be able to filter out the null values with the filter operator somewhere, if possible. Or maybe there is a better way than emitting null for the failure case that still works with the zip function?
First of all, the right way to notify a Subscriber about an error is to call its onError method:
class SubscribingRestCallback implements RestCallback {
    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        // Signal the failure as an error instead of emitting null
        subscriber.onError(new Exception(message));
    }
}
Even if you don't want the whole stream to fail, you still need to call subscriber.onError(). There are several ways to swallow the error further down the chain. One of them is the onErrorResumeNext operator:
Observable
    .from(cm.getPermalinks(10))
    .flatMap(permalink -> Observable.zip(
        Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
        Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
        (dataContent, streamUrlContent) -> {
            return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
        }).onErrorResumeNext(Observable.empty()))
    .subscribe(System.out::println);
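To see the control flow in isolation, without the RxJava dependency, here is a rough analogue built on java.util.concurrent.CompletableFuture. It is not RxJava's zip, only the same idea expressed with a different tool: a failed request is signalled as an error rather than as null, the pair is combined only when both parts succeed, and a failed pair is replaced by "nothing" so the remaining permalinks are unaffected. The class PairOrSkip and its methods request, pair and collect are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class PairOrSkip {
    // Hypothetical stand-in for one REST call; it fails when `fail` is true.
    static CompletableFuture<String> request(String permalink, String part, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            // Plays the role of subscriber.onError(...)
            future.completeExceptionally(new Exception(permalink + " " + part + " failure"));
        } else {
            // Plays the role of subscriber.onNext(...) followed by onCompleted()
            future.complete(permalink + "/" + part);
        }
        return future;
    }

    // Combines both parts; if either request failed, the result is an empty
    // Optional instead of an error (roughly what onErrorResumeNext(empty) does).
    static CompletableFuture<Optional<String>> pair(String permalink, boolean failData, boolean failStream) {
        return request(permalink, "logo", failData)
                .thenCombine(request(permalink, "stream", failStream),
                        (logo, stream) -> logo + " + " + stream)
                .handle((merged, error) -> Optional.ofNullable(merged));
    }

    static List<String> collect() {
        List<String> results = new ArrayList<>();
        pair("perma_1", false, false).join().ifPresent(results::add);
        pair("perma_2", true, false).join().ifPresent(results::add);  // data failed: skipped
        pair("perma_3", false, true).join().ifPresent(results::add);  // stream failed: skipped
        pair("perma_4", false, false).join().ifPresent(results::add);
        return results;
    }

    public static void main(String[] args) {
        collect().forEach(System.out::println);
    }
}
```

Only perma_1 and perma_4 are printed. The combining function never sees a null, which is the property the onError-based callback gives the zip function in the RxJava version.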
EDIT
I have one last question: in my zipper function, I return Observable.empty() if the two objects cannot be zipped, and a Content otherwise. This seems wrong. How should I handle such error conditions in the zipper function?
Yes, returning Observable.empty() is totally wrong. Throwing an exception from the zip function seems like the best solution:
Observable
    .from(cm.getPermalinks(10))
    .flatMap(permalink -> Observable.zip(
        Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
        Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
        (dataContent, streamUrlContent) -> {
            // isDataValid is a placeholder for your own validation logic
            if (!isDataValid(dataContent, streamUrlContent)) {
                throw new RuntimeException("Something went wrong.");
            }
            return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
        }).onErrorResumeNext(Observable.empty()))
    .subscribe(System.out::println);
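The snippet above calls isDataValid without defining it. What counts as valid depends on your data, so the following is only one possible version. It assumes a pair is valid when both parts are present, refer to the same permalink, and each carries the field the merge reads from it. The Content class here is a trimmed copy of the one in the question, and ZipValidation is a made-up holder class.

```java
import java.util.Objects;

// Trimmed stand-in for the question's Content class.
final class Content {
    final String permalink;
    final String logoUrl;
    final String streamUrl;

    Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }
}

final class ZipValidation {
    // Hypothetical helper: the validity rule below is an assumption, not
    // something the original answer specifies.
    static boolean isDataValid(Content data, Content stream) {
        return data != null
                && stream != null
                && data.logoUrl != null
                && stream.streamUrl != null
                && Objects.equals(data.permalink, stream.permalink);
    }

    public static void main(String[] args) {
        Content data = new Content("perma_1", "http://perma_1/logo.png", null);
        Content stream = new Content("perma_1", null, "http://perma_1/stream");
        Content other = new Content("perma_2", null, "http://perma_2/stream");

        System.out.println(isDataValid(data, stream)); // true
        System.out.println(isDataValid(data, other));  // false: permalinks differ
        System.out.println(isDataValid(data, null));   // false: a part is missing
    }
}
```

Once the callback reports failures through onError, null values should no longer reach the zip function, so the null checks here are purely defensive.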