Consider the following shell script:
gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz
This has three processes working in parallel to decompress a stream, modify it, and re-compress it. Running it under time, I can see that user time is about twice real time, which indicates the program is effectively working in parallel.
I've attempted to create the same program in Java by placing each task in its own thread. Unfortunately, the multithreaded Java program is only about 30% faster than the single-threaded version for the above sample. I've tried using both an Exchanger and a ConcurrentLinkedQueue. The ConcurrentLinkedQueue causes a lot of contention, although all three threads are generally kept busy. The Exchanger has lower contention, but is more complicated, and doesn't seem to keep the slowest worker running 100% of the time.
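For reference, the double-buffer hand-off I tried with Exchanger looks roughly like this (a minimal sketch with my own class names and a simplified single-swap protocol, not the actual test code):

```java
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        final Exchanger<StringBuilder> exchanger = new Exchanger<>();

        // Filler thread: fills a buffer, then swaps it for an empty one.
        Thread filler = new Thread(() -> {
            StringBuilder buf = new StringBuilder();
            try {
                buf.append("line with @ sign");
                buf = exchanger.exchange(buf); // hand the full buffer over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        filler.start();

        // Drainer: offers an empty buffer and receives the full one.
        StringBuilder full = exchanger.exchange(new StringBuilder());
        System.out.println(full.toString().replace("@", "_at_"));
        filler.join();
    }
}
```

Both sides block until the partner arrives, which is why a slow stage stalls its neighbour rather than queueing work behind it.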
I'm trying to figure out a pure Java solution to this problem without looking at one of the byte code weaving frameworks or a JNI based MPI.
Most of the concurrency research and APIs concern themselves with divide-and-conquer algorithms, giving each node work which is orthogonal and non-dependent on prior calculations. Another approach to concurrency is the pipeline approach, where each worker does some work and passes the data onto the next worker.
I'm not trying to find the most efficient way to sed a gzip'd file, but rather I'm looking at how to efficiently break down tasks in a pipeline, in order to reduce the runtime to that of the slowest task.
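To make the hand-off concrete, here is a minimal sketch of one queue-based link between two pipeline stages, using an ArrayBlockingQueue and a poison-pill terminator (the names and the end-of-stream convention are mine):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueStageDemo {
    // Poison pill marking end of stream; identity-compared, so it must
    // be a distinct object rather than an interned literal.
    static final String EOF = new String("EOF");

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

        // Producer stage: emits lines, then the poison pill.
        Thread producer = new Thread(() -> {
            try {
                queue.put("foo@bar");
                queue.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer stage (here, the main thread): drain until the pill.
        for (String line; (line = queue.take()) != EOF; ) {
            System.out.println(line.replace("@", "_at_"));
        }
        producer.join();
    }
}
```

The bounded queue gives back-pressure: a fast producer blocks on put() once the buffer fills, which keeps memory flat while the slowest stage sets the pace.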
Current timings for a 10m line file are as follows:
Testing via shell
real 0m31.848s
user 0m58.946s
sys 0m1.694s

Testing SerialTest
real 0m59.997s
user 0m59.263s
sys 0m1.121s

Testing ParallelExchangerTest
real 0m41.573s
user 1m3.436s
sys 0m1.830s

Testing ConcurrentQueueTest
real 0m44.626s
user 1m24.231s
sys 0m10.856s
I'm offering a bounty for a 10% improvement in Java, as measured by real time on a four core system with 10m rows of test data. Current sources are available on Bitbucket.
The simplest way to avoid problems with concurrency is to share only immutable data between threads. Immutable data is data that cannot be changed. To make a class immutable, declare the class and all its fields final, and ensure that no reference to a field escapes during construction.
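A minimal sketch of such an immutable class (a hypothetical Chunk buffer, not taken from the sources above):

```java
import java.util.Arrays;

// Final class, final fields, no setters: instances are safe to share
// between threads without synchronization.
public final class Chunk {
    private final byte[] data;
    private final int length;

    public Chunk(byte[] data, int length) {
        // Defensive copy so no reference to mutable state escapes;
        // the caller's array can change without affecting this Chunk.
        this.data = Arrays.copyOf(data, length);
        this.length = length;
    }

    public int length() { return length; }

    public byte byteAt(int i) { return data[i]; }

    public static void main(String[] args) {
        byte[] src = {1, 2, 3};
        Chunk c = new Chunk(src, 3);
        src[0] = 9;                      // caller mutation does not leak in
        System.out.println(c.byteAt(0)); // prints 1
    }
}
```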
You can execute streams in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.
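For example, the same substitution as a parallel aggregate operation (illustrative only; collect() preserves the encounter order of the source list):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a@b", "c@d", "e@f");
        // The runtime partitions the source into substreams; map() runs
        // on them in parallel and collect() recombines the results.
        List<String> out = lines.parallelStream()
                .map(s -> s.replace("@", "_at_"))
                .collect(Collectors.toList());
        System.out.println(out); // prints [a_at_b, c_at_d, e_at_f]
    }
}
```

Note this parallelizes by splitting the data, not by pipelining stages, so it fits the divide-and-conquer model the question contrasts against.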
Pipeline concurrency defines how many pipelines can run simultaneously.
Firstly, the process will only be as fast as the slowest piece. If the timing breakdown is, say:

gunzip: 1 second
sed: 5 seconds
gzip: 1 second

then by going multi-threaded you'll be done in at best 5 seconds instead of 7.
Secondly, rather than using the queues you're using, try to replicate the functionality of the shell pipeline by using PipedInputStream and PipedOutputStream to chain the stages together.
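A minimal sketch of how the two piped classes connect a producer thread to a consumer (class and variable names are mine):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

public class PipedDemo {
    public static void main(String[] args) throws Exception {
        PipedOutputStream sink = new PipedOutputStream();
        PipedInputStream source = new PipedInputStream(sink); // connected pair

        // Writer thread: whatever one stage produces...
        Thread writer = new Thread(() -> {
            try (OutputStream out = sink) {
                out.write("hello @ world".getBytes("UTF-8"));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();

        // ...the next stage consumes through the connected input side;
        // read() returns -1 once the writer closes its end.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = source.read()) != -1) {
            buf.write(b);
        }
        writer.join();
        System.out.println(buf.toString("UTF-8"));
    }
}
```

The pipe's internal buffer provides the back-pressure automatically: the writer blocks when the buffer is full, the reader blocks when it is empty.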
Edit: there are a few ways of handling related tasks with the Java concurrency utilities. Divide the work into threads. First, create a common interface:
public interface Worker {
    void run(InputStream in, OutputStream out);
}
This interface represents an arbitrary job that processes input and generates output. Chain these together and you have a pipeline. You can abstract away the boilerplate too. For that we need a class:
public class UnitOfWork implements Runnable {
    private final InputStream in;
    private final OutputStream out;
    private final Worker worker;

    public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
        if (in == null) {
            throw new NullPointerException("in is null");
        }
        if (out == null) {
            throw new NullPointerException("out is null");
        }
        if (worker == null) {
            throw new NullPointerException("worker is null");
        }
        this.in = in;
        this.out = out;
        this.worker = worker;
    }

    public final void run() {
        worker.run(in, out);
    }
}
So, for example, the Unzip worker:

public class Unzip implements Worker {
    public void run(InputStream in, OutputStream out) {
        ...
    }
}
and so on for Sed
and Zip
. What then binds it together is this:
public static void pipe(InputStream in, OutputStream out, Worker... workers)
        throws IOException {
    if (workers.length == 0) {
        throw new IllegalArgumentException("no workers");
    }
    List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
    PipedOutputStream last = null;
    for (int i = 0; i < workers.length - 1; i++) {
        PipedOutputStream next = new PipedOutputStream();
        InputStream source = (last == null) ? in : new PipedInputStream(last);
        work.add(new UnitOfWork(source, next, workers[i]));
        last = next;
    }
    work.add(new UnitOfWork(
            (last == null) ? in : new PipedInputStream(last),
            out, workers[workers.length - 1]));
    ExecutorService exec = Executors.newFixedThreadPool(work.size());
    for (UnitOfWork w : work) {
        exec.submit(w);
    }
    exec.shutdown();
    try {
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
I'm not sure you can do much better than that and there is minimal code to write for each job. Then your code becomes:
public static void processFile(String inputName, String outputName)
        throws IOException {
    pipe(new FileInputStream(inputName),
         new FileOutputStream(outputName),
         new Unzip(), new Sed(), new Zip());
}
I individually measured the time taken; it seems like reading takes less than 10% of the time, and reading plus processing takes less than 30% of the whole time. So I took ParallelExchangerTest (the best performer in your code) and modified it to have just two threads: the first thread does the reading and replacing, and the second thread does the writing.
Here are the figures to compare (on my machine: Intel dual core (not Core 2) running Ubuntu with 1 GB RAM)
> Testing via shell
real 0m41.601s
user 0m58.604s
sys 0m1.032s
> Testing ParallelExchangerTest
real 1m55.424s
user 2m14.160s
sys 0m4.768s
> ParallelExchangerTestMod (2 thread)
real 1m35.524s
user 1m55.319s
sys 0m3.580s
I knew that the string processing takes the longest, so I replaced line.replace with matcher.replaceAll and got these figures:
> ParallelExchangerTestMod_Regex (2 thread)
real 1m12.781s
user 1m33.382s
sys 0m2.916s
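The gain is presumably because String.replace compiles a fresh literal Pattern on every call in the JDKs of that era, while a pre-compiled Pattern and a reused Matcher avoid the per-line allocation. A sketch of the idea (class name is mine):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexReplaceDemo {
    // Compile once instead of once per line; LITERAL because "@"
    // is a plain token, not a regular expression.
    private static final Pattern AT = Pattern.compile("@", Pattern.LITERAL);

    public static void main(String[] args) {
        Matcher m = AT.matcher("");
        String[] lines = {"a@b", "c@d"};
        for (String line : lines) {
            // reset() reuses the matcher instead of allocating a new one
            System.out.println(m.reset(line).replaceAll("_at_"));
        }
    }
}
```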
Next I took it a step further: instead of reading one line at a time, I read char[] buffers of various sizes and timed it (with the regex search/replace). I got these figures:
> Testing ParallelExchangerTestMod_Regex_Buff (100 bytes processing at a time)
real 1m13.804s
user 1m32.494s
sys 0m2.676s
> Testing ParallelExchangerTestMod_Regex_Buff (500 bytes processing at a time)
real 1m6.286s
user 1m29.334s
sys 0m2.324s
> Testing ParallelExchangerTestMod_Regex_Buff (800 bytes processing at a time)
real 1m12.309s
user 1m33.910s
sys 0m2.476s
Looks like 500 bytes is the optimal buffer size for this data.
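The buffered variant can be sketched like this (my own reconstruction, not the forked code; note that chunked replacement is only safe here because the pattern is a single character, so a match can never straddle a buffer boundary):

```java
import java.io.CharArrayReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;

public class BufferedReplaceDemo {
    // Read fixed-size char[] chunks instead of line by line;
    // 500 mirrors the buffer size that timed best above.
    static void copyReplacing(Reader in, Writer out, int bufSize)
            throws IOException {
        char[] buf = new char[bufSize];
        int n;
        while ((n = in.read(buf)) != -1) {
            // Safe only for single-char tokens like '@'; a multi-char
            // pattern could be split across two chunks and missed.
            out.write(new String(buf, 0, n).replace("@", "_at_"));
        }
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        copyReplacing(new CharArrayReader("x@y@z".toCharArray()), out, 500);
        System.out.println(out.toString()); // prints x_at_y_at_z
    }
}
```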
I forked the repository; a copy of my changes is here:
https://bitbucket.org/chinmaya/java-concurrent_response/