
How to limit the maximum number of parallel threads in Perl

I have a Perl program that kicks off a huge number of threads (each one is in charge of creating graphics based on data processing). I start each thread using:

my @threads; # list to store threads that have been launched

push @threads, threads->create(\&mySubName, params...);

The threads start correctly, but after I have launched a number of them the Perl interpreter crashes (I assume it is memory-related). My solution is to limit the number of threads running at a time; I picked 15. I want to call a sub before each create line that checks whether it is OK to start the next thread, and otherwise sleeps while waiting for one to finish. This is how I tried to do it:

sub checkThreads{
    my $addThread = 0;
    until($addThread){
        my $totalThreads = 0;
        foreach my $task (@threads){
            if($task->is_running()){$totalThreads++;}
        }
        if($totalThreads < 15 ){
            print "Ok to add new thread, carry on!\n";
            $addThread = 1;
        }else{
            print "Waiting for $totalThreads threads to fire next one...\n";
            sleep 2;
        }
    }
}

So each time I want to create a new thread I would just call

&checkThreads;

That would introduce a delay while I wait for some threads to finish. The problem is that when I call that sub, the moment I hit the line where I check:

$task->is_running()

The program exits and stops running without any error or warning. I just want a sub that counts the running threads to limit them.

How can I perform this count successfully?

Other things I have tried are evaluating the following line:

scalar(threads->list());

But that gives me a weird value rather than a count; I believe it is an unblessed reference. It looks like:

threads=SCALAR(0x80fea8c)
gorba asked Jul 21 '12 23:07



2 Answers

Thread::Semaphore provides a counting semaphore to limit concurrency:

use threads;
use Thread::Semaphore;

my $sem = Thread::Semaphore->new(15); # max 15 threads
my @threads = map {
    # request a thread slot, waiting if none are available:
    $sem->down;
    threads->create(\&mySubName, @params)
} 0..100;
$_->join for @threads;

And in your function:

sub mySubName {
    do_stuff();
    # release slot:
    $sem->up;
}
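
For reference, here is a minimal, self-contained sketch of the same semaphore approach. It is only an illustration under stated assumptions: `worker`, `$MAX_THREADS`, and the `$running`/`$peak` counters are hypothetical names that do not come from the question, the limit is lowered to 3 so the effect shows up quickly, and a short pause stands in for the real graphics work. The one deliberate change is wrapping the work in `eval`, so that the slot is released even if the work dies.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $MAX_THREADS = 3;    # hypothetical limit; the question uses 15
my $sem = Thread::Semaphore->new($MAX_THREADS);

my $running :shared = 0;    # workers currently holding a slot
my $peak    :shared = 0;    # highest value $running ever reached

# Hypothetical stand-in for the real graphics-generating sub.
sub worker {
    my ($id) = @_;
    my $result = eval {
        { lock($running); $running++; $peak = $running if $running > $peak; }
        select(undef, undef, undef, 0.05);    # simulate some work
        { lock($running); $running--; }
        $id * 2;
    };
    my $err = $@;
    $sem->up;               # always release the slot, even if the work died
    die $err if $err;
    return $result;
}

my @threads;
for my $id (1 .. 10) {
    $sem->down;             # blocks until one of the slots is free
    push @threads, threads->create(\&worker, $id);
}

my @results = map { $_->join } @threads;
print "peak concurrency: $peak\n";
```

Because each worker holds its slot until after it has decremented `$running`, the reported peak should never exceed `$MAX_THREADS`.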
Richard Simões answered Oct 15 '22 03:10



Looking at the docs,

my $count = threads->list();

should work, contrary to what you say. What do the docs for the version of threads you are using say? Well, you could use the following as a workaround.

my $count = () = threads->list();
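
If the goal is to count only the threads that are still running, a rough sketch along these lines might help. It assumes a version of the threads module recent enough to provide the `threads::running` and `threads::joinable` filters for `threads->list()`; older versions may not have them. The helper names (`worker`, `reap_finished`, `running_count`) and the limit of 3 are hypothetical and chosen only for illustration.

```perl
use strict;
use warnings;
use threads;

my $MAX_THREADS = 3;    # hypothetical limit; the question uses 15
my $joined      = 0;    # threads reaped so far
my $peak        = 0;    # most threads seen running at once

# Hypothetical stand-in for the real work.
sub worker { select(undef, undef, undef, 0.05); return; }

# Join every finished thread so it no longer holds resources.
sub reap_finished {
    for my $thr (threads->list(threads::joinable)) {
        $thr->join;
        $joined++;
    }
}

# Count threads whose entry sub has not finished yet.
sub running_count {
    my $count = () = threads->list(threads::running);
    return $count;
}

for my $id (1 .. 8) {
    reap_finished();
    while (running_count() >= $MAX_THREADS) {
        select(undef, undef, undef, 0.01);    # short pause, then check again
        reap_finished();
    }
    threads->create(\&worker);
    my $now = running_count();
    $peak = $now if $now > $peak;
}

# Wait for whatever is still outstanding.
for my $thr (threads->list()) { $thr->join; $joined++; }
print "joined $joined threads\n";
```

Joining finished threads as you go matters here: a thread that has finished but has not been joined or detached still occupies memory, which may be part of what the question ran into.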
ikegami answered Oct 15 '22 02:10
