Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Recommended way to execute async tasks with RXJava

I'm new to RxJava and I'm trying to understand the best/recommended way to perform long running tasks asynchronously (e.g. network requests). I've read through a lot of examples online but would appreciate some feedback.

The following code works (it prints 'one', 'two', then 'User: x' ... etc) but should I really be creating/managing Threads manually?

Thanks in advance!

public void start() throws Exception {
    System.out.println("one");
    observeUsers()
        .flatMap(users -> Observable.from(users))
        .subscribe(user -> System.out.println(String.format("User: %s", user.toString()));
    System.out.println("two");
}

Observable<List<User>> observeUsers() {
    return Observable.<List<User>>create(s -> {
        Thread thread = new Thread(() -> getUsers(s));
        thread.start();
    });
}

void getUsers(final Subscriber s) {
    s.onNext(userService.getUsers());
    s.onCompleted();
}

// userService.getUsers() fetches users from a web service.
like image 853
arrwhidev Avatar asked Jul 23 '15 17:07

arrwhidev


1 Answers

Instead of managing your own thread try using the defer() operator. Meaning replace observeUsers() with Observable.defer(() -> Observable.just(userService.getUsers())). Then you can use the RxJava Schedulers to control what threads are used during subscription and observation. Here's your code modified with the above suggestions.

Observable.defer(() -> Observable.just(userService.getUsers()))
        .flatMap(users -> Observable.from(users))
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.trampoline())
        .subscribe(user -> System.out.println(String.format("User: %s", user.toString()));
like image 64
kjones Avatar answered Nov 09 '22 10:11

kjones