Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement semaphore thread communication in Perl?

My Perl script needs to run multiple threads simultaneously...

use threads ('yield', 'exit' => 'threads_only');
use threads::shared;
use strict;
use warnings;
 no warnings 'threads';
use LWP::UserAgent;
use HTTP::Request;
use HTTP::Async;
use ...

...and such threads need to obtain some information from web, so HTTP::Async is used.

my $request = HTTP::Request->new;
   $request->protocol('HTTP/1.1');
   $request->method('GET');
   $request->header('User-Agent' => '...');

my $async = HTTP::Async->new( slots            => 100,
                              timeout          => REQUEST_TIMEOUT,
                              max_request_time => REQUEST_TIMEOUT );

But some threads need to access web only when other thread(s) says so.

my $start = [Time::HiRes::gettimeofday()];
my @threads = ();
foreach ... {
  $thread = threads->create(
    sub {
           local $SIG{KILL} = sub { threads->exit };
           my $url = shift;
           if ($url ... ) {
             # wait for "go" signal from other threads
           }
           my ($response, $data);
           $request->url($url);
           $data = '';
           $async->add($request);
           while ($response = $async->wait_for_next_response) {
             threads->yield();
             $data .= $response->as_string;
           }
           if ($data ... ) {
             # send "go" signal to waiting threads
           }
         }
       }, $_);

  if (defined $thread) {
    $thread->detach;
    push (@threads, $thread);
  }
}

There might be one or more threads waiting for "go" signal and there might be one or more threads that such "go" signal can send. At the beginning the status of semaphore is "wait" and once it turns to "go", it will stay so.

Finally, app checks max running time. If threads are running too long, self-termination signal is sent.

my $running;
do {
  $running = 0;
  foreach my $thread (@threads) {
    $running++ if $thread->is_running();
  }
  threads->yield();
} until (($running == 0) || 
         (Time::HiRes::tv_interval($start) > MAX_RUN_TIME));
$running = 0;
foreach my $thread (@threads) {
  if ($thread->is_running()) {
    $thread->kill('KILL');
    $running++;
  }
}
threads->yield();

Now to the point. My questions are:

  1. How can I most effectively code waiting "semaphore" in the script (see comments in script above). Should I simply use just shared variable with some dummy sleep loop?

  2. Do I need to add some sleep loop at the end of app to give time to threads for self-destruction?

like image 690
Ωmega Avatar asked May 07 '12 20:05

Ωmega


1 Answers

You might look at Thread::Queue to perform this work. You could setup a queue that would handle the signaling between the threads waiting for the 'go' signal and the threads sending the 'go' signal. Here's a quick mock-up that I haven't tested:

...
use Thread::Queue;
...
# In main body
my $q = Thread::Queue->new();
...
$thread = threads->create(
    sub {
           local $SIG{KILL} = sub { threads->exit };
           my $url = shift;
           if ($url ... ) {
             # wait for "go" signal from other threads
             my $mesg = $q->dequeue();
             # you could put in some termination code if the $mesg isn't 'go'
             if ($mesg ne 'go') { ... }
           }
           ...
           if ($data ... ) {
             # send "go" signal to waiting threads
             $q->enqueue('go');
           }
         }
       }, $_);
...

The threads that need to wait for a 'go' signal will wait on the dequeue method until something enters the queue. Once a message enters the queue one thread and only one thread will grab the message and process it.

If you wish to stop the threads so that they won't run, you can insert a stop message to the head of the queue.

$q->insert(0, 'stop') foreach (@threads);

There are examples in Thread::Queue and threads CPAN distributions that show this in more detail.

In response to your second question, the answer is, unfortunately, it depends. When you proceed to terminate your threads, what kind of clean up is required for a clean shutdown? What's the worst case scenario that could occur if the rug was yanked out from beneath the thread? You would want to plan in any time for the clean up to occur. The other option you could do is wait on each thread to actually complete.

The reason for my comment asking if you could remove the detach call is because this method allows the main thread to exit and not care what was happening to any child threads. Instead, if you remove this call, and add:

$_->join() foreach threads->list();

to the end of your main block, this will require the main application to wait for each thread to actually complete.

If you leave the detach method in place, then you will need to sleep at the end of your code if you require your threads to perform any sort of clean-up. When you call detach on a thread, what you are telling Perl is that you don't care what the thread is doing when your main thread exits. If the main thread exits and there are threads that still running that have been detached, then the program will finish with no warnings. However, if you don't require any clean-up, and you still call detach, feel free to exit whenever you like.

like image 87
Joel Avatar answered Nov 09 '22 04:11

Joel