The idea is to make three network calls in parallel. (I am using Google as the service for demo purposes.) The following works, but I am not sure whether this is the right way to do it or whether it can be simplified. What should I do if I have to combine the responses of all three searches? Please advise.
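import static java.lang.System.out;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.concurrent.CountDownLatch;
import org.jsoup.Jsoup;
import org.jsoup.select.Elements;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class GoogleSearchRx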
{
public static void main(String args[])
{
CountDownLatch latch = new CountDownLatch(3);
search("RxJava").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
//run the last one on current thread
search("Erik Meijer").subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
public static Observable<Elements> search(String q)
{
String google = "http://www.google.com/search?q=";
String charset = "UTF-8";
String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!
return Observable.create(new Observable.OnSubscribe<Elements>()
{
@Override public void call(Subscriber<? super Elements> subscriber)
{
out.println(currentThreadName() + "\tOnSubscribe.call");
try
{
Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
subscriber.onNext(links);
// Complete only on success: onError is itself terminal, so onCompleted must not follow it.
subscriber.onCompleted();
}
catch (IOException e)
{
subscriber.onError(e);
}
}
});
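}
// Helper used above; it is not shown in the original post, so this definition is an assumption.
private static String currentThreadName()
{
return Thread.currentThread().getName();
}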
}
Going by the "combine the responses of all the three searches" part of your question, you might be looking for Zip.
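// Only one subscription remains, so the latch counts down once.
CountDownLatch latch = new CountDownLatch(1);
// Give each search its own subscribeOn; otherwise zip subscribes to the three blocking sources one after another on a single thread.
Observable<Elements> search1 = search("RxJava").subscribeOn(Schedulers.io());
Observable<Elements> search2 = search("Reactive Extensions").subscribeOn(Schedulers.io());
Observable<Elements> search3 = search("Erik Meijer").subscribeOn(Schedulers.io());
Observable.zip(search1, search2, search3,
new Func3<Elements, Elements, Elements, Elements>() {
@Override
public Elements call(Elements result1, Elements result2, Elements result3) {
// Add all the results together...
Elements results = new Elements();
results.addAll(result1);
results.addAll(result2);
results.addAll(result3);
return results;
}
}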
).subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
This assumes you want to deal with all the results at the same time (in the subscriber, here) and do not care which query produced a given result.
Note that there are different versions of the zip function, taking from 1..N observables, and Func1 to Func9 or FuncN, allowing you to zip a specific or arbitrarily large number of observables.
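For comparison, the same "start three calls in parallel, then merge once all have answered" shape can be sketched with only the JDK. This is not the zip operator itself, just a CompletableFuture analogue of it; fakeSearch, searchAll and the class name are hypothetical, and fakeSearch stands in for the real network call so the sketch runs offline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ParallelCombine {

    // Hypothetical stand-in for the network call; returns two fake "links" per query.
    static List<String> fakeSearch(String query) {
        return Arrays.asList(query + " #1", query + " #2");
    }

    // Starts one asynchronous task per query and merges the results in query order.
    static List<String> searchAll(String q1, String q2, String q3) {
        CompletableFuture<List<String>> f1 = CompletableFuture.supplyAsync(() -> fakeSearch(q1));
        CompletableFuture<List<String>> f2 = CompletableFuture.supplyAsync(() -> fakeSearch(q2));
        CompletableFuture<List<String>> f3 = CompletableFuture.supplyAsync(() -> fakeSearch(q3));

        // The combining step runs only after all three tasks have finished,
        // which mirrors how the zip combiner waits for one value from each source.
        return CompletableFuture.allOf(f1, f2, f3).thenApply(ignored -> {
            List<String> all = new ArrayList<>();
            all.addAll(f1.join());
            all.addAll(f2.join());
            all.addAll(f3.join());
            return all;
        }).join(); // block the caller until the merged list is ready
    }

    public static void main(String[] args) {
        searchAll("RxJava", "Reactive Extensions", "Erik Meijer").forEach(System.out::println);
    }
}
```

As with zip, a failure in any one task fails the combined result: here the final join() throws a CompletionException instead of calling an error callback.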