Is it possible to refactor a traditional event listener into a Java 8 Stream, such that listener events become the stream source?
A client submits an arbitrary job, then listens for results:
Client client = new JobClient();
client.addTaskListener(this);
client.submitJobAsync(new MultiTaskJob()); // returns void, important (see below)

public void onTaskResult(TaskResult result) {
    if (result.isLastResult())
        aggregateJobResults(result);
    else
        processResult(result);
}
For any job submitted, the client receives n results, but it doesn't know how many results it will receive (it uses isLastResult() to determine when to stop and aggregate).
I want to refactor the listener into a "supplier", or something similar, such that onTaskResult() is the stream source:
Supplier<TaskResult> taskResultSupplier =
() -> Stream.of( .. ) //onTaskResult() feeds this
.map(result -> {
if(result.isLastResult())
//logic here
});
Something like that. If I can do it without the client knowing how many results to expect, I'm golden. Right now submitJobAsync() returns void, and I'd like to keep it that way, but I'm open to other options as well...
After reading Tomasz Nurkiewicz's article on CompletableFutures for a similar scenario, I see an alternative option, assuming a minor change to the client:
List<CompletableFuture<TaskResult>> taskFutures =
    client.submitJobAsync(new MultiTaskJob());
Here, the client gets a list of CompletableFuture<TaskResult>, so we need to collect the results of the futures when they complete:
// processes all task result futures
// (assumes processResult returns the processed TaskResult)
List<CompletableFuture<TaskResult>> processedFutures = taskFutures.stream()
    .map(taskFuture -> taskFuture.thenApply(this::processResult))
    .collect(Collectors.toList());
The article also illustrates using CompletableFuture.allOf(..) to perform final processing, but only after all futures complete (it's pretty slick); that's where aggregation would occur in my case. No code to show for that here, though the article does a great job explaining it (I'm a total n00b with streams, though if I get it working I'll post the code :-D)
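For readers who want a starting point, here is a minimal sketch of the allOf(..) idea described above. It is not the article's code: the class name AllOfSketch, the helper allResults and the String "results" are placeholders chosen for illustration, and a real client would use TaskResult and its own aggregation step instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical helper: turns a list of futures into one future of all results.
    public static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        // allOf completes only after every given future has completed
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() inside thenApply does not block: all futures are already complete here
        return allDone.thenApply(ignored ->
                futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Stand-ins for the per-task futures a changed submitJobAsync() might return
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "task-1"),
                CompletableFuture.supplyAsync(() -> "task-2"),
                CompletableFuture.supplyAsync(() -> "task-3"));

        // Aggregation would happen here, once all task results are available
        List<String> results = allResults(futures).join();
        System.out.println(results);
    }
}
```

The results come back in the order of the input list, not in completion order, because the list is rebuilt from the original futures.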
It is possible to construct a Stream around your TaskResults. See this example:
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Created for http://stackoverflow.com/q/27670421/1266906.
 */
public class AsyncToStream {

    public static void main(String[] args) {
        System.out.println("Unbuffered Test:");
        AsyncTaskResultIterator<TaskResult> taskListener1 = new AsyncTaskResultIterator<>();
        new TaskResultGenerator(taskListener1, 5).start();
        taskListener1.unbufferedStream().forEach(System.out::println);

        System.out.println("Buffered Test:");
        AsyncTaskResultIterator<TaskResult> taskListener2 = new AsyncTaskResultIterator<>();
        new TaskResultGenerator(taskListener2, 5).start();
        taskListener2.bufferedStream().forEach(System.out::println);
    }

    /**
     * This class wraps a sequence of TaskResults into an iterator up to the first TaskResult where {@code isLastResult()} returns {@code true}
     */
    public static class AsyncTaskResultIterator<T extends TaskResult> implements Iterator<T>, TaskListener<T> {

        /**
         * This acts as an asynchronous buffer so we can easily wait for the next TaskResult
         */
        private final BlockingQueue<T> blockingQueue;

        /**
         * Becomes {@code true} once a TaskResult with {@code isLastResult() == true} is received.
         * Volatile because it is written by the consuming thread and read by the producing thread.
         */
        private volatile boolean ended;

        public AsyncTaskResultIterator() {
            blockingQueue = new LinkedBlockingQueue<>();
        }

        /**
         * Waits on a new TaskResult and returns it as long as the previous TaskResult did not specify {@code isLastResult()}. Afterwards no more elements can be retrieved.
         */
        @Override
        public T next() {
            if (ended) {
                throw new NoSuchElementException();
            } else {
                try {
                    T next = blockingQueue.take();
                    ended = next.isLastResult();
                    return next;
                } catch (InterruptedException e) {
                    throw new IllegalStateException("Could not retrieve next value", e);
                }
            }
        }

        @Override
        public boolean hasNext() {
            return !ended;
        }

        /**
         * Enqueue another TaskResult for retrieval
         */
        @Override
        public void onTaskResult(T result) {
            if (ended) {
                throw new IllegalStateException("Already received a TaskResult with isLastResult() == true");
            }
            try {
                blockingQueue.put(result);
            } catch (InterruptedException e) {
                throw new IllegalStateException("Could not enqueue next value", e);
            }
        }

        /**
         * Builds a Stream that acts upon the results just when they become available
         */
        public Stream<T> unbufferedStream() {
            Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
            return StreamSupport.stream(spliterator, false);
        }

        /**
         * Buffers all results and builds a Stream around the results
         */
        public Stream<T> bufferedStream() {
            Stream.Builder<T> builder = Stream.builder();
            this.forEachRemaining(builder);
            return builder.build();
        }
    }

    public static class TaskResultImpl implements TaskResult {
        private boolean lastResult;
        private String name;

        public TaskResultImpl(boolean lastResult, String name) {
            this.lastResult = lastResult;
            this.name = name;
        }

        @Override
        public String toString() {
            return "TaskResultImpl{" +
                    "lastResult=" + lastResult +
                    ", name='" + name + '\'' +
                    '}';
        }

        @Override
        public boolean isLastResult() {
            return lastResult;
        }
    }

    public static interface TaskListener<T extends TaskResult> {
        public void onTaskResult(T result);
    }

    public static interface TaskResult {
        boolean isLastResult();
    }

    private static class TaskResultGenerator extends Thread {
        private final TaskListener<TaskResult> taskListener;
        private final int count;

        public TaskResultGenerator(TaskListener<TaskResult> taskListener, int count) {
            this.taskListener = taskListener;
            this.count = count;
        }

        @Override
        public void run() {
            try {
                for (int i = 1; i < count; i++) {
                    Thread.sleep(200);
                    taskListener.onTaskResult(new TaskResultImpl(false, String.valueOf(i)));
                }
                Thread.sleep(200);
                taskListener.onTaskResult(new TaskResultImpl(true, String.valueOf(count)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
You did not provide your TaskResult and TaskListener definitions, so I made up my own. AsyncTaskResultIterator works only for a single TaskResult sequence. If no TaskResult with isLastResult() == true is supplied, next(), and therefore both the unbuffered and the buffered Stream generation, will wait endlessly.
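If waiting endlessly is not acceptable, one possible workaround is to wait with a timeout instead of using take(). The following is only a sketch under assumptions: TimeoutIteratorSketch, TimeoutTaskResultIterator and the timeoutMillis parameter are hypothetical names that are not part of the example above, and throwing an IllegalStateException on timeout is just one of several reasonable policies.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TimeoutIteratorSketch {

    // Minimal stand-in for the TaskResult interface used above
    public interface TaskResult {
        boolean isLastResult();
    }

    /** Hypothetical variant that gives up when no result arrives within the timeout. */
    public static class TimeoutTaskResultIterator<T extends TaskResult> implements Iterator<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final long timeoutMillis;
        private volatile boolean ended;

        public TimeoutTaskResultIterator(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Called by the producer, like the listener callback in the example above
        public void onTaskResult(T result) {
            queue.add(result);
        }

        @Override
        public boolean hasNext() {
            return !ended;
        }

        @Override
        public T next() {
            if (ended) {
                throw new NoSuchElementException();
            }
            try {
                // poll returns null if nothing arrives within the timeout
                T next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    ended = true;
                    throw new IllegalStateException("No TaskResult within " + timeoutMillis + " ms");
                }
                ended = next.isLastResult();
                return next;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }

        public Stream<T> stream() {
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, 0), false);
        }
    }

    public static void main(String[] args) {
        TimeoutTaskResultIterator<TaskResult> listener = new TimeoutTaskResultIterator<>(500);
        listener.onTaskResult(() -> false); // a result that is not the last one
        try {
            listener.stream().forEach(r -> System.out.println("last? " + r.isLastResult()));
        } catch (IllegalStateException e) {
            System.out.println("Gave up: " + e.getMessage());
        }
    }
}
```

The exception surfaces from the terminal operation of the stream, so callers that prefer a quiet end of the stream would need a different policy, for example tracking the timeout in hasNext().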