I need to process a JSON array containing 1 million elements and write the output as fast as possible. I chose threads to process the data concurrently, but the trickiest part is that I need to write the data to the output in the order it was received. Let me explain my problem with an example.
Let's say I have a JSON array as input. For each integer I first need to check whether it is even or odd, and then produce 2 lines per integer if it is even, or 3 lines per integer if it is odd. Each line has the format

sequenceNumber_Integer

where sequenceNumber is incremented for every line. Below is an example for a JSON array of 4 elements, which produces 10 lines of output. I am using Gson to parse and iterate over the JSON array.

Input:

[ 1, 2, 3, 4 ]

Output:

1_1
2_1
3_1
4_2
5_2
6_3
7_3
8_3
9_4
10_4
I am pretty new to concurrent programming, but I tried it myself and managed to produce the correct result. Below is my example code.
import com.google.gson.stream.JsonReader;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class SampleCheck {

    public static void main(String[] args) throws IOException, InterruptedException {
        String jsonStr = "[ 1, 2, 3, 4 ]";
        JsonReader jsonReader = new JsonReader(new StringReader(jsonStr));
        processJsonArray(jsonReader);
    }

    private static void processJsonArray(JsonReader jsonReader) throws InterruptedException, IOException {
        String newLine = System.getProperty("line.separator");
        AtomicInteger writeIndex = new AtomicInteger(0);
        AtomicBoolean stop = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
        for (int i = 0; i < 4; i++) {
            executorService.submit(() -> {
                StringBuilder sb = new StringBuilder(5);
                while (!(stop.get() && queue.isEmpty())) {
                    Data data = queue.poll();
                    if (data == null) {
                        continue;
                    }
                    try {
                        int seq = data.getSeq();
                        String result = newLine;
                        if (data.getData() % 2 == 0) { // even
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();
                        } else { // odd
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();
                        }
                        while (data.getIndex() > writeIndex.get()) {
                            // Do nothing and wait for other threads to complete
                        }
                        out.writeBytes(result);
                        writeIndex.incrementAndGet();
                    } catch (Exception ignore) {
                    }
                }
            });
        }
        int seq = 1;
        int index = 0;
        jsonReader.beginArray();
        while (jsonReader.hasNext()) {
            int data = jsonReader.nextInt();
            queue.add(new Data(data, index, seq));
            index++;
            seq += (data % 2) == 0 ? 2 : 3;
        }
        stop.set(true);
        executorService.shutdown();
        executorService.awaitTermination(20, TimeUnit.MINUTES);
        out.close();
        System.out.println(new String(byteArrayOutputStream.toByteArray()));
    }

    private static class Data {
        private int data;
        private int index;
        private int seq;

        public Data(int data, int index, int seq) {
            this.data = data;
            this.index = index;
            this.seq = seq;
        }

        public int getData() {
            return data;
        }

        public int getIndex() {
            return index;
        }

        public int getSeq() {
            return seq;
        }
    }
}
But I need expert advice on approaching this problem in a different way to gain maximum performance. My code looks very verbose; is there a better solution, or any changes I can make to gain maximum performance? Can you help me, or does this code look OK?
PS: the above example is just to illustrate my problem. In the real world, I get the data from a zip stream (max 1 million elements) and write the lines to a zip output stream.
Edit: added a more realistic example, processing a JSON array instead of a List. I need help with the processJsonArray method; in the real world the JSON reader needs to process 1 million elements.
This seems like a very good use case for parallel streams. Java will do all the hard work of splitting into separate threads and reassembling in order and you don't need to work on concurrency or threading at all.
Your code could be as simple as:
inputList.parallelStream()
         .flatMap(in -> createOutputLines(in))
         .forEachOrdered(out -> output(out));
Having said that, I would be very surprised if anything other than your IO has a material impact on performance. You would need to be doing very complex processing of your input for it to be more than a rounding error.
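As a minimal, runnable sketch of that idea (the `createOutputLines` body here is a made-up stand-in: letter suffixes replace the real sequence numbers, which depend on how many lines the preceding elements produced):

```java
import java.util.List;
import java.util.stream.Stream;

public class ParallelOrderedSketch {

    // Hypothetical stand-in for createOutputLines: two lines for an even
    // input, three for an odd one. Real sequence numbers are omitted because
    // they depend on the output of the *previous* elements.
    static Stream<String> createOutputLines(int n) {
        return (n % 2 == 0)
                ? Stream.of(n + "_a", n + "_b")
                : Stream.of(n + "_a", n + "_b", n + "_c");
    }

    public static void main(String[] args) {
        List<Integer> inputList = List.of(1, 2, 3, 4);
        inputList.parallelStream()
                 .flatMap(ParallelOrderedSketch::createOutputLines)
                 .forEachOrdered(System.out::println); // keeps encounter order
    }
}
```

Note that the terminal operation has to be forEachOrdered rather than forEach when the output order matters: on a parallel stream, plain forEach may emit results from different threads in any interleaving.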
As other people noticed, you cannot gain much parallel performance (if any) from processing a sequential stream. What I'd naively do to improve your current solution:

- minimize the conversion chain (int -> String -> char[] -> byte[] -> output);
- String.valueOf may affect the item above (probably a "save-into-byte-array" version of Integer.toString, like sprintf in C, would be great);
- avoid the StringBuilder generated by javac (for simple ... + ... + ... expressions, if I'm not wrong);
- avoid print(...) and println(...);
- replace JsonReader with a more efficient JSON parser.

Here is an example, supposing you've provided the most realistic example you can:
public static void main(final String... args)
        throws IOException {
    // generate a sample ZIP file first
    try ( final ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream("./in.zip"));
          final JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(zipOutputStream)) ) {
        zipOutputStream.putNextEntry(new ZipEntry("n_array.json"));
        jsonWriter.beginArray();
        for ( int i = 1; i <= 1_000_000; i++ ) {
            jsonWriter.value(i);
        }
        jsonWriter.endArray();
    }
    // process the file
    final Stopwatch stopwatch = Stopwatch.createStarted();
    try ( final ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream("./in.zip"));
          final ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream("./out.zip")) ) {
        @Nullable
        final ZipEntry nextEntry = zipInputStream.getNextEntry();
        if ( nextEntry == null || !nextEntry.getName().equals("n_array.json") ) {
            throw new AssertionError();
        }
        zipOutputStream.putNextEntry(new ZipEntry("n_array.lst"));
        processJsonArray(zipInputStream, zipOutputStream);
    }
    System.out.println("Done in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
}

private static final byte[] newLine = System.getProperty("line.separator")
        .getBytes();

private static void processJsonArray(@WillNotClose final InputStream in, @WillNotClose final OutputStream out)
        throws IOException {
    final JsonReader jsonReader = new JsonReader(new InputStreamReader(in));
    jsonReader.beginArray();
    final byte[] nBuffer = new byte[16];
    final byte[] seqBuffer = new byte[16];
    for ( int seq = 0; jsonReader.hasNext(); ) {
        final int n = jsonReader.nextInt();
        final int nLength = toBytes(nBuffer, String.valueOf(n));
        // #1 of twice/three times
        out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
        out.write('_');
        out.write(nBuffer, 0, nLength);
        out.write(newLine);
        // #2 of twice/three times
        out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
        out.write('_');
        out.write(nBuffer, 0, nLength);
        out.write(newLine);
        if ( n % 2 == 1 ) {
            // #3 of three times
            out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
            out.write('_');
            out.write(nBuffer, 0, nLength);
            out.write(newLine);
        }
    }
    jsonReader.endArray();
}

private static int toBytes(final byte[] buffer, final String s) {
    final int length = s.length();
    for ( int i = 0; i < length; i++ ) {
        buffer[i] = (byte) s.charAt(i);
    }
    return length;
}
The code above takes ~5s on my machine (without proper benchmarking or warm-up), whilst your version without the intermediate ByteArrayOutputStream takes about 25s.