Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using ArrayBlockingQueue makes the process slower

I just recently used ArrayBlockingQueue for my multi-thread process. But it seemed like it slowed down rather than speeding up. Can you guys help me out? I'm basically importing a file (about 300k rows) and parsing them and storing them in the DB

public class CellPool {
private static class RejectedHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
      System.err.println(Thread.currentThread().getName() + " execution rejected: " + arg0);     
    }
  }

  private static class Task implements Runnable {
    private JSONObject obj;

    public Task(JSONObject obj) {
      this.obj = obj;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(1);
        runThis(obj);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    public void runThis(JSONObject obj) {
        //where the rows are parsed and stored in the DB, etc
    }
  }

  public static void executeCellPool(String filename) throws InterruptedException {
    // fixed pool fixed queue
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(300000, true);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(90, 100, 1, TimeUnit.MINUTES, queue);

    DataSet ds = CommonDelimitedParser.getDataSet(filename);
    final String[] colNames = ds.getColumns();
    while (ds.next()) {
        JSONObject obj = new JSONObject();
        //some JSON object
        Task t = new Task(obj);
        executor.execute(t);
    }
  }

}

like image 472
Kara Avatar asked Dec 15 '15 06:12

Kara


People also ask

Is ArrayBlockingQueue thread safe?

ArrayBlockingQueue is thread-safe. The Iterator provided in iterator() method traverses the elements in order from first (head) to last (tail). It supports an optional fairness policy for ordering waiting producer and consumer threads.

What is ArrayBlockingQueue?

ArrayBlockingQueue class is a bounded blocking queue backed by an array. By bounded, it means that the size of the Queue is fixed. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking.

Is ArrayBlockingQueue synchronized?

No, you do not need to synchronize access to the object properties, or even use volatile on the member variables. All actions performed by a thread before it queues an object on a BlockingQueue "happen-before" the object is dequeued. That means that any changes made by the first thread are visible to the second.


2 Answers

tl;dr Big queue sizes can have a negative impact, as can large thread counts. Ideally, you want your consumers and producers to be working at a similar rate.

The reason the addition of the queue is causing issues is because you're using a very large queue (which is not necessary) that is taking up resources. Typically, a blocking queue blocks producers when there is no space left in the queue and consumers when there are no objects left in the queue. By creating a such a large one of a static size, Java is assigning that space in memory when you almost certainly aren't using all of it. It would be more effective to force your producer to wait for space in the queue to clear up if your consumers are consumers too slowly. You don't need to store all of the lines from your file in the queue at the same time.

Thread Pool Executor Queues are discussed in the javadoc here.

Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

Your large thread size of 90, combined with your very large pool size of 300000, is most likely using a lot of memory, and resulting in additional thread scheduling overhead. I would drop both of them considerably. I don't know what hardware you are running on, but since it sounds like you're writing an IO intensive program, I would try double the number of threads your CPU can handle, and play around with sizes for your blocking queue to see what works (note: I haven't researched this, this is based on my experience running queues and executors. Happy for others to suggest a different count!).

Of note, though, is that the execute() method will throw a RejectedExecutionException on failure to add to the queue if your queue is too small. One way of monitoring the queue would be to check it's capacity before scheduling a task. You can do this by calling:

executor.getQueue().remainingCapacity()

Don't use the executor.getQueue() method to alter the queue in any way, but it can be used for monitoring.

An alternative is to use an unbounded queue, such as a LinkedBlockingQueue without a defined capacity. This way, you won't need to deal with queue sizes. However, if your producers are running much faster than your consumers, you will once again have the issue of consuming too much memory.

Also, kostya is right, a JDBC batch insert would be faster.

like image 59
AndyN Avatar answered Oct 29 '22 15:10

AndyN


If you want to persist records from a file into a relational database as fast as possible you should use JDBC batch insert rather than inserting records one by one.

like image 25
kostya Avatar answered Oct 29 '22 14:10

kostya