I want to run 2 asynchronous tasks, one followed by the other (sequentially). I have read something about ZIP or Flat, but I didn't understand it very well...
My purpose is to load the data from a Local SQLite, and when it finishes, it calls the query to the server (remote).
Can someone suggests me, a way to achieve that?
This is the RxJava Observable skeleton that I am using (single task):
// RxJava Observable
Observable.OnSubscribe<Object> onSubscribe = subscriber -> {
try {
// Do the query or long task...
subscriber.onNext(object);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
};
// RxJava Observer
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onCompleted() {
// Handle the completion
}
@Override
public void onError(Throwable e) {
// Handle the error
}
@Override
public void onNext(Object result) {
// Handle the result
}
};
Observable.create(onSubscribe)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
RxJava — Multi-Threading in Android helps to understand the basics of Rx, everything about Observable s, Observer s, Scheduler s, etc. So, hoping that you already know about basics of RxJava lets start by discussing Observable. What is an Observable? In RxJava, Observable s are the source that emits data to the Observers.
What is an Observable? In RxJava, Observable s are the source that emits data to the Observers. We can understand observable s as suppliers — they process and supply data to other components. It does some work and emits some values. The following are the different types of Observable s in RxJava
We can understand RxJava as data emitted by one component, called Observable, and the underlying structure provided by the Rx libraries will propagate changes to another component, Observer. Simply put, it’s an API for asynchronous programming with observable streams.
This article is part of RxJava Introduction series. You can checkout the entire series here: This operator is used when an item is emitted by either of two Observables, and the latest item emitted by each Observable is combined via a specified function and the resulting items are emitted based on the results of this function.
The operator to do that would be merge
, see http://reactivex.io/documentation/operators/merge.html.
My approach would be to create two observables, let's say observableLocal
and observableRemote
, and merge the output:
Observable<Object> observableLocal = Observable.create(...)
Observable<Object> observableRemote = Observable.create(...)
Observable.merge(observableLocal, observableRemote)
.subscribe(subscriber)
If you want to make sure that remote is run after local, you can use concat
.
Lukas Batteau's answer is best if the queries are not dependent on one another. However, if it is necessary for you obtain the data from the local SQLite query before you run the remote query (for example you need the data for the remote query params or headers) then you can start with the local observable and then flatmap it to combine the two observables after you obtain the data from the local query:
Observable<Object> localObservable = Observable.create(...)
localObservable.flatMap(object ->
{
return Observable.zip(Observable.just(object), *create remote observable here*,
(localObservable, remoteObservable) ->
{
*combining function*
});
}).subscribe(subscriber);
The flatmap function allows you to transform the local observable into a combination of the local & remote observables via the zip function. And to reiterate, the advantage here is that the two observables are sequential, and the zip function will only run after both dependent observables run.
Furthermore, the zip function will allow you to combine observables even if the underlying objects have different types. In that case, you provide a combining function as the 3rd parameter. If the underlying data is the same type, replace the zip function with a merge.
You can try my solutions, there are several ways to resolve your problem.
To make sure it's working, I created a stand alone working example and use this API to test: https://jsonplaceholder.typicode.com/posts/1
private final Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://jsonplaceholder.typicode.com/posts/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
private final RestPostsService restPostsService = retrofit.create(RestPostsService.class);
private Observable<Posts> getPostById(int id) {
return restPostsService.getPostsById(id);
}
RestPostService.java
package app.com.rxretrofit;
import retrofit2.http.GET;
import retrofit2.http.Path;
import rx.Observable;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public interface RestPostsService {
@GET("{id}")
Observable<Posts> getPostsById(@Path("id") int id);
}
Solution1: Use when call multiple tasks in sequences, the result of previous tasks is always the input of the next task
getPostById(1)
.concatMap(posts1 -> {
//get post 1 success
return getPostById(posts1.getId() + 1);
})
.concatMap(posts2 -> {
//get post 2 success
return getPostById(posts2.getId() + 1);
})
.concatMap(posts3 -> {
//get post 3success
return getPostById(posts3.getId() + 1);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
//get post 4 success
Toast.makeText(this, "Final result: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_LONG).show();
});
Solution2: Use when call multiple tasks in sequences, all results of previous tasks is the input of the final task (for example: after uploading avatar image and cover image, call api to create new user with these image URLs):
Observable
.zip(getPostById(1), getPostById(2), getPostById(3), (posts1, posts2, posts3) -> {
//this method defines how to zip all separate results into one
return posts1.getId() + posts2.getId() + posts3.getId();
})
.flatMap(finalPostId -> {
//after get all first three posts, get the final posts,
// the final posts-id is sum of these posts-id
return getPostById(finalPostId);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(finalPosts -> {
Toast.makeText(this, "Final posts: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
Toast.LENGTH_SHORT).show();
});
AndroidManifest
<uses-permission android:name="android.permission.INTERNET"/>
root build.gradle
// Top-level build file where you can add configuration options common to all sub-projects/modules.
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.3.3'
classpath 'me.tatarka:gradle-retrolambda:3.2.0'
classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
// NOTE: Do not place your application dependencies here; they belong
// in the individual module build.gradle files
}
// Exclude the version that the android plugin depends on.
configurations.classpath.exclude group: 'com.android.tools.external.lombok'
}
allprojects {
repositories {
jcenter()
}
}
task clean(type: Delete) {
delete rootProject.buildDir
}
app/build.gradle
apply plugin: 'me.tatarka.retrolambda'
apply plugin: 'com.android.application'
android {
compileSdkVersion 26
buildToolsVersion "26.0.1"
defaultConfig {
applicationId "app.com.rxretrofit"
minSdkVersion 15
targetSdkVersion 26
versionCode 1
versionName "1.0"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:26.+'
compile 'com.android.support.constraint:constraint-layout:1.0.2'
testCompile 'junit:junit:4.12'
provided 'org.projectlombok:lombok:1.16.6'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
}
model
package app.com.rxretrofit;
import com.google.gson.annotations.SerializedName;
/**
* -> Created by Think-Twice-Code-Once on 11/26/2017.
*/
public class Posts {
@SerializedName("userId")
private int userId;
@SerializedName("id")
private int id;
@SerializedName("title")
private String title;
@SerializedName("body")
private String body;
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
By the way, use Rx + Retrofit + Dagger + MVP pattern is a great combine.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With