
How to create an Observable sequence of bytes from an input stream in RxJava

Tags: java, rx-java

I am new to RxJava. I have an input stream that I need to convert into a sequence of byte arrays of a specific size. Something like:

Observable
  .just(inputStream)
  .map(new Func1<InputStream, Chunk>());

Here, Chunk is a custom class that contains the number of bytes read from the stream. Could someone help me understand how to do this in RxJava?

Asked Jun 24 '16 at 16:06 by Niranjan




2 Answers

Use StringObservable.from(InputStream, chunkSize) from RxJavaString. It returns an Observable<byte[]> and supports backpressure: it does not read from the InputStream until downstream requests items.
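To picture what "won't read unless requested" means, here is a minimal single-threaded sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava. It is not how StringObservable is implemented; ChunkPublisher and CollectingSubscriber are hypothetical names used only for illustration, and the sketch assumes the caller owns and closes the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical sketch (not RxJava): emits one chunk per requested item. */
final class ChunkPublisher implements Flow.Publisher<byte[]> {
    private final InputStream in;
    private final int chunkSize;

    ChunkPublisher(InputStream in, int chunkSize) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        this.in = in;
        this.chunkSize = chunkSize;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super byte[]> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private long requested;   // outstanding demand from downstream
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n; // cap on overflow
                if (emitting) return; // the loop below will pick up the extra demand
                emitting = true;
                try {
                    while (requested > 0 && !done) {
                        byte[] buf = new byte[chunkSize];
                        int len = in.read(buf); // the stream is touched only when there is demand
                        if (len < 0) {
                            done = true;
                            subscriber.onComplete();
                            return;
                        }
                        requested--;
                        subscriber.onNext(Arrays.copyOf(buf, len));
                    }
                } catch (IOException e) {
                    done = true;
                    subscriber.onError(e);
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical test subscriber: records chunk sizes and requests nothing on its own. */
    static final class CollectingSubscriber implements Flow.Subscriber<byte[]> {
        final List<Integer> sizes = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(byte[] chunk) { sizes.add(chunk.length); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        CollectingSubscriber sub = new CollectingSubscriber();
        new ChunkPublisher(new ByteArrayInputStream(new byte[10]), 4).subscribe(sub);
        System.out.println(sub.sizes); // [] because nothing has been requested yet
        sub.subscription.request(2);
        System.out.println(sub.sizes); // [4, 4]
        sub.subscription.request(2);
        System.out.println(sub.sizes + " completed=" + sub.completed); // [4, 4, 2] completed=true
    }
}
```

The demand counter is the whole trick: every read is paid for by one unit of downstream demand, so a slow consumer never causes the source to buffer chunks it has not asked for.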

By the way, Observable.using completes the picture by closing resources properly. If you are reading bytes from a file, you can use Bytes.from(file, chunkSize) from rxjava-extras, which uses Observable.using under the covers.
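Observable.using takes three pieces: a resource factory, a function that builds the Observable from the resource, and a dispose action. The plain-Java sketch below mirrors that three-part shape synchronously, to show why the resource is released on both success and failure. It is an analogy, not RxJava code: Using, readChunkSizes, closeQuietly and TrackingStream are hypothetical names, and unlike Observable.using this helper runs eagerly instead of per subscription.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical synchronous analogue of the resource/factory/dispose pattern. */
final class Using {

    /** Creates the resource, runs the body, and always runs the dispose action afterwards. */
    static <R, T> T using(Supplier<R> resourceFactory, Function<R, T> body, Consumer<R> disposeAction) {
        R resource = resourceFactory.get();
        try {
            return body.apply(resource);
        } finally {
            disposeAction.accept(resource); // runs on normal return and on exception
        }
    }

    /** Reads the stream in chunks of at most chunkSize bytes and returns each chunk's length. */
    static List<Integer> readChunkSizes(InputStream in, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        byte[] buf = new byte[chunkSize];
        try {
            int len;
            while ((len = in.read(buf)) > 0) {
                sizes.add(len);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    static void closeQuietly(InputStream in) {
        try {
            in.close();
        } catch (IOException ignored) {
            // nothing useful to do if close() fails
        }
    }

    /** In-memory stream that remembers whether close() was called (for the demo only). */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) {
        TrackingStream stream = new TrackingStream(new byte[10]);
        // A real resource factory would open the stream here instead of returning an existing one.
        List<Integer> sizes = Using.<InputStream, List<Integer>>using(
                () -> stream,
                in -> readChunkSizes(in, 4),
                Using::closeQuietly);
        System.out.println(sizes + " closed=" + stream.closed); // [4, 4, 2] closed=true
    }
}
```

In RxJava the dispose action additionally runs when the subscriber unsubscribes early, which a synchronous helper like this cannot express.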

Answered Nov 03 '22 at 00:11 by Dave Moten


You can combine Observable.create with flatMap. Note that QueuedProducer is unbounded by default; you can supply a custom queue implementation, including a bounded one.

For example:

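  import java.io.FileInputStream;
  import java.io.IOException;
  import java.io.InputStream;

  import rx.Observable;
  import rx.Subscriber;
  import rx.functions.Action1;
  import rx.functions.Func1;
  import rx.internal.producers.QueuedProducer;
  import rx.schedulers.Schedulers;

  // One block of bytes read from the stream.
  static class Chunk {
    final byte[] buf;  // backing buffer; only the first `size` bytes are valid
    final int size;    // number of bytes actually read
    final int index;   // zero-based position of this chunk in the stream

    public Chunk(byte[] buf, int size, int index) {
      this.buf = buf;
      this.size = size;
      this.index = index;
    }
  }

  FileInputStream fis = ...
  Observable<Chunk> o = Observable.just(fis).flatMap(new Func1<InputStream, Observable<Chunk>>() {

    @Override
    public Observable<Chunk> call(final InputStream is) {
      return Observable.create(new Observable.OnSubscribe<Chunk>() {

        @Override
        public void call(Subscriber<? super Chunk> subscriber) {
          // QueuedProducer queues chunks and releases them as downstream requests arrive.
          final QueuedProducer<Chunk> producer = new QueuedProducer<>(subscriber);
          subscriber.setProducer(producer);
          try {
            int size;
            int index = 0;
            do {
              byte[] buf = new byte[4096];
              size = is.read(buf);
              if (size > 0) {
                Chunk chunk = new Chunk(buf, size, index++);
                // Print chunk.index so producer and consumer logs use the same numbering.
                System.out.println("Producing chunk #" + chunk.index + " of size: " + chunk.size);
                producer.onNext(chunk);
              }
            } while (size >= 0);  // read() returns -1 at end of stream
            producer.onCompleted();
          } catch (IOException e) {
            producer.onError(e);
          } finally {
            try {
              System.out.println("Closing stream");
              is.close();
            } catch (IOException e) {
              // nothing useful to do if close() fails
            }
          }
        }
      })
      .subscribeOn(Schedulers.io());
    }
  });

  o.subscribe(new Action1<Chunk>() {

    @Override
    public void call(Chunk chunk) {
      System.out.println("Received chunk #" + chunk.index + " of size: " + chunk.size);
    }

  });

  // Reading runs on an io() thread, so keep the main thread alive for this demo.
  Thread.sleep(10000);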
Answered Nov 03 '22 at 00:11 by yurgis