
Process a JSON array concurrently, in order, and as fast as possible in Java

I need a solution that processes a JSON array containing 1 million elements and writes the output as fast as possible. I chose threads to process the data concurrently, but the trickiest part is that I need to write the data to the output in the order I received it. Let me explain my problem with an example.

Let's say I have a JSON array of integers as input. For every integer I need to check whether it is even or odd, and then produce 2 lines if it is even or 3 lines if it is odd. Each line has the format

sequenceNumber_Integer

where the sequence number is incremented for every line. Below is an example JSON array of 4 elements, which produces 10 lines of output. I am using Gson to parse and iterate over the JSON array.

[ 1, 2, 3, 4 ]
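For reference, the expected output can be reproduced with a plain single-threaded loop. This is a minimal sketch that assumes an int[] in place of the Gson reader and '\n' as the line separator; the class and method names are illustrative only. It also shows why the task is not trivially parallel: the first sequence number of each element depends on how many even and odd values came before it.

```java
public class ExpectedOutput {
    // Renders every value as 2 lines (even) or 3 lines (odd) in the form seq_value.
    static String render(int[] values) {
        StringBuilder sb = new StringBuilder();
        int seq = 1; // running counter carried from one element to the next
        for (int n : values) {
            int copies = (n % 2 == 0) ? 2 : 3;
            for (int i = 0; i < copies; i++) {
                sb.append(seq++).append('_').append(n).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(new int[] {1, 2, 3, 4}));
    }
}
```

Running it prints 1_1, 2_1, 3_1, 4_2, 5_2, 6_3, 7_3, 8_3, 9_4 and 10_4, one per line.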


I am pretty new to concurrent programming, but I tried it myself and managed to produce the expected result. Below is my example code.

import com.google.gson.stream.JsonReader;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SampleCheck {
    public static void main(String[] args) throws IOException, InterruptedException {
        String jsonStr = "[ 1, 2, 3, 4 ]";
        JsonReader jsonReader = new JsonReader(new StringReader(jsonStr));
        processJsonArray(jsonReader);
    }

    private static  void processJsonArray(JsonReader jsonReader) throws InterruptedException, IOException {
        String newLine = System.getProperty("line.separator");
        AtomicInteger writeIndex = new AtomicInteger(0);
        AtomicBoolean stop = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
        for (int i = 0; i < 4; i++) {
            executorService.submit(() -> {
                StringBuilder sb = new StringBuilder(5);
                while (!(stop.get() && queue.isEmpty())) {
                    Data data = queue.poll();
                    if (data == null) {
                        continue;
                    }
                    try {
                        int seq = data.getSeq();
                        String result = newLine;
                        if (data.getData() % 2 == 0) { //Even
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();

                        } else { //odd
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();
                        }
                        while (data.getIndex() > writeIndex.get()) {
                            //Do nothing and wait for other threads to complete
                        }
                        out.writeBytes(result);
                        writeIndex.incrementAndGet();

                    } catch (Exception ignore) {

                    }
                }
            });
        }
        int seq = 1;
        int index = 0;
        jsonReader.beginArray();
        while (jsonReader.hasNext()) {
            int data = jsonReader.nextInt();
            // put() blocks while the bounded queue is full; add() would throw IllegalStateException
            queue.put(new Data(data, index, seq));
            index++;
            seq += (data % 2) == 0 ? 2 : 3;
        }
        jsonReader.endArray();
        stop.set(true);
        executorService.shutdown();
        executorService.awaitTermination(20, TimeUnit.MINUTES);
        out.close();
        System.out.println(new String(byteArrayOutputStream.toByteArray()));
    }

    private static class Data {
        private int data;
        private int index;
        private int seq;

        public Data(int data, int index, int seq) {
            this.data = data;
            this.index = index;
            this.seq = seq;
        }

        public int getData() {
            return data;
        }

        public int getIndex() {
            return index;
        }

        public int getSeq() {
            return seq;
        }
    }
}

But I need expert advice on approaching this problem differently to get maximum performance. My code looks very verbose, so I would like a better solution, if there is one, or any changes I can make to get the best performance. Can you help me, or does this code look OK?

PS: The example above only illustrates my problem. In the real world, I get the data (at most 1 million elements) from a ZIP stream and write the lines to a ZIP output stream.

Edit: Added a more realistic example that processes a JSON array instead of a List. I need help with the processJsonArray method; in the real world, the JSON reader needs to process 1 million elements.
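For comparison, a common alternative to spin-waiting on a shared write index is to let the reading thread cut the input into chunks, compute each chunk's starting sequence number (a running sum of 2s and 3s, which is cheap), render the chunks on a thread pool, and write the finished chunks by taking the Futures in submission order. This is a minimal sketch, not the asker's method: it assumes the values have already been read into an int[] (in real code they would come from the JsonReader), uses '\n' line endings, and its chunkSize, threads and maxInFlight parameters are hypothetical (chunkSize must be positive). Limiting the number of pending chunks keeps memory bounded for a 1-million-element input.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedChunks {
    // Renders one chunk; startSeq is the sequence number of the chunk's first line.
    static String renderChunk(int[] chunk, int startSeq) {
        StringBuilder sb = new StringBuilder(chunk.length * 24);
        int seq = startSeq;
        for (int n : chunk) {
            int copies = (n % 2 == 0) ? 2 : 3; // even -> 2 lines, odd -> 3 lines
            for (int i = 0; i < copies; i++) {
                sb.append(seq++).append('_').append(n).append('\n');
            }
        }
        return sb.toString();
    }

    // Renders chunks on a pool and writes them in submission order,
    // keeping at most maxInFlight chunks pending at any time.
    static void process(int[] input, int chunkSize, int threads, int maxInFlight, Writer out)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Deque<Future<String>> inFlight = new ArrayDeque<>();
            int seq = 1;
            for (int from = 0; from < input.length; from += chunkSize) {
                int[] chunk = Arrays.copyOfRange(input, from, Math.min(from + chunkSize, input.length));
                int startSeq = seq;
                for (int n : chunk) {
                    seq += (n % 2 == 0) ? 2 : 3; // prefix sum computed on the reading thread
                }
                inFlight.addLast(pool.submit(() -> renderChunk(chunk, startSeq)));
                if (inFlight.size() >= maxInFlight) {
                    out.write(inFlight.removeFirst().get()); // oldest chunk first keeps the order
                }
            }
            while (!inFlight.isEmpty()) {
                out.write(inFlight.removeFirst().get());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether this beats a plain sequential loop depends on whether formatting or I/O dominates, so it is worth measuring before adopting it.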

asked Jul 03 '20 03:07 by sparker


2 Answers

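This seems like a very good use case for parallel streams. Java will do all the hard work of splitting the input across threads and, as long as you use an order-preserving terminal operation such as forEachOrdered or collect, of keeping the results in the original order, so you don't need to deal with concurrency or threading yourself.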

Your code could be as simple as:

inputList.parallelStream()
    .flatMap(in -> createOutputLines(in))
    .forEachOrdered(out -> output(out)); // forEach would not keep the input order
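A self-contained version of this idea, assuming a List<Integer> input as in the original question; the class and method names are hypothetical. The parallel part only expands each value into 2 or 3 copies, and the sequence numbers are assigned in forEachOrdered, which processes elements in encounter order and guarantees that handling one element happens-before handling the next, so a plain counter is safe there. Plain forEach on a parallel stream may process elements in any order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ParallelOrderedSketch {
    static List<String> process(List<Integer> input) {
        List<String> lines = new ArrayList<>();
        int[] seq = {0}; // mutable counter, only touched inside forEachOrdered
        input.parallelStream()
            // even -> 2 copies, odd -> 3 copies; this step may run on several threads
            .flatMap(n -> Collections.nCopies(n % 2 == 0 ? 2 : 3, n).stream())
            // elements arrive here one at a time, in the original order
            .forEachOrdered(n -> lines.add(++seq[0] + "_" + n));
        return lines;
    }

    public static void main(String[] args) {
        process(List.of(1, 2, 3, 4)).forEach(System.out::println);
    }
}
```

Because the per-element work here is trivial, this mainly illustrates how ordering is preserved rather than promising a speedup.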

Having said that, I would be very surprised if anything other than your IO has a material impact on performance. You would need to be doing very complex processing of your input for it to be more than a rounding error.
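If I/O dominates, one cheap step for the ZIP-to-ZIP setup in the question is to put a fixed-size BufferedOutputStream between the line-writing code and the ZipOutputStream, so that many tiny writes are not each sent to the compressor individually. Unlike collecting the whole output in a ByteArrayOutputStream, the buffer's memory use stays constant. This is a minimal sketch: the class and method names, the entry name and the 64 KiB buffer size are illustrative assumptions, not measured values.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class BufferedZipOutput {
    // Writes each line plus '\n' into a single ZIP entry on target.
    static void writeLines(OutputStream target, String entryName, Iterable<String> lines) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(target)) {
            zip.putNextEntry(new ZipEntry(entryName));
            // Fixed-size buffer that batches small writes before they reach the compressor.
            OutputStream out = new BufferedOutputStream(zip, 64 * 1024);
            for (String line : lines) {
                out.write(line.getBytes(StandardCharsets.US_ASCII));
                out.write('\n');
            }
            out.flush(); // flush, not close: closing the wrapper would also close the ZIP stream
            zip.closeEntry();
        } // closing the ZipOutputStream writes the central directory and closes target
    }
}
```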

answered Oct 06 '22 00:10 by sprinter


As other people noticed, you cannot gain much parallel performance (if any) from processing a sequential stream. What I'd naively do to improve your current solution:

  • prefer byte arrays wherever possible (this matters when intermediate strings are created: int -> String -> char[] -> byte[] -> output);
  • avoid intermediate conversions, especially to-string conversions, wherever possible (e.g., String.valueOf feeds into the item above; a "write-into-byte-array" version of Integer.toString, like sprintf in C, would be ideal);
  • avoid intermediate strings while producing a result, especially through concatenation: javac compiles a single a + b + c expression efficiently (a StringBuilder chain, or invokedynamic-based concatenation since Java 9), but each separate += statement, as in your code, creates another intermediate string;
  • write elements to the output stream / writer directly, without an intermediate buffer that holds the whole output as objects on the heap;
  • use the matching overloads (for instance, of print(...) and println(...)) instead of converting arguments yourself;
  • it is probably worth unrolling the output generation instead of using loops (I don't know whether a given JVM optimizes such small loops);
  • replace Gson JsonReader with a more efficient JSON parser.
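The "write-into-byte-array" version of Integer.toString wished for in the list above can be sketched in a few lines. writeDecimal is a hypothetical helper, not a JDK method: it writes the ASCII digits of an int straight into a caller-supplied buffer without creating a String, and the buffer must hold at least 11 bytes (the length of "-2147483648").

```java
import java.nio.charset.StandardCharsets;

public class DecimalBytes {
    // Writes the decimal form of value into buf starting at index 0; returns the byte count.
    static int writeDecimal(byte[] buf, int value) {
        int pos = 0;
        // Work with a non-positive magnitude so Integer.MIN_VALUE needs no special case.
        int v = value;
        if (v < 0) {
            buf[pos++] = (byte) '-';
        } else {
            v = -v;
        }
        int start = pos;
        do {
            buf[pos++] = (byte) ('0' - v % 10); // v % 10 is in -9..0
            v /= 10;
        } while (v != 0);
        // Digits were produced least-significant first; reverse them in place.
        for (int i = start, j = pos - 1; i < j; i++, j--) {
            byte tmp = buf[i];
            buf[i] = buf[j];
            buf[j] = tmp;
        }
        return pos;
    }

    public static void main(String[] args) {
        byte[] buf = new byte[11];
        int len = writeDecimal(buf, -12345);
        System.out.println(new String(buf, 0, len, StandardCharsets.US_ASCII)); // prints -12345
    }
}
```

The returned length can be passed straight to OutputStream.write(buf, 0, len), which is the same buffer-plus-length pattern the code below uses with its toBytes helper.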

Here is an example, assuming the one you provided is as realistic as it gets:

import com.google.common.base.Stopwatch;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import javax.annotation.Nullable;
import javax.annotation.WillNotClose;

// Requires Gson, Guava (Stopwatch) and JSR-305 (@Nullable, @WillNotClose); the class name is illustrative.
public final class ZipJsonArrayProcessor {

    public static void main(final String... args)
            throws IOException {
        // generate a sample ZIP file first
        try ( final ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream("./in.zip"));
                final JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(zipOutputStream)) ) {
            zipOutputStream.putNextEntry(new ZipEntry("n_array.json"));
            jsonWriter.beginArray();
            for ( int i = 1; i <= 1_000_000; i++ ) {
                jsonWriter.value(i);
            }
            jsonWriter.endArray();
        }
        // process the file
        final Stopwatch stopwatch = Stopwatch.createStarted();
        try ( final ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream("./in.zip"));
                final ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream("./out.zip")) ) {
            @Nullable
            final ZipEntry nextEntry = zipInputStream.getNextEntry();
            if ( nextEntry == null || !nextEntry.getName().equals("n_array.json") ) {
                throw new AssertionError();
            }
            zipOutputStream.putNextEntry(new ZipEntry("n_array.lst"));
            processJsonArray(zipInputStream, zipOutputStream);
        }
        System.out.println("Done in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
    }

    private static final byte[] newLine = System.getProperty("line.separator")
            .getBytes();

    private static void processJsonArray(@WillNotClose final InputStream in, @WillNotClose final OutputStream out)
            throws IOException {
        final JsonReader jsonReader = new JsonReader(new InputStreamReader(in));
        jsonReader.beginArray();
        final byte[] nBuffer = new byte[16];
        final byte[] seqBuffer = new byte[16];
        for ( int seq = 0; jsonReader.hasNext(); ) {
            final int n = jsonReader.nextInt();
            final int nLength = toBytes(nBuffer, String.valueOf(n));
            // #1 of twice/three times
            out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
            out.write('_');
            out.write(nBuffer, 0, nLength);
            out.write(newLine);
            // #2 of twice/three times
            out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
            out.write('_');
            out.write(nBuffer, 0, nLength);
            out.write(newLine);
            if ( n % 2 != 0 ) { // odd; n % 2 == 1 would miss negative odd numbers
                // #3 of three times
                out.write(seqBuffer, 0, toBytes(seqBuffer, String.valueOf(++seq)));
                out.write('_');
                out.write(nBuffer, 0, nLength);
                out.write(newLine);
            }
        }
        jsonReader.endArray();
    }

    // Copies the ASCII characters of s into buffer and returns how many were copied.
    private static int toBytes(final byte[] buffer, final String s) {
        final int length = s.length();
        for ( int i = 0; i < length; i++ ) {
            buffer[i] = (byte) s.charAt(i);
        }
        return length;
    }
}

On my machine the code above takes about 5 s (measured without proper benchmarking or warm-up), whereas your version, with the intermediate ByteArrayOutputStream removed, takes about 25 s.

answered Oct 05 '22 23:10 by terrorrussia-keeps-killing