The case I'm into right now is quite hard to explain so I will write a simpler version just to explain the issue.
I have an Observable.from()
which emits a sequence of files defined by an ArrayList
of files. All of these files should be uploaded to a server. For that I have an function that does the job and returns an Observable
.
Observable<Response> uploadFile(File file);
When I run this code it gets crazy, the Observable.from()
emits all of the files and they are uploaded all at ones, or at least for a max of threads it can handle.
I want to have a max of 2 file uploads in parallel. Is there any operator that can handle this for me?
I tried buffer, window and some others but they seems to only emit two items together instead of having two parallel file uploads constantly. I also tried to set a max threads pool on the uploading part, but this cannot be used in my case.
There should be a simple operator for this right? Am I missing something?
By default, nothing in RxJava is multi-threaded. Multi-threading can easily be introduced, however, by using Schedulers. For example, if you did this: Observable.
Coming to RxJava, RxJava streams are prone to leaks, where a stream continues to process items even when you no longer care. Kotlin coroutines use structured concurrency, which makes it much easier to manage the lifecycle of all your concurrent code.
Flowable be the backpressure-enabled base reactive class. Let's understand the use of Flowable using another example. Suppose you have a source that is emitting data items at a rate of 1 Million items/second.
It changes the thread as many times as you write it. flatMap starts the chain only during root chain data emission. No actions are performed during the root stream subscription process. Operators interval/delay/timer subscribe to computation under the hood, by default.
I think all files are uploaded in parallel because you're using flatMap()
, which executes all transformations simultaneously. Instead you should use concatMap()
, which runs one transformation after another. And to run two parallel uploads you need to call window(2)
on you files observable and then invoke flatMap()
as you did in your code.
Observable<Response> responses =
files
.window(2)
.concatMap(windowFiles ->
windowFiles.flatMap(file -> uploadFile(file));
);
UPDATE:
I found a better solution, which does exactly what you want. There's an overload of flatMap()
that accepts the max number of concurrent threads.
Observable<Response> responses =
files
.onBackpressureBuffer()
.flatMap(index -> {
return uploadFile(file).subscribeOn(Schedulers.io());
}, 2);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With