I want to use a Stream to parallelize processing of a heterogeneous set of remotely stored JSON files whose number is not known upfront. The files can vary widely in size, from 1 JSON record per file up to 100,000 records in some other files. A JSON record in this case means a self-contained JSON object represented as one line in the file.
I really want to use Streams for this and so I implemented this Spliterator:
    public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

        abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

        abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

        private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
        private static final int MAX_BUFFER = 100;
        private final Iterator<String> paths;
        private JsonStreamSupport<METADATA> reader = null;

        public JsonStreamSpliterator(Iterator<String> paths) {
            this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
        }

        private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
            super(est, additionalCharacteristics);
            this.paths = paths;
        }

        private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
            this(est, additionalCharacteristics, paths);
            open(nextPath);
        }

        @Override
        public boolean tryAdvance(Consumer<? super RECORD> action) {
            if(reader == null) {
                String path = takeNextPath();
                if(path != null) {
                    open(path);
                }
                else {
                    return false;
                }
            }
            Map<String, Object> json = reader.readJsonLine();
            if(json != null) {
                RECORD item = parse(reader.getMetadata(), json);
                action.accept(item);
                return true;
            }
            else {
                reader.close();
                reader = null;
                return tryAdvance(action);
            }
        }

        private void open(String path) {
            reader = openInputStream(path);
        }

        private String takeNextPath() {
            synchronized(paths) {
                if(paths.hasNext()) {
                    return paths.next();
                }
            }
            return null;
        }

        @Override
        public Spliterator<RECORD> trySplit() {
            String nextPath = takeNextPath();
            if(nextPath != null) {
                return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                    @Override
                    protected JsonStreamSupport<METADATA> openInputStream(String path) {
                        return JsonStreamSpliterator.this.openInputStream(path);
                    }
                    @Override
                    protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                        return JsonStreamSpliterator.this.parse(metaData, json);
                    }
                };
            }
            else {
                List<RECORD> records = new ArrayList<RECORD>();
                while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                    // loop
                }
                if(records.size() != 0) {
                    return records.spliterator();
                }
                else {
                    return null;
                }
            }
        }
    }
The problem I'm having is that while the Stream parallelizes beautifully at first, eventually the largest file is left processing in a single thread. I believe the proximal cause is well documented: the spliterator is "unbalanced".
More concretely, it appears that the trySplit method is not called after a certain point in the Stream.forEach lifecycle, so the extra logic to distribute small batches at the end of trySplit is rarely executed.
Notice how all the spliterators returned from trySplit share the same paths iterator. I thought this was a really clever way to balance the work across all spliterators, but it hasn't been enough to achieve full parallelism.
I would like the parallel processing to proceed first across files, and then, when only a few large files are still being processed, I want to parallelize across chunks of the remaining files. That was the intent of the else block at the end of trySplit.
Is there an easy / simple / canonical way around this problem?
Your trySplit should output splits of equal size, regardless of the size of the underlying files. You should treat all the files as a single unit and fill up the ArrayList-backed spliterator with the same number of JSON objects each time. The number of objects should be such that processing one split takes between 1 and 10 milliseconds: below 1 ms you start approaching the cost of handing off the batch to a worker thread; above 10 ms you start risking uneven CPU load due to tasks that are too coarse-grained.
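To make the fixed-batch idea concrete, here is a minimal sketch. It is not the code from the question or from my linked solution: the class name FixedBatchSpliterator, the parallelStream helper and the batchSize parameter are all made up for illustration, and it assumes you can expose the records of all files as one sequential Iterator. Every call to trySplit copies up to batchSize elements into an array and hands them off as an array-backed spliterator, so each split carries the same amount of work no matter which file the records came from.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical helper: splits any sequential source into fixed-size batches. */
public class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Iterator<T> source;
    private final int batchSize;

    public FixedBatchSpliterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) {
            return false;
        }
        action.accept(source.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Pull up to batchSize elements from the shared sequential source.
        Object[] batch = new Object[batchSize];
        int n = 0;
        while (n < batchSize && source.hasNext()) {
            batch[n++] = source.next();
        }
        if (n == 0) {
            return null; // source exhausted, nothing left to split off
        }
        // The array-backed spliterator reports an exact size for this batch.
        return Spliterators.spliterator(batch, 0, n, characteristics());
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // total number of elements is unknown
    }

    @Override
    public int characteristics() {
        return ORDERED;
    }

    /** Convenience factory (also hypothetical): a parallel stream over the batches. */
    public static <T> Stream<T> parallelStream(Iterator<T> source, int batchSize) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(source, batchSize), true);
    }
}
```

In your case the iterator would walk the JSON lines of one file after another, and batchSize would be tuned by measurement until one batch takes a few milliseconds to process.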
The spliterator is not obliged to report a size estimate, and you are already doing this correctly: your estimate is Long.MAX_VALUE, which is a special value meaning "unbounded". However, if you have many files with a single JSON object, resulting in batches of size 1, this will hurt your performance in two ways: the overhead of opening-reading-closing the file may become a bottleneck and, if you manage to escape that, the cost of thread handoff may be significant compared to the cost of processing one item, again causing a bottleneck.
Five years ago I solved a similar problem; you can have a look at my solution.