Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Need help with designing "infinite" threads

I have some database table and need to process records from it 5 at a time as long as app is running. So, it looks like this:

  1. Get a record that hasn't been processed yet or not processing now by other threads.
  2. Process it (this is a long process that depends on internet connection so it could timeout/throw errors).
  3. Move to the next record. When reached end of table start from beginning.

I don't have much experience with threads, so I see two possible strategies:

Approach A.

1.Create new ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2.Add 5 tasks to it:

for (int i = 0; i < 5; i++) {
    taskExecutor.execute(new MyTask());
}

3.Each task will be infinite loop, that: reads a record from the table, processes it, and then gets another record.

The problems with this approach is how to inform other threads about which records are currenly processing. To do this I can either use "status" field in the table or just use some CopyOnWriteArraySet that holds currently processing ID's.

Approach B.

1.Create the same ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2. Have an infinite loop that selects records that need to be processed and passes them to the executor:

while (true) {
    //get next record here
    taskExecutor.execute(new MyTask(record));
    //monitor the queue and wait until some thread is done processing,
    //so I can add another record
}

3.Each task processes a single record.

The problem with this approach is that I need to add tasks to the executor's queue slower than they are processed to not let them pile up over time. It means I need to monitor not only which tasks are currently running but also when they are done processing, so I can add new records to the queue.

Personally I think first approach is better (easier), but I feel that the second one is more correct. What do you think? Or maybe I should do something completely different?

Also I can use Spring or Quartz libraries for this if needed.

Thanks.

like image 693
serg Avatar asked Dec 22 '22 12:12

serg


1 Answers

I think that CompletionService (and ExecutorCompletionService) can help you.

You submit all your tasks via completion service, and it allows you to wait until one of thread (any thread) finishes its task. This way you can submit next task as soon as there is free thread. This would imply that you use approach B.

Pseudo code:

Create ThreadPoolExecutor and ExecutorCompletionService wrapping it

while (true) {
  int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount()
  fetch 'freeThreads' tasks and submit to completion service (which
                                      in turn sends it to executor)

  wait until completion service reports finished task (with timeout)
}

Timeout in wait helps you to avoid situation when there was no task in the queue, so all threads are idle, and you are waiting until one of them finishes -- which would never happen.

You can check for number of free threads via ThreadPoolExecutor methods: getActiveCount (active threads) and getMaximumPoolSize (max available configured threads). You will need to create ThreadPoolExecutor directly, or to cast object returned from Executors.newFixedThreadPool(), although I would prefer direct creation... see source of Executors.newFixedThreadPool() method for details.

like image 76
Peter Štibraný Avatar answered Jan 02 '23 19:01

Peter Štibraný