While trying the OSGi PushStream library I felt that it was really slow. I have created two methods that do the same thing, one using the PushStream and the other a simple BlockingQueue (see code below). The results are as follows:
Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.
Why is the PushStream slower? What am I doing wrong?
With PushStream:
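import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.junit.Test; // JUnit 4 assumed; with JUnit 5 use org.junit.jupiter.api.Test
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.pushstream.SimplePushEventSource;

public class TestPush {

    @Test
    public void testPushStream() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
                psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
        psp.createStream(source).onClose(() -> endD.resolve(Instant.now())).forEach((i) -> {
            if (i == 0) {
                startD.resolve(Instant.now());
            }
        });
        final Promise<Long> nbEvent = psp.createStream(source).count();
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
        System.out.println("PushStream needs "
                + Duration.between(startD.getPromise().getValue(), endD.getPromise().getValue()).toMillis()
                + " milliseconds to process " + nbEvent.getValue() + " events.");
    }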
With ArrayBlockingQueue:
    @Test
    public void testBlockingQueue() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
        final Executor e = Executors.newFixedThreadPool(1);
        final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
        final Deferred<Integer> nbEvent = pf.deferred();
        e.execute(() -> {
            try {
                Integer i = 0;
                Integer last = 0;
                do {
                    i = abq.take();
                    if (i == 0) {
                        startD.resolve(Instant.now());
                    } else if (i != -1) {
                        last = i;
                    }
                } while (i != -1);
                endD.resolve(Instant.now());
                nbEvent.resolve(last + 1);
            } catch (final InterruptedException exception) {
                exception.printStackTrace();
            }
        });
        for (int i = 0; i < 1000; i++) {
            abq.put(i);
        }
        abq.put(-1);
        System.out.println("Queue needs "
                + Duration.between(startD.getPromise().getValue(), endD.getPromise().getValue()).toMillis()
                + " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
    }
}
This is a fun question :)
Why is the PushStream slower? What am I doing wrong?
Thank you for not just assuming that the PushStream implementation sucks. In this case it is slower because (probably without realising) you asked it to be!
By default PushStreams are buffered. This means that they include a queue into which events are placed before they are processed. Buffering therefore does a few things which negatively affect the speed of throughput.
In this case the vast majority of the slowdown is because of the back pressure. When you create a stream using psp.createStream(source) it is set up with a buffer of 32 elements and a linear back pressure policy based on the size of the buffer, returning one second when full and 31 millis when it has one item in it. It is worth noting that 31 millis per element, over 1000 elements, adds up to about 31 seconds, which matches the 31331 milliseconds you measured!
Importantly, the SimplePushEventSource always honours back pressure requests from the consumers that are added to it. This means that you may be pumping events into the SimplePushEventSource as fast as you can, but they will only be delivered as fast as they are requested by the pipeline.
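To make the back pressure arithmetic above concrete, here is a minimal plain-Java sketch of a linear back pressure calculation. It is only a model and not the PushStream implementation: the names LinearBackPressureSketch, delayMillis and totalDelay are made up for illustration, and it assumes that the consumer drains quickly enough for every publish to see exactly one buffered item.

```java
import java.time.Duration;

/** Plain-Java model of a linear back pressure policy; not the OSGi implementation. */
final class LinearBackPressureSketch {

    /**
     * Delay requested from the producer, scaled linearly with buffer fill:
     * maxDelayMillis when the buffer is full, proportionally less otherwise.
     */
    static long delayMillis(int queued, int capacity, long maxDelayMillis) {
        if (capacity <= 0 || queued < 0 || queued > capacity) {
            throw new IllegalArgumentException("queued must be in [0, capacity]");
        }
        return maxDelayMillis * queued / capacity;
    }

    /** Total wait if every publish sees the same buffer fill level (an assumption). */
    static Duration totalDelay(int events, int queued, int capacity, long maxDelayMillis) {
        return Duration.ofMillis(events * delayMillis(queued, capacity, maxDelayMillis));
    }

    public static void main(String[] args) {
        long perEvent = delayMillis(1, 32, 1000);       // 1000 / 32, truncated to 31
        Duration total = totalDelay(1000, 1, 32, 1000); // 1000 events * 31 ms
        System.out.println("per event: " + perEvent + " ms, total: " + total.toMillis() + " ms");
    }
}
```

With a 32-element buffer and a one second maximum, this prints a per-event delay of 31 ms and a total of 31000 ms for 1000 events, which is in line with the 31331 milliseconds reported in the question.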
If we remove the buffering from the push streams that you are creating then we get the following test:
@Test
public void testPushStream2() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
            psp.buildSimpleEventSource(Integer.class)
                    .withQueuePolicy(QueuePolicyOption.BLOCK)
                    .build();
    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();
    // unbuffered(): skip the default 32-element buffer and its linear back pressure
    psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve(Instant.now())).forEach((i) -> {
        if (i == 0) {
            startD.resolve(Instant.now());
        }
    });
    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();
    System.out.println("PushStream needs "
            + Duration.between(startD.getPromise().getValue(), endD.getPromise().getValue()).toMillis()
            + " milliseconds to process " + nbEvent.getValue() + " events.");
}
The result of running this (on my machine) is:
PushStream needs 39 milliseconds to process 1000 events.
This is obviously much closer to what you would expect, but it is still noticeably slower. Note that we could have still had some buffering, but tuned the PushbackPolicy. This would have given us faster throughput, but not quite as fast as this.
The next thing to notice is that you are using an onClose() handler. This adds an extra stage into your push stream pipeline. You can instead register the callback on the promise returned by a terminal operation (here count()), which shortens the pipeline; the callback only needs to run once.
@Test
public void testPushStream3() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
            psp.buildSimpleEventSource(Integer.class)
                    .withQueuePolicy(QueuePolicyOption.BLOCK)
                    .build();
    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();
    psp.buildStream(source).unbuffered().build().forEach((i) -> {
        if (i == 0) {
            startD.resolve(Instant.now());
        }
    });
    // Record the end time when the count promise resolves, instead of via an onClose() stage
    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
            .onResolve(() -> endD.resolve(Instant.now()));
    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();
    System.out.println("PushStream needs "
            + Duration.between(startD.getPromise().getValue(), endD.getPromise().getValue()).toMillis()
            + " milliseconds to process " + nbEvent.getValue() + " events.");
}
The result of this version (on my machine) is:
PushStream needs 21 milliseconds to process 1000 events.
A key difference between the "raw array blocking queue" example and the PushStream example is that you actually create two PushStreams. The first does the work to capture the start time, the second to count the events. This forces the SimplePushEventSource to multiplex the events over multiple consumers.
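As a rough illustration of why the number of connected pipelines matters, here is a toy fan-out source in plain Java. It is a sketch under stated assumptions, not how SimplePushEventSource works internally: FanOutSketch and its methods are hypothetical names, and the model only counts hand-offs, ignoring threading, queueing and back pressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Toy fan-out source; a model only, not how SimplePushEventSource is implemented. */
final class FanOutSketch {
    private final List<IntConsumer> consumers = new ArrayList<>();
    private long deliveries;

    /** Registers one more consumer pipeline. */
    void connect(IntConsumer consumer) {
        consumers.add(consumer);
    }

    /** Hands the event to every connected consumer, counting each hand-off. */
    void publish(int event) {
        for (IntConsumer consumer : consumers) {
            consumer.accept(event);
            deliveries++;
        }
    }

    long deliveries() {
        return deliveries;
    }

    /** Publishes 1000 events to the given pipelines and returns the hand-off count. */
    static long run(IntConsumer... pipelines) {
        FanOutSketch source = new FanOutSketch();
        for (IntConsumer pipeline : pipelines) {
            source.connect(pipeline);
        }
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        return source.deliveries();
    }

    public static void main(String[] args) {
        long[] count = new long[1];
        // Two pipelines: one stands in for the start-time capture, one counts events
        long twoPipelines = run(i -> { }, i -> count[0]++);
        // One pipeline doing both jobs
        long onePipeline = run(i -> count[0]++);
        System.out.println(twoPipelines + " vs " + onePipeline);
    }
}
```

In this model two pipelines cost 2000 hand-offs for 1000 events, while a single pipeline costs 1000. The real source has more work to do per consumer than this, so treat the numbers as a lower bound on the difference rather than a measurement.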
What if we collapsed the behaviour into a single pipeline so that the SimplePushEventSource could use a fast-path delivery?
@Test
public void testPushStream4() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
            psp.buildSimpleEventSource(Integer.class)
                    .withQueuePolicy(QueuePolicyOption.BLOCK)
                    .build();
    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();
    // Single pipeline: filter() records the start time and lets every event through to count()
    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
            .filter(i -> {
                if (i == 0) {
                    startD.resolve(Instant.now());
                }
                return true;
            })
            .count()
            .onResolve(() -> endD.resolve(Instant.now()));
    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();
    System.out.println("PushStream needs "
            + Duration.between(startD.getPromise().getValue(), endD.getPromise().getValue()).toMillis()
            + " milliseconds to process " + nbEvent.getValue() + " events.");
}
The result of this version (on my machine) is:
PushStream needs 3 milliseconds to process 1000 events.
PushStreams are a fast, effective way to consume asynchronously arriving events, but it is very important to understand what buffering behaviour is suitable for your application. If you have a big lump of data that you want to iterate over very quickly then you need to be careful how you set things up, as the buffering defaults are designed for a different use case!