
Using flatMap in RxJava 2.x

I'm working with an API of my own and I'm hoping to chain a few paginated results using RxJava. I use cursor-based pagination. Imagine there are 50 users in this first response:

{
    "data":{
        "status":"ok",
        "total":988, //users total
        "has_next_page":true,
        "end_cursor":"AQAxd8QPGHum7LSDz8DnwIh7yHJDM22nEjd",
        "users":[{"id":"91273813",
                "username":"codergirl",
                "full_name":"Code Girl",
                "picture_url":"https://cdn.com/21603182_7904715668509949952_n.jpg",
                },
                ...
                ]
        }
}

Right now, I'm getting the first 50 results like this, using Retrofit:

public class DataResponse {
    @SerializedName("end_cursor")
    private String end_cursor;

    @SerializedName("users")
    private JsonArray users;

    @SerializedName("has_next_page")
    private Boolean has_next_page;

    public boolean hasNextCursor(){
        return has_next_page;
    }
    public String endCursor(){
        if (hasNextCursor()){
            return end_cursor;
        }
        return "";
    }
    public JsonArray getUsers(){
        return users;
    }
}

then:

public interface MyService  {
    @GET("/users")
    Observable<DataResponse> getUsers(
            @Query("cursor") String cursor
    );
}

and

MyService service = RetrofitClient.getInstance();
service.getUsers(null) // null cursor: Retrofit omits the query parameter, so this requests the first page
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe( val->  showUsers(val.getUsers())); // getting the first 50 users

The next call should be to "/users?cursor=AQAxd8QPGHum7LSDz8DnwIh7yHJDM22nEjd"

I'd like to return all users (988 in this case).

asked Jun 12 '18 23:06 by sandes




1 Answer

My solution

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public void getAllUsers(){

    AtomicReference<String> cache = new AtomicReference<>();
    AtomicBoolean hasMore = new AtomicBoolean(true);

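    io.reactivex.Observable.just(0)
        // fetch the page at the current cursor (null on the first pass, which returns the first 50 users);
        // the lambda runs again on every resubscription, so it picks up the updated cursor
        .flatMap(ignored -> service.getUsers( cache.get() ))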
        
        // scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        
        // resubscribe (fetching the next page) until the last response reported no further pages
        .repeatUntil(() -> !hasMore.get())
        
        .subscribe(new Observer<DataResponse>() {
            @Override
            public void onSubscribe(Disposable d) { /* on subscribe */ }

            @Override
            public void onNext(DataResponse response) {
                
                // saving boolean (If there are more users)
                hasMore.set(response.hasNextCursor());
                
                // saving next cursor
                cache.set(response.endCursor());
                
                // adding the new 50 users
                addToList(response.getUsers());
                
            }

            @Override
            public void onError(Throwable e) { /*error */ }

            @Override
            public void onComplete() { /*complete*/ }
        });

}
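
For comparison, the same "fetch a page, then chain the next request from its cursor" idea can be sketched without RxJava or Retrofit, using only the JDK. This is not the code from the answer: CompletableFuture.thenCompose stands in for flatMap, and CursorPaging, Page, PAGES, fetchPage and fetchAll are hypothetical names with a fake in-memory backend in place of the real service. It is meant only to make the chaining and the stop condition easy to see, under those assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogue of cursor paging; every name here is hypothetical.
class CursorPaging {

    // One page of results, loosely mirroring users / end_cursor / has_next_page.
    static final class Page {
        final List<String> users;
        final String endCursor; // null when there is no next page

        Page(List<String> users, String endCursor) {
            this.users = users;
            this.endCursor = endCursor;
        }

        boolean hasNextPage() {
            return endCursor != null;
        }
    }

    // Fake backend: maps a cursor (null means "first page") to a page.
    static final Map<String, Page> PAGES = new HashMap<>();
    static {
        PAGES.put(null, new Page(Arrays.asList("u1", "u2"), "c1"));
        PAGES.put("c1", new Page(Arrays.asList("u3", "u4"), "c2"));
        PAGES.put("c2", new Page(Arrays.asList("u5"), null));
    }

    // Stand-in for the asynchronous service call.
    static CompletableFuture<Page> fetchPage(String cursor) {
        return CompletableFuture.supplyAsync(() -> PAGES.get(cursor));
    }

    // Fetches the page at the given cursor, then chains the request for the
    // next page until a page reports that nothing follows it.
    static CompletableFuture<List<String>> fetchAll(String cursor) {
        return fetchPage(cursor).thenCompose(page -> {
            if (!page.hasNextPage()) {
                List<String> last = new ArrayList<>(page.users);
                return CompletableFuture.completedFuture(last);
            }
            return fetchAll(page.endCursor).thenApply(rest -> {
                List<String> all = new ArrayList<>(page.users);
                all.addAll(rest);
                return all;
            });
        });
    }

    public static void main(String[] args) {
        // Blocks until every page has been fetched, then prints the users in page order.
        System.out.println(fetchAll(null).join());
    }
}
```

Unlike the repeatUntil version above, this sketch keeps the cursor in the recursion rather than in shared AtomicReference state, and it delivers one combined list at the end instead of one callback per page.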
answered Oct 20 '22 03:10 by sandes