Hi, I just started learning reactive programming using RxJava2. How do I create a task that runs on a background thread and then completes on the main thread using RxJava2?
For example, in Android we use AsyncTask, as in the example below:
private class MyTask extends AsyncTask<String, Integer, Boolean>
{
@Override
protected Boolean doInBackground(String... paths)
{
for (int index = 0; index < paths.length; index++)
{
boolean result = copyFileToExternal(paths[index]);
if (result == true)
{
// update UI
publishProgress(index);
}
else
{
// stop the background process
return false;
}
}
return true;
}
@Override
protected void onProgressUpdate(Integer... values)
{
super.onProgressUpdate(values);
int count = values[0];
// this will update my textview to show the number of files copied
myTextView.setText("Total files: " + count);
}
@Override
protected void onPostExecute(Boolean result)
{
super.onPostExecute(result);
if (result)
{
// display a success dialog
ShowSuccessAlertDialog();
}
else
{
// display a fail dialog
ShowFailAlertDialog();
}
}
}
For this example I want to pass in an array / ArrayList of Strings, which is used to execute some method on the background thread. Every successful result should then update my TextView (on the UI thread). If one of the operations fails, I want the whole process to stop immediately. Lastly, I want to update my Views when the process has completed.
I only managed to get this far:
Observable.just(paths).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ArrayList<String>>()
{
private boolean result;
@Override
public void onSubscribe(Disposable d)
{
}
@Override
public void onNext(ArrayList<String> paths)
{
for (int index = 0; index < paths.size(); index++)
{
result = copyFileToExternal(paths[index]);
if (result == true)
{
// cant update UI because is in background thread
myTextView.setText("Total files: " + index);
}
else
{
// end the loop
break;
}
}
}
@Override
public void onError(Throwable e)
{
}
@Override
public void onComplete()
{
if (result)
{
// cant display because it is still in background thread
ShowSuccessAlertDialog();
}
else
{
// cant display because it is still in background thread
ShowFailAlertDialog();
}
}
});
I looked at a few tutorials but can't seem to find the answer.
Thanks in advance for the help
I would do something like this:
Observable.fromArray(getPaths())
.map(path -> copyFileToExternal(path))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// map emits the boolean returned by copyFileToExternal; onError runs only if it throws
.subscribe(copied -> Log.i("test", "update UI"),
throwable -> ShowFailAlertDialog(),
() -> ShowSuccessAlertDialog());
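To make the threading in that chain explicit, here is a minimal sketch of the same flow in plain java.util.concurrent rather than RxJava: work runs on a background thread, every UI update is posted to one "main" thread, and the loop stops at the first failed copy. The class name CopyFlowSketch, the run method, and the copy predicate are hypothetical stand-ins, and the single-thread executor only approximates the Android main thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class CopyFlowSketch {

    // Runs `copy` for each path on a background thread and posts every UI update
    // to a single "main" thread. Returns the UI updates in the order they ran.
    static List<String> run(List<String> paths, Predicate<String> copy) throws InterruptedException {
        ExecutorService background = Executors.newSingleThreadExecutor();
        // Assumption: stand-in for the Android main thread (one thread, tasks run in order).
        ExecutorService main = Executors.newSingleThreadExecutor();
        List<String> uiLog = Collections.synchronizedList(new ArrayList<>());

        background.execute(() -> {
            boolean ok = true;
            int copied = 0;
            try {
                for (String path : paths) {
                    if (!copy.test(path)) {   // first failure stops the loop
                        ok = false;
                        break;
                    }
                    final int count = ++copied;
                    // Progress update, comparable to onNext after observeOn(mainThread()).
                    main.execute(() -> uiLog.add("Total files: " + count));
                }
            } catch (RuntimeException e) {
                ok = false;                   // an exception also counts as a failure
            }
            final boolean success = ok;
            // Final update, comparable to onComplete / onError.
            main.execute(() -> uiLog.add(success ? "success" : "fail"));
            main.shutdown();                  // already queued updates still run
        });

        background.shutdown();
        background.awaitTermination(5, TimeUnit.SECONDS);
        main.awaitTermination(5, TimeUnit.SECONDS);
        return uiLog;
    }

    public static void main(String[] args) throws InterruptedException {
        // The third path fails, so the fourth is never copied.
        // Prints [Total files: 1, Total files: 2, fail]
        System.out.println(run(Arrays.asList("a.txt", "b.txt", "broken.txt", "c.txt"),
                path -> !path.startsWith("broken")));
    }
}
```

In the RxJava version, subscribeOn(Schedulers.io()) plays the role of the background executor and observeOn(AndroidSchedulers.mainThread()) plays the role of the main executor.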
It is usually a good idea to keep a handle on the subscription, so that you can stop the background task when you need to (for example, because the user left the Activity). For this purpose you can use subscribeWith instead of subscribe. It takes a ResourceObserver as input and returns it, and because ResourceObserver implements Disposable, you get a Disposable back.
Disposable subscription = Observable.fromArray(getPaths())
.map(path -> copyFileToExternal(path))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new ResourceObserver<Boolean>() {
@Override
public void onNext(@NonNull Boolean copied) {
Log.i("test", "update UI");
}
@Override
public void onError(@NonNull Throwable e) {
ShowFailAlertDialog();
}
@Override
public void onComplete() {
ShowSuccessAlertDialog();
}
});
When you need to stop the task you can just call:
subscription.dispose();
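The idea of holding a handle and cancelling through it can also be shown in plain Java, without RxJava. The sketch below is only an analogy under stated assumptions: a Future stands in for the Disposable, cancel(true) stands in for dispose(), and Thread.sleep stands in for a slow file copy. CancelSketch and runAndCancel are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CancelSketch {

    // Starts a slow background loop, cancels it once the first item has been
    // processed, and returns how many items were processed in total.
    static int runAndCancel(int totalItems) throws InterruptedException {
        ExecutorService background = Executors.newSingleThreadExecutor();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch firstDone = new CountDownLatch(1);

        Future<?> handle = background.submit(() -> {
            for (int i = 0; i < totalItems; i++) {
                processed.incrementAndGet();
                firstDone.countDown();
                try {
                    Thread.sleep(10_000);   // stand-in for slow work on one item
                } catch (InterruptedException e) {
                    return;                 // cancelled: stop without touching later items
                }
            }
        });

        // Wait until the first item is done (or give up after 5 seconds).
        firstDone.await(5, TimeUnit.SECONDS);
        handle.cancel(true);                // interrupts the worker, similar in spirit to dispose()
        background.shutdown();
        background.awaitTermination(5, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Only the first of five items is processed before the cancel takes effect.
        System.out.println(runAndCancel(5)); // prints 1
    }
}
```

On Android, a typical place for the equivalent dispose() call is a lifecycle callback such as onDestroy, so that no UI callback arrives after the Activity is gone.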
I'm new at this, but I got a working example..
//Observable
Observable.just("input_parameter")
.subscribeOn(Schedulers.io())//creation of secondary thread
.map(new Function<String, String>() {//<input obj,return obj>
@Override
public String apply(String cad){//input obj
Log.d(TAG,"thread :"+Thread.currentThread().getName());
//runs in a secondary thread
return "result text: "+doLongNetworkOperation();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(MyObserver);//now this runs in main thread
And MyObserver:
//Observer
Observer<String> MyObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"onSubscribe thread:"+Thread.currentThread().getName());
}
@Override
public void onNext(String value) {
Log.d(TAG,"on next, value:<<"+value+">> \n thread name:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"error "+e.toString());
}
@Override
public void onComplete() {
Log.d(TAG,"onCompleted thread:"+Thread.currentThread().getName());
}
};
Plz, let me know if this works for you.