Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Calling network services in parallel using RxJava. Is this the right way?

Idea is to make 3 network calls in parallel. (I am using Google as the servies for demo purpose. The following works but not sure if this is the right way or it can be simplified. What should I do if I have to combine the responses of all the three searches? Please advise.

public class GoogleSearchRx
{
    public static void main(String args[])
    {
        CountDownLatch latch = new CountDownLatch(3);

        search("RxJava").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        //run the last one on current thread
        search("Erik Meijer").subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    public static Observable<Elements> search(String q)
    {
        String google = "http://www.google.com/search?q=";

        String charset = "UTF-8";
        String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!

        return Observable.create(new Observable.OnSubscribe<Elements>()
        {

            @Override public void call(Subscriber<? super Elements> subscriber)
            {
                out.println(currentThreadName() + "\tOnSubscribe.call");

                try
                {
                    Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
                    subscriber.onNext(links);
                }
                catch (IOException e)
                {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }
}
like image 275
Aravind Yarram Avatar asked Jan 18 '15 00:01

Aravind Yarram


1 Answers

Going by the "combine the responses of all the three searches" part of your question, you might be looking for Zip.

Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
            new Func3<Elements, Elements, Elements, Elements>() {
                @Override
                public Elements call(Elements result1, Elements result2, Elements result3) {
                    // Add all the results together...
                    return results;
                }
            }
    ).subscribeOn(Schedulers.io()).subscribe(
            links -> {
                links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                latch.countDown();
            },
            e -> {
                out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                latch.countDown();
            }
    );

This assumes you want to deal with all the results at the same time (in the subscriber, here) and not care about which query was used for the given result.

Note there's different versions of the zip function, taking from 1..N observables, and Func1 to Func9 or FuncN, allowing you to zip a specific or arbitrarily large number of observables.

like image 145
Adam S Avatar answered Sep 28 '22 06:09

Adam S