I'm working on migrating from RxJava to Java 8 lambdas and streams. One thing I can't find an example of is a way to buffer requests. For example, in RxJava I can write the following:
Observable.create(getIterator()).buffer(20, 1000, TimeUnit.MILLISECONDS).doOnNext(list -> doWrite(list));
Here we buffer 20 elements into a list, or time out after 1000 milliseconds, whichever happens first.
Observables in Rx are push-style, whereas Java 8 Streams are pull-based. Would this be possible by implementing my own map operation on streams, or does the inability to emit cause problems, since doOnNext has to poll the previous element?
One way to do it would be to use a BlockingQueue and Guava. Using Queues.drain, you can fill a Collection that you can then call stream() on and do your transformations. See the Guava documentation for Queues.drain.
And here's a quick example:
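import com.google.common.collect.Queues;

// Queues.drain blocks and can be interrupted, so the checked exception must be declared.
public void transform(BlockingQueue<Something> input) throws InterruptedException
{
    List<Something> buffer = new ArrayList<>(20);
    // Collect up to 20 elements, or whatever has arrived after 1000 ms.
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS);
    doWrite(buffer);
}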
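If you would rather not pull in Guava, the same size-or-timeout batching can be sketched with just the JDK's BlockingQueue.poll. This is a minimal sketch, not Guava's implementation: the class name BufferSketch and the helper drainBatch are made up for illustration, and the "item-" mapping only stands in for whatever transformation you would apply via stream().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class BufferSketch {

    // Hypothetical helper: returns once maxSize elements were collected
    // or the timeout elapsed, whichever happens first.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxSize, long timeout, TimeUnit unit)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time budget used up
            }
            // Wait for the next element, but never past the deadline.
            T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // poll timed out with nothing available
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add(i);
        }

        // First batch fills up to the size limit right away.
        List<Integer> first = drainBatch(queue, 20, 1000, TimeUnit.MILLISECONDS);
        // Second batch gets the 5 leftovers, then gives up after the timeout.
        List<Integer> second = drainBatch(queue, 20, 100, TimeUnit.MILLISECONDS);

        // Once a batch is a plain List, the usual Stream operations apply.
        List<String> transformed = first.stream()
                .map(i -> "item-" + i)
                .collect(Collectors.toList());

        System.out.println(first.size() + " " + second.size() + " " + transformed.get(0));
        // prints "20 5 item-0"
    }
}
```

The producer still has to push into the queue from another thread in a real application; the stream only sees each batch after it has been collected, which is how this sidesteps the pull-versus-push mismatch.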
simple-react has similar operators, but not this exact one. It is quite extensible, though, so it should be possible to write your own. With the caveat that I haven't written this in an IDE or tested it, a buffer-by-size-with-timeout operator for simple-react would look roughly like this:
import com.aol.simple.react.async.Queue;
import com.aol.simple.react.async.Queue.ClosedQueueException;
import com.aol.simple.react.stream.traits.LazyFutureStream;
import com.aol.simple.react.util.SimpleTimer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

// Untested sketch: batches elements until `size` is reached or `time` has elapsed.
static <U> LazyFutureStream batchBySizeAndTime(LazyFutureStream<U> stream, int size, long time, TimeUnit unit) {
    Queue<U> queue = stream.toQueue();
    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
        return () -> {
            SimpleTimer timer = new SimpleTimer();
            List<U> list = new ArrayList<>();
            try {
                do {
                    // Compare against the size parameter (the original called size() by mistake).
                    if (list.size() == size)
                        return list;
                    list.add(s.get());
                } while (timer.getElapsedNanoseconds() < unit.toNanos(time));
            } catch (ClosedQueueException e) {
                // Queue closed mid-batch: rethrow with the partial batch attached.
                throw new ClosedQueueException(list);
            }
            return list;
        };
    };
    return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
}