 

Java 8 lambda api

I'm working on migrating from RxJava to Java 8 lambdas. One example I can't find is a way to buffer requests. For example, in RxJava, I can write the following.

Observable.create(getIterator()).buffer(20, 1000, TimeUnit.MILLISECONDS).doOnNext(list -> doWrite(list));

Here we buffer 20 elements into a list, or time out after 1000 milliseconds, whichever happens first.

Observables in Rx are "push" style, whereas Java 8 Streams use a pull model. Would this be possible by implementing my own map operation on a Stream, or does the inability to emit cause problems, since the doOnNext has to poll the previous element?

tnine asked Feb 21 '15 18:02



2 Answers

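One way to do it would be to use a BlockingQueue and Guava. Queues.drain fills a Collection from the queue, waiting until the requested number of elements has arrived or the timeout elapses, whichever comes first. You can then call stream() on that collection and do your transformations. See the Guava documentation for Queues.drain.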

And here's a quick example:

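import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public void transform(BlockingQueue<Something> input) throws InterruptedException
{
    List<Something> buffer = new ArrayList<>(20);
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS); // up to 20 elements or 1000 ms
    doWrite(buffer);
}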
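
For readers who would rather avoid the Guava dependency, the same "up to N elements or until the timeout" behaviour can be approximated with the JDK alone. This is only a minimal sketch, not Guava's implementation: the class name BufferSketch and the method drainBatch are made up for illustration, and it chooses to return the partial batch if the thread is interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of Guava or the JDK.
class BufferSketch {

    // Collects up to maxSize elements, or whatever arrived before the timeout elapsed.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxSize, long timeout, TimeUnit unit) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // overall timeout reached
                }
                // Wait only for the time that is left in this batch's window.
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            // Assumption: keep the partial batch and restore the interrupt flag.
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 45; i++) {
            queue.add(i);
        }
        // Print the size of three successive batches.
        for (int i = 0; i < 3; i++) {
            System.out.println(drainBatch(queue, 20, 200, TimeUnit.MILLISECONDS).size());
        }
    }
}
```

With 45 elements already queued, the first two calls fill up on size and the third returns the remaining elements once its 200 ms window runs out. Each returned list can be passed to doWrite or turned into a Stream with stream().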
toadzky answered Nov 01 '22 19:11



simple-react has similar operators, but not this exact one. It's pretty extensible though, so it should be possible to write your own. With the caveat that I haven't written this in an IDE or tested it, a buffer-by-size-with-timeout operator for simple-react would look roughly like this:

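  import com.aol.simple.react.async.Queue;
  import com.aol.simple.react.async.Queue.ClosedQueueException;
  import com.aol.simple.react.stream.traits.LazyFutureStream;
  import com.aol.simple.react.util.SimpleTimer;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
  import java.util.concurrent.TimeUnit;
  import java.util.function.Function;
  import java.util.function.Supplier;

  // Untested sketch. The generic type parameters are assumed; adjust them to the simple-react version in use.
  static <U> LazyFutureStream<Collection<U>> batchBySizeAndTime(LazyFutureStream<U> stream, int size, long time, TimeUnit unit) {
    Queue<U> queue = stream.toQueue();
    // Wraps the per-element supplier in a supplier that returns one batch per call.
    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
        return () -> {
            SimpleTimer timer = new SimpleTimer();
            List<U> list = new ArrayList<>();
            try {
                do {
                    // Batch is full: emit it without waiting for the timeout.
                    if (list.size() == size)
                        return list;
                    list.add(s.get());
                } while (timer.getElapsedNanoseconds() < unit.toNanos(time));
            } catch (ClosedQueueException e) {
                // Queue closed mid-batch: pass along what was collected so far.
                throw new ClosedQueueException(list);
            }
            return list;
        };
    };
    return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
  }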
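
The core idea above, wrapping a per-element Supplier in a Supplier that hands back one batch per call, does not depend on simple-react. Below is a minimal JDK-only sketch of that idea. The names BatchingSupplierSketch and batching are hypothetical, and it shares one limitation with the code above: the elapsed time is only checked between elements, so a source whose get() blocks can hold a batch open past the timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of simple-react or the JDK.
class BatchingSupplierSketch {

    // Turns a supplier of single elements into a supplier of batches.
    // A batch ends when it holds `size` elements or when `time` has elapsed,
    // checked after each element is added.
    static <T> Supplier<List<T>> batching(Supplier<T> source, int size, long time, TimeUnit unit) {
        return () -> {
            long start = System.nanoTime();
            List<T> list = new ArrayList<>(size);
            do {
                list.add(source.get()); // may block if the source blocks
            } while (list.size() < size && System.nanoTime() - start < unit.toNanos(time));
            return list;
        };
    }

    public static void main(String[] args) {
        // A fast, never-ending source: 0, 1, 2, ...
        AtomicInteger counter = new AtomicInteger();
        Supplier<List<Integer>> batches =
                batching(counter::getAndIncrement, 3, 1, TimeUnit.SECONDS);

        // Pull two batches through a regular Java 8 Stream.
        List<List<Integer>> firstTwo =
                Stream.generate(batches).limit(2).collect(Collectors.toList());
        System.out.println(firstTwo);
    }
}
```

Because a Stream pulls from its source, the batching supplier only does work when the downstream asks for the next batch. A strict timeout with a blocking source needs a queue between producer and consumer, which is what toQueue() provides in the simple-react version.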
John McClean answered Nov 01 '22 20:11
