I am new to RxJava, hence this question. I have an input stream which I have to convert to a sequence of byte arrays of a specific size. Something like:
Observable
.just(inputStream)
.map(new Func1<InputStream, Chunk>());
Here Chunk is a custom class which contains the bytes read from the stream and their count. Could someone help me understand how to do this in RxJava?
fromArray example: Integer[] array = new Integer[10]; for (int i = 0; i < array.length; i++) { array[i] = i; } Observable<Integer> observable = Observable.fromArray(array); observable.subscribe(item -> System.
RxJava provides many methods in its library to create an Observable. Choosing which one to use can be difficult. My goal in this article is to make this choice simpler by giving you a mental map of different scenarios and which methods to use in each.
Transforming Single to Observable is simple, as Single satisfies Observable's contract. Just call single.toObservable() and you're good.
An Observable is like a speaker that emits a value. It does some work and emits some values. An Operator is like a translator which translates/modifies data from one form to another form. An Observer gets those values.
Use StringObservable.from(InputStream, chunkSize) from RxJavaString. It returns an Observable<byte[]> and supports backpressure (it won't read from the InputStream unless requested by downstream).
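To make the "won't read unless requested" idea concrete, here is a minimal sketch in plain Java, with no RxJava involved. ChunkReader is a hypothetical helper invented for illustration, not the library's implementation: it touches the stream only when the caller asks for the next chunk, and each chunk holds at most chunkSize bytes (a single read may return fewer).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical helper: pulls chunks from a stream only when the caller asks. */
final class ChunkReader implements Iterator<byte[]> {
    private final InputStream in;
    private final int chunkSize;
    private byte[] next;   // chunk read ahead by hasNext(), or null
    private boolean done;  // true once end of stream has been seen

    ChunkReader(InputStream in, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.in = in;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean hasNext() {
        if (next != null) return true;
        if (done) return false;
        try {
            byte[] buf = new byte[chunkSize];
            int n = in.read(buf);            // the only place the stream is read
            if (n < 0) {
                done = true;
                return false;
            }
            // Trim the buffer when the read returned fewer bytes than requested.
            next = (n == buf.length) ? buf : Arrays.copyOf(buf, n);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public byte[] next() {
        if (!hasNext()) throw new NoSuchElementException();
        byte[] chunk = next;
        next = null;
        return chunk;
    }

    public static void main(String[] args) {
        ChunkReader reader = new ChunkReader(new ByteArrayInputStream(new byte[10]), 4);
        while (reader.hasNext()) {
            System.out.println("chunk of " + reader.next().length + " bytes");
        }
    }
}
```

For a 10-byte in-memory stream and a chunk size of 4, this yields chunks of 4, 4 and 2 bytes. A backpressure-aware Observable applies the same principle, with downstream requests taking the place of the next() calls.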
By the way, Observable.using completes the picture for closing resources properly. You can use Bytes.from(file, chunkSize) from rxjava-extras if you are reading bytes from a file (it uses Observable.using under the covers).
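The shape that Observable.using follows (create a resource, use it, always dispose of it) can be sketched in plain Java. This is an illustration of the pattern only, not RxJava code: ResourceScope and its methods are hypothetical names, and the body here returns a plain value where Observable.using would build an Observable from the resource.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper showing the create / use / dispose contract. */
final class ResourceScope {
    static <R, T> T using(Supplier<R> resourceFactory,
                          Function<R, T> body,
                          Consumer<R> dispose) {
        R resource = resourceFactory.get();
        try {
            return body.apply(resource);
        } finally {
            // Runs whether the body returned normally or threw.
            dispose.accept(resource);
        }
    }

    /** Example body: drains the stream and returns the number of bytes read. */
    static long countBytes(InputStream in) {
        try {
            long total = 0;
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) >= 0) {
                total += n;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Example dispose action: closes the stream, rethrowing failures unchecked. */
    static void close(InputStream in) {
        try {
            in.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long n = ResourceScope.<InputStream, Long>using(
                () -> new ByteArrayInputStream(new byte[5]),
                ResourceScope::countBytes,
                ResourceScope::close);
        System.out.println(n + " bytes read, stream closed");
    }
}
```

Keeping the dispose step next to the factory means the caller cannot forget to close the stream, which is the property the answer relies on.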
You can use Observable.create together with flatMap. Note that QueuedProducer is unbounded by default; you can supply a custom queue implementation, including a bounded one.
For example:
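import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.internal.producers.QueuedProducer;
import rx.schedulers.Schedulers;

// Holds one buffer read from the stream, the number of valid bytes in it, and its position in the sequence.
static class Chunk {
    byte[] buf;
    int size;
    int index;

    public Chunk(byte[] buf, int size, int index) {
        this.buf = buf;
        this.size = size;
        this.index = index;
    }
}

FileInputStream fis = ...
Observable<Chunk> o = Observable.just(fis).flatMap(new Func1<InputStream, Observable<Chunk>>() {
    @Override
    public Observable<Chunk> call(InputStream is) {
        return Observable.create(new Observable.OnSubscribe<Chunk>() {
            @Override
            public void call(Subscriber<? super Chunk> subscriber) {
                // The producer queues chunks and hands them out as downstream requests them.
                final QueuedProducer<Chunk> producer = new QueuedProducer<>(subscriber);
                subscriber.setProducer(producer);
                try {
                    int size = 0;
                    int index = 0;
                    do {
                        // A fresh buffer per chunk, because the chunk keeps a reference to it.
                        byte[] buf = new byte[4096];
                        size = is.read(buf);
                        if (size > 0) {
                            Chunk chunk = new Chunk(buf, size, index++);
                            // Log chunk.index rather than index, which has already been incremented.
                            System.out.println("Producing chunk #" + chunk.index + " of size: " + chunk.size);
                            producer.onNext(chunk);
                        }
                    } while (size >= 0); // read() returns -1 at end of stream
                    producer.onCompleted();
                } catch (IOException e) {
                    producer.onError(e);
                } finally {
                    try {
                        System.out.println("Closing stream");
                        is.close();
                    } catch (IOException e) {
                        // Nothing useful can be done if close() fails.
                    }
                }
            }
        })
        .subscribeOn(Schedulers.io()); // do the blocking reads on an I/O thread
    }
});
o.subscribe(new Action1<Chunk>() {
    @Override
    public void call(Chunk chunk) {
        System.out.println("Received chunk #" + chunk.index + " of size: " + chunk.size);
    }
});
// Demo only: keep the calling thread alive while the I/O thread does the work.
Thread.sleep(10000);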
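As a side note on the bounded-queue remark, the sketch below shows in plain Java how an unbounded and a bounded java.util.Queue differ when a producer outruns its consumer. QueueBoundDemo and offerAll are hypothetical names for illustration, and how QueuedProducer itself reacts to a rejected offer is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical demo: how many items a queue accepts when nothing drains it. */
final class QueueBoundDemo {
    static int offerAll(Queue<Integer> queue, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false instead of blocking when a bounded queue is full.
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Unbounded: every item is accepted, so memory use grows with the backlog.
        System.out.println("unbounded accepted: " + offerAll(new ArrayDeque<>(), 5));
        // Bounded to 2 slots: items beyond the capacity are rejected.
        System.out.println("bounded accepted: " + offerAll(new ArrayBlockingQueue<>(2), 5));
    }
}
```

With an unbounded queue, a fast reader can buffer the whole file in memory before the subscriber catches up; a bounded queue caps that backlog, at the cost of having to handle rejected items.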