 

Perl async tasks for "any" code, no matter what it is?

I've been writing a "checker" system that performs various "checks" on services, systems, databases, files, etc. A "check" is generic in nature and can be anything. All checks report their results in a common format, whether they pass or fail, whatever passing or failing means for that particular check.

It is written in a modular OO fashion so that developers can simply follow the framework and write checks independently of one another. Each check object holds a shared reporting object, and after running a check it simply calls $self->{'reporting'}->report(params). The params are predefined, and developers are expected to report appropriately. The reporting object then indexes these reports. My main loader script has entries such as the following:

my $reportingObject = Checks::Reporting->new(params);
my @checks;

push @checks, Checks::Check_One->new($reportingObject, params);
push @checks, Checks::Check_One->new($reportingObject, params);
# ...
push @checks, Checks::Check_N->new($reportingObject, params);

To kick off the checks and finalize the report once they are done, I have been doing:

foreach my $check (@checks) {
    $check->run_stuff();
}

$reportingObject->finalize_report();
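To make the framework described above concrete, here is a minimal, self-contained sketch of the sequential version. The internals of Checks::Reporting, the Checks::Base parent class, and the name/ok/file report parameters are all hypothetical stand-ins, since the question does not show the real classes or report parameters.

```perl
use strict;
use warnings;

# HYPOTHETICAL stand-in for Checks::Reporting: keeps every report in memory.
package Checks::Reporting;
sub new { my ($class) = @_; return bless { reports => [] }, $class }
sub report {
    my ($self, %args) = @_;              # hypothetical params: name, ok
    push @{ $self->{reports} }, \%args;  # fresh %args on every call
}
sub finalize_report {
    my ($self) = @_;
    my $failed = grep { !$_->{ok} } @{ $self->{reports} };   # count failures
    printf "%d checks, %d failed\n", scalar @{ $self->{reports} }, $failed;
    return $failed;
}

# HYPOTHETICAL parent class: stores the shared reporting object plus params.
package Checks::Base;
sub new {
    my ($class, $reporting, %params) = @_;
    return bless { reporting => $reporting, %params }, $class;
}
sub run_stuff { die ref(shift) . " must implement run_stuff\n" }

# Two example checks, each written independently of the other.
package Checks::Check_One;
use parent -norequire, 'Checks::Base';
sub run_stuff {
    my ($self) = @_;
    $self->{reporting}->report(name => 'Check_One', ok => 1);
}

package Checks::Check_N;
use parent -norequire, 'Checks::Base';
sub run_stuff {
    my ($self) = @_;
    my $ok = -e $self->{file} ? 1 : 0;   # e.g. "does this file exist?"
    $self->{reporting}->report(name => 'Check_N', ok => $ok);
}

package main;
my $reportingObject = Checks::Reporting->new;
my @checks = (
    Checks::Check_One->new($reportingObject),
    Checks::Check_N->new($reportingObject, file => '/nonexistent/path'),
);
$_->run_stuff for @checks;
my $failures = $reportingObject->finalize_report;
```

With the nonexistent path, this prints "2 checks, 1 failed". The loader only depends on every check having a run_stuff method, which is what makes the checks interchangeable.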

Since these checks are totally independent (don't worry about the reporting object), they can be run in parallel. As an improvement, I have done:

use threads;  # provides async()
my @threads;
foreach my $check (@checks) {
    push @threads, async { $check->run_stuff(); }
}

foreach my $thread (@threads) {
    $thread->join;
}

#All threads are complete, and thus all checks are done
$reportingObject->finalize_report();

As I said earlier, the developers write checks independently of each other. Some checks are simple and others are not. The simple checks may have no asynchronous code in them, but others might need to run work asynchronously internally, such as:

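sub do_check {
   my ($self) = @_;
   my @threads;
   my @list = @{$self->{'list'}};

   # Start one thread per item first...
   foreach my $item (@list) {
      # scalar assignment => the thread's return value is in scalar context
      my $thread = async {
                   #do_work_on_$item
                   #return 1 or 0 for success or fail
               };
      push @threads, $thread;
   }

   # ...then wait for all of them (joining inside the loop above would
   # re-join already-joined threads and serialize the work)
   foreach my $thread (@threads) {
       my $res = $thread->join;
       if ($res == 1) {
           $self->{'reporting'}->report(params_here);
       }
   }
}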

As you can see, the threading model lets me do things in very generic terms. Each check, whatever it is, runs independently in its own thread. If an individual developer has asynchronous work to do, whatever it is, they simply do it independently in its own thread. I want a model similar to this.

Unfortunately, threads are slow and inefficient. All of the async libraries are built around specific watchers, such as I/O watchers. I do not want anything that specific. I would like an event-based model that lets me simply kick off async tasks, whatever they are, and be notified when they are all done so I can move on.

Hopefully that explains it and you can point me in the right direction.

asked Jun 26 '13 at 15:06 by john doe



1 Answer

This seems like a good fit for a boss-worker model:

  • Spawn a few workers at the beginning of the program. Make sure they all have access to a queue.

  • Enqueue as many checks as you like. The workers dequeue the checks, execute them, and enqueue the result in an output queue.

  • Your main thread reads the results from the output queue and does whatever it wants with them.

  • Join the workers in an END block.
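The steps above can be sketched with the core Thread::Queue module and no serialized code at all. This is a minimal sketch, assuming a perl built with ithreads and hypothetical checks given as plain coderefs. Each worker thread receives its own copy of @checks when it is created, so only an index has to travel through the queue.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

use constant NUM_WORKERS => 3;

# HYPOTHETICAL checks. Every worker thread gets its own copy of @checks when
# it is created, so only an index has to pass through the queue.
my @checks = (sub { 1 }, sub { 2 }, sub { "hi" }, sub { die "boom\n" });

my $work    = Thread::Queue->new;   # boss -> workers: check indices
my $results = Thread::Queue->new;   # workers -> boss: [index, value]

my $worker = sub {
    while (defined(my $i = $work->dequeue)) {
        my $value = eval { $checks[$i]->() };   # undef if the check died
        $results->enqueue([$i, $value]);        # arrayref is cloned into shared memory
    }
};
my @workers = map { threads->create($worker) } 1 .. NUM_WORKERS;

$work->enqueue(0 .. $#checks);
$work->enqueue((undef) x NUM_WORKERS);   # one "stop" marker per worker
$_->join for @workers;                   # every check has finished after this

# All results are already queued, so a non-blocking drain is enough.
my @report;
while (defined(my $r = $results->dequeue_nb)) {
    my ($i, $value) = @$r;
    $report[$i] = $value // 'FAILED';
}
print "$_\n" for @report;
```

Because each result is tagged with its index, the report comes out in check order even though the checks finish in varying order. Joining explicitly before draining, rather than in an END block, guarantees that every result is already in the output queue.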

You probably want to look at Thread::Queue::Any if there is a chance you want to put coderefs into the queue.

Here is a fully runnable example:

use strict; use feature 'say';
use threads; use threads::shared; use Thread::Queue::Any;
use constant NUM_THREADS => 5;
local $Storable::Deparse = 1; local $Storable::Eval = 1;  # needed to serialize code

my $check_q  = Thread::Queue::Any->new;
my $result_q = Thread::Queue::Any->new;

# start the workers
{
  my $running :shared = NUM_THREADS;
  my @threads  = map threads->new(\&worker, $check_q, $result_q, \$running), 1..NUM_THREADS;

  END { $_->join for @threads }
}

# enqueue the checks
$check_q->enqueue($_) for sub {1}, sub{2}, sub{"hi"}, sub{ die };
$check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue

while(defined( my $result = $result_q->dequeue )) {
  report($$result);
}

sub report {
  say shift // "FAILED";
}

sub worker {
  my ($in, $out, $running_ref) = @_;
  while (defined( my $check = $in->dequeue )) {
    my $result = eval { $check->() };
    $out->enqueue(\$result);
  }

  # last thread closes the door
  lock $$running_ref;
  --$$running_ref || $out->enqueue(undef);
}

This prints

1
2
hi
FAILED

in an order that can vary from run to run.
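Since the question raises the cost of ithreads, here is a different technique, not the thread pool used above: run each check in its own process with fork, then wait for all of them. A pure event loop cannot run arbitrary blocking code concurrently, because a callback that never returns blocks the whole loop, so "any code" still needs processes or threads. This minimal sketch uses only core Perl (POSIX, IO::Handle). run_all_in_parallel and the task names are hypothetical. Only a pass/fail bit comes back through the exit status, so reporting has to happen in the parent.

```perl
use strict;
use warnings;
use IO::Handle;
use POSIX ();

# HYPOTHETICAL helper: run every coderef in %tasks in its own child process
# and block until all children have exited. Returns name => 1 (task returned
# a true value) or 0 (returned false or died), taken from the exit status.
sub run_all_in_parallel {
    my (%tasks) = @_;
    my %name_of;                          # child pid => task name
    for my $name (sort keys %tasks) {
        STDOUT->flush;                    # avoid duplicating buffered output
        my $pid = fork;
        die "fork failed: $!\n" unless defined $pid;
        if ($pid == 0) {                  # child process
            my $ok = eval { $tasks{$name}->() };
            STDOUT->flush;
            POSIX::_exit($ok ? 0 : 1);    # skip the parent's END blocks
        }
        $name_of{$pid} = $name;           # parent: remember which child is which
    }
    my %passed;
    while (%name_of) {                    # reap children in completion order
        my $pid = waitpid(-1, 0);
        last if $pid == -1;               # no children left
        next unless exists $name_of{$pid};
        $passed{ delete $name_of{$pid} } = $? == 0 ? 1 : 0;
    }
    return %passed;
}

my %result = run_all_in_parallel(
    quick => sub { 1 },
    slow  => sub { sleep 1; 1 },
    fails => sub { 0 },
    dies  => sub { die "boom\n" },
);
print "$_: ", ($result{$_} ? "PASS" : "FAIL"), "\n" for sort keys %result;
```

To return richer results, you need a pipe, or a CPAN module such as Parallel::ForkManager, which can pass a data structure back to the parent when a child finishes.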

answered Sep 29 '22 at 20:09 by amon
