I have a problem with Flowables and adding them to a CompositeDisposable. I want to switch from an Observable to a Flowable because the operation might emit 1000 or more values. I'm fairly inexperienced with RxJava 2, so please forgive me if this question is stupid :)
So far I have used the Observable like this:
public Observable<String> uploadPictureRx(String path)
{
    return Observable.create(new ObservableOnSubscribe<String>()
    {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception
        {
            Uri file = Uri.fromFile(new File(path));
            String segment = file.getLastPathSegment();
            UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);
            uploadTask.addOnFailureListener(new OnFailureListener()
            {
                @Override
                public void onFailure(@NonNull Exception exception)
                {
                    e.onError(exception);
                }
            }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    downloadUrl = taskSnapshot.getDownloadUrl();
                    String url = downloadUrl.getPath();
                    e.onNext(url);
                    e.onComplete();
                }
            }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    long bytes = taskSnapshot.getBytesTransferred();
                    String bytesS = String.valueOf(bytes);
                    e.onNext(bytesS);
                }
            });
        }
    });
}
and called the method like this:
private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
        .subscribeOn(Schedulers.io())
        .observeOn(mainScheduler)
        .subscribeWith(new DisposableObserver<String>()
        {
            @Override
            public void onNext(String s)
            {
                String ss = s;
                System.out.println(ss);
            }
            @Override
            public void onError(Throwable e)
            {
                e.printStackTrace();
            }
            @Override
            public void onComplete()
            {
                view.displayToast("Picture Upload completed");
            }
        })
    );
}
This works fine! However, when I try to do the same with a Flowable instead of an Observable, it won't compile:
public Flowable<String> uploadPictureRx(String path)
{
    return Flowable.create(new FlowableOnSubscribe<String>()
    {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception
        {
            Uri file = Uri.fromFile(new File(path));
            String segment = file.getLastPathSegment();
            UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);
            uploadTask.addOnFailureListener(new OnFailureListener()
            {
                @Override
                public void onFailure(@NonNull Exception exception)
                {
                    e.onError(exception);
                }
            }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    downloadUrl = taskSnapshot.getDownloadUrl();
                    String url = downloadUrl.getPath();
                    e.onNext(url);
                    e.onComplete();
                }
            }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    long bytes = taskSnapshot.getBytesTransferred();
                    String bytesS = String.valueOf(bytes);
                    e.onNext(bytesS);
                }
            });
        }
    }, BackpressureStrategy.BUFFER);
}
The Error is: Inferred type 'E' for type parameter 'E' is not within its bound; should implement 'org.reactivestreams.Subscriber
My guess is that Flowable does not work with Disposable and that's why it won't compile. I have no clue whether that's true; it's just my best guess so far. Or do I have to change subscribeWith() to subscribe()? I don't know what the impact of that change would be.
Anyway, suggestions on how to make this work and get this Flowable into my CompositeDisposable are really appreciated.
Thanks guys!
Edit:
I tried changing the DisposableObserver into a Subscriber, but this results in another compiler error.
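A Flowable is consumed by a Subscriber, not an Observer, so passing a DisposableObserver to subscribeWith() does not compile. Because of backpressure, a Flowable hands its Subscriber a Subscription rather than a Disposable; the Subscriber calls Subscription.request(n) to tell the source how many items it wants at that moment. ResourceSubscriber implements both Subscriber and Disposable, so it can still be added to a CompositeDisposable.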
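To make the request(n) idea concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Subscriber/Subscription pair that RxJava 2 Flowables use. It is not RxJava code: RangePublisher, Collector and requestMore are hypothetical names made up for illustration, and the publisher is deliberately simplistic (single-threaded, and it assumes request() is never called from inside onNext()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestDemo {
    /** Hypothetical publisher: emits 1..count, but never more items than were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items for this request.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    // Signal completion once the last item has been delivered.
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical subscriber: asks for 2 items up front, more only on demand. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(2); // initial demand
        }

        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { completed = true; }

        void requestMore(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        new RangePublisher(5).subscribe(collector);
        System.out.println(collector.items);     // prints [1, 2]
        collector.requestMore(3);
        System.out.println(collector.items);     // prints [1, 2, 3, 4, 5]
        System.out.println(collector.completed); // prints true
    }
}
```

The source only emits when the subscriber has signalled demand, which is the part an Observable/Observer pair has no equivalent for.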
Change your code:
private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
        .subscribeOn(Schedulers.io())
        .observeOn(mainScheduler)
        .subscribeWith(new DisposableObserver<String>()
        {
            @Override
            public void onNext(String s)
            {
                String ss = s;
                System.out.println(ss);
            }
            @Override
            public void onError(Throwable e)
            {
                e.printStackTrace();
            }
            @Override
            public void onComplete()
            {
                view.displayToast("Picture Upload completed");
            }
        })
    );
}
into
private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
        .subscribeOn(Schedulers.io())
        .observeOn(mainScheduler)
        .subscribeWith(new ResourceSubscriber<String>()
        {
            @Override
            public void onNext(String s)
            {
                String ss = s;
                System.out.println(ss);
            }
            @Override
            public void onError(Throwable e)
            {
                e.printStackTrace();
            }
            @Override
            public void onComplete()
            {
                view.displayToast("Picture Upload completed");
            }
        })
    );
}
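Roughly why a subscriber like this fits into a CompositeDisposable: it is a Subscriber whose dispose() cancels the upstream Subscription. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow interfaces (Java 9+) only. SimpleDisposable, SimpleComposite and CancellableSubscriber are hypothetical stand-ins invented for this example, not RxJava's actual classes, and the code is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DisposableSubscriberDemo {
    /** Hypothetical stand-in for RxJava's Disposable. */
    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical stand-in for CompositeDisposable: clear() disposes everything added. */
    static final class SimpleComposite {
        private final List<SimpleDisposable> items = new ArrayList<>();

        void add(SimpleDisposable d) { items.add(d); }

        void clear() {
            for (SimpleDisposable d : items) {
                d.dispose();
            }
            items.clear();
        }
    }

    /** A subscriber that is also disposable: dispose() cancels the upstream subscription. */
    abstract static class CancellableSubscriber<T> implements Flow.Subscriber<T>, SimpleDisposable {
        private Flow.Subscription upstream;
        private boolean disposed;

        @Override
        public final void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(Long.MAX_VALUE); // unbounded demand: take everything
        }

        @Override
        public final void dispose() {
            if (!disposed) {
                disposed = true;
                if (upstream != null) {
                    upstream.cancel();
                }
            }
        }

        @Override
        public final boolean isDisposed() { return disposed; }
    }

    /** Returns true if clearing the composite cancelled the upstream subscription. */
    static boolean disposeCancelsUpstream() {
        boolean[] cancelled = {false};
        // Fake upstream that only records whether cancel() was called.
        Flow.Subscription fakeUpstream = new Flow.Subscription() {
            @Override public void request(long n) { /* a real source would start emitting here */ }
            @Override public void cancel() { cancelled[0] = true; }
        };
        CancellableSubscriber<String> subscriber = new CancellableSubscriber<String>() {
            @Override public void onNext(String s) { System.out.println(s); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { }
        };
        SimpleComposite composite = new SimpleComposite();
        composite.add(subscriber);
        subscriber.onSubscribe(fakeUpstream);
        composite.clear(); // disposes the subscriber, which cancels the upstream
        return cancelled[0] && subscriber.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposeCancelsUpstream()); // prints true
    }
}
```

In the real code above, clearing or disposing compositeDisposable (for example when the view goes away) plays the role of composite.clear() here.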