
How to manage more than 32k threads

I've just learned multi-threaded programming today due to a project requirement.

I have a string processing task which can be nicely divided into small subtasks.

while (...){
    ...
    // assign task for handler
    Thread t = new Thread(new PCHandler(counter,pc));
    t.start();
    counter++;
}

The problem is that I will need around 500K threads for this task. And I run into an error:

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

I searched the web, and it seems the JVM only allows me to create a maximum of 32K threads. There are some instructions for extending this limit by modifying the profile file, but I want to avoid modifying the user's computer. Could you advise me on how to manage the threads wisely within the limit?

sean asked Oct 02 '13 13:10



2 Answers

The problem is that I will need around 500K threads for this task. And I run into a [memory error].

Sounds to me that you should be using a thread-pool so you can submit a large number of jobs but only run them in a smaller number of threads.

// create a thread pool with 10 threads, this can be optimized to your hardware
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// submit your handlers to the thread-pool
for (PCHandler handler : handlersToDo) {
    threadPool.submit(handler);
}
// once we have submitted all jobs to the thread pool, it should be shutdown
threadPool.shutdown();
...

If this won't work then I'd like to know more details about a system that actually needs 500k concurrently running threads. You may be able to achieve this with some memory setting tweaking and increasing the core memory on your box but I suspect that re-architecting your application is in order.

As @Peter mentions in the comments, to optimize the number of threads in the pool you can use the number of available processors and other system specs as a starting point. But it depends highly on how CPU intensive your PCHandler class is. The more IO it does, the more concurrency can be taken advantage of. Doing some test runs with different values passed to the newFixedThreadPool(...) method is probably the best way to determine the optimal setting.
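As a rough starting point for those test runs, here is a minimal sketch of a sizing heuristic. It assumes the common rule of thumb "threads = cores * (1 + wait time / compute time)"; the class name PoolSizing, the method suggestedPoolSize, and the ratio values are hypothetical and only for illustration. The ratio itself is something you would have to measure or estimate for your own PCHandler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {

    // Rule-of-thumb sizing (an assumption, not a guarantee):
    // threads = cores * (1 + waitTime / computeTime).
    // A ratio of 0 means purely CPU-bound work; larger ratios mean more blocking on IO.
    static int suggestedPoolSize(int cores, double waitToComputeRatio) {
        return Math.max(1, (int) Math.round(cores * (1.0 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // Hypothetical ratios; measure your own handler to get real ones.
        int cpuBoundSize = suggestedPoolSize(cores, 0.0);
        int ioHeavySize = suggestedPoolSize(cores, 4.0);
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBoundSize
                + " ioHeavy=" + ioHeavySize);

        // Use the chosen size as the first candidate, then benchmark around it.
        ExecutorService threadPool = Executors.newFixedThreadPool(cpuBoundSize);
        threadPool.shutdown();
    }
}
```

Treat the result only as the first value to try; the test runs with neighbouring values are what actually settle the setting.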

Also, depending on how large the 500k job objects are, you may want to limit their creation. To do that, you could create a thread-pool with a bounded queue which would limit the number of jobs that can be outstanding at any one point in time.
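One way to sketch such a bounded pool is a ThreadPoolExecutor with an ArrayBlockingQueue and the CallerRunsPolicy rejection handler. This is only an illustration: the class name BoundedPoolSketch, the method runJobs, and the sizes used are hypothetical, and the job here just increments a counter in place of a real PCHandler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {

    // Runs jobCount trivial jobs on a fixed number of threads and returns
    // how many of them completed.
    static int runJobs(int jobCount, int threads, int queueCapacity) {
        AtomicInteger completed = new AtomicInteger();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),      // at most queueCapacity jobs waiting
                new ThreadPoolExecutor.CallerRunsPolicy());   // queue full: submitter runs the job

        for (int i = 0; i < jobCount; i++) {
            // Each job object is created just before it is handed over, so only
            // a small number of jobs exists at any one time.
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runJobs(500_000, 4, 100));
    }
}
```

Because the submitting loop is slowed down whenever the queue is full, the producer cannot race ahead and allocate all 500k jobs up front.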

Gray answered Nov 07 '22 17:11


It is definitely not a good option to manage so many threads on a single machine in a single application, unless the machine has 16 or more cores.

Consider factors such as whether your work is I/O intensive or CPU intensive, and make appropriate choices.

I usually use

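int maxThreadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor =
    new ThreadPoolExecutor(
        // use maxThreadCount - 1 workers, but at least one: a maximum pool
        // size of 0 makes the constructor throw IllegalArgumentException
        0, Math.max(1, maxThreadCount - 1),
        1, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(maxThreadCount * 2),  // bounded queue
        Executors.defaultThreadFactory(),
        // when the queue is full, the submitting thread runs the task itself
        new ThreadPoolExecutor.CallerRunsPolicy());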

Now do the processing by adding your tasks and wait until everything is finished:

while (moreTaskstoDo) {
    Callable<?> c =...
    executor.submit(c);
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Now with Java 8+ in place you could think of doing it more efficiently.

I did a small benchmark myself. The code below is inspired by an article, and you can read more in a Java 8 handbook.

Consider this function of finding a total.

//approach 1: old school
private static void findingTotalOldSchool()  {
    long total = 0;
    long start = System.nanoTime();

    for (long i = 1; i < LIMIT; i++) {
        total = total + (i * FACTOR);
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

public static Range range(int max)  {
    return new Range(max);
}

// Approach 2: custom iterator
private static void findingTotalCustomIterator() {
    long total = 0;
    long start = System.nanoTime();

    for (long i : range(LIMIT)) {
        total = total + i * FACTOR;
    }

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 3: using streams
private static void findingTotalStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 4: using parallel streams
private static void findingTotalParallelStream() {
    long start = System.nanoTime(); 
    long total = 0;

    total = LongStream.range(1, LIMIT)
            .parallel()
            .map(t -> t * FACTOR)
            .sum();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

// Approach 5: Using Completable Futures alone
private static void findingTotalCFS() {
     long start = System.nanoTime();

     List<CompletableFuture<Long>> futures = 
             LongStream.range(1, LIMIT).boxed()
             .map(t -> CompletableFuture.supplyAsync(() -> t * FACTOR ))
             .collect(Collectors.toList());
     //Code here --- could run ahead hence joining on futures
     long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();

     long duration = (System.nanoTime() - start) / 1_000_000;
     System.out.println("Futures used: "+futures.size());
     System.out.println("Duration: "+duration);
     System.out.println("Total: "+total);
}

// Approach 6: Using Completable Futures managed by Executor Service
private static void findingTotalCFSE() {
    long start = System.nanoTime();

    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    List<CompletableFuture<Long>> futures =
             LongStream.range(1, LIMIT).boxed()
             .map(t -> CompletableFuture.supplyAsync(() -> {
                    return t * FACTOR;
            }, executor))
             .collect(Collectors.toList());

     long total = futures.stream().map(CompletableFuture::join).mapToLong(t->t).sum();
     executor.shutdownNow();

     long duration = (System.nanoTime() - start) / 1_000_000;
     System.out.println("Futures used: "+futures.size());
     System.out.println("Duration: "+duration);
     System.out.println("Total: "+total);
}

// Approach 7: Using Executor service alone
private static void findingTotalES() {
    long start = System.nanoTime();

    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    long total  = LongStream.
        range(1, LIMIT)
        .boxed()
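        // Note: streams are lazy and this one is sequential, so each task is
        // submitted and then immediately waited on; the tasks never overlap.
        .map((i)->executorService.submit(new Operation(i, FACTOR)))
        .map((Future<Long> future)-> {
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // Extract the actual exception from its wrapper and report it
                e.getCause().printStackTrace();
            }
            return 0L; // a failed task contributes nothing to the total
        })
        .mapToLong(t->t.longValue())
        .sum();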

    executorService.shutdown();

    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Duration: "+duration);
    System.out.println("Total: "+total);
}

class Operation implements Callable<Long> {

    long i; int j;
    Operation(long i, int j) { this.i = i; this.j = j; }

    @Override
    public Long call() {
        return i * j;
    }
}


class Range implements Iterable<Integer> {

    private int limit;

    public Range(int limit) {
        this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
        final int max = limit;
        return new Iterator<Integer>() {

            private int current = 0;

            @Override
            public boolean hasNext() {
                return current < max;
            }

            @Override
            public Integer next() {
                if (hasNext()) {
                    return current++;   
                } else {
                    throw new NoSuchElementException("Range reached the end");
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Can't remove values from a Range");
            }
        };
    }
}

We ran the tests with two sets of data. Each test should be run individually and not as part of a single whole run (the JVM optimizes code as it runs, so the results might vary).

//first run
final static int FACTOR = 1;
final static int LIMIT = 10000;

//second run
final static int FACTOR = 9876;
final static int LIMIT = 1000000;


System.out.println("-----Traditional Loop-----");
findingTotalOldSchool();
// 0 ms
// 4 ms     

System.out.println("-----Custom Iterator----");
findingTotalCustomIterator();
// 1 ms
// 15 ms


System.out.println("-----Streams-----");
findingTotalStream();
// 38 ms
// 33 ms        


System.out.println("-----Parallel Streams-----");
findingTotalParallelStream();
// 29 ms
// 64 ms


System.out.println("-----Completable Futures with Streams-----");
findingTotalCFS();
// 77 ms
// 635 ms       


System.out.println("-----Executor Service with Streams-----");
findingTotalES();
// 323 ms
// 12632 ms

System.out.println("-----Completable Futures with Executor Service with Streams-----");
findingTotalCFSE();
// 77 ms
// 844 ms   

Observations:

  • The traditional loop is the fastest in most cases.
  • Consider parallel streams when each element involves heavier computation or IO operations.
  • For simple iterations (involving substitutions or simple numeric calculations), go for the traditional loop.
  • Completable Futures with an Executor Service is flexible and a go-to option when you need more control over the number of threads, etc. If what you're doing is complex, go for higher-order systems that help you distribute it horizontally, like Akka or Vert.x.
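
The executor-based approaches above create one task per element, so most of the measured time is likely task overhead rather than arithmetic. As a hedged sketch of an alternative (not part of the benchmark above, and not measured), the same total can be computed with one task per chunk of the range. The class name ChunkedSum and the method chunkedTotal are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedSum {

    // Sums i * factor for i in [1, limit) using one task per chunk
    // instead of one task per element.
    static long chunkedTotal(long limit, long factor, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Ceiling division so that the whole range is covered by at most `threads` chunks.
            long chunkSize = Math.max(1, (limit - 1 + threads - 1) / threads);
            List<Future<Long>> parts = new ArrayList<>();

            for (long from = 1; from < limit; from += chunkSize) {
                final long lo = from;
                final long hi = Math.min(limit, from + chunkSize);
                parts.add(executor.submit(() -> {
                    long sum = 0;
                    for (long i = lo; i < hi; i++) {
                        sum += i * factor;
                    }
                    return sum;
                }));
            }

            // All chunks are already submitted, so they run concurrently while we wait.
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println("Total: " + chunkedTotal(1_000_000, 9876, threads));
    }
}
```

The same idea applies to the original question: grouping the 500K small subtasks into a few larger batches keeps the number of submitted tasks close to the number of worker threads.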
John answered Nov 07 '22 18:11