I currently have a script that kicks off threads to perform various actions on several directories. A snippet of my script is:
#main
sub BuildInit {
    my $actionStr = "";
    my $compStr   = "";
    my @component_dirs;
    my @compToBeBuilt;
    foreach my $comp (@compList) {
        @component_dirs = GetDirs($comp);    #populates @component_dirs
    }
    print "Printing Action List: @actionList\n";
    #---------------------------------------
    #---- Setup Worker Threads ----------
    for ( 1 .. NUM_WORKERS ) {
        async {
            while ( defined( my $job = $q->dequeue() ) ) {
                worker($job);
            }
        };
    }
    #-----------------------------------
    #---- Enqueue The Work ----------
    for my $action (@actionList) {
        my $sem = Thread::Semaphore->new(0);
        $q->enqueue( [ $_, $action, $sem ] ) for @component_dirs;
        $sem->down( scalar @component_dirs );
        print "\n------>> Waiting for prior actions to finish up... <<------\n";
    }
    # Nothing more to do - notify the Queue that we're not adding anything else
    $q->end();
    $_->join() for threads->list();
    return 0;
}
#worker
sub worker {
    my ($job) = @_;
    my ( $component, $action, $sem ) = @$job;
    Build( $component, $action );
    $sem->up();
}
#builder method
sub Build {
    my ( $comp, $action ) = @_;
    my $cmd     = "$MAKE $MAKE_INVOCATION_PATH/$comp ";
    my $retCode = -1;
    given ($action) {
        when ("depend") { $cmd .= "$action >nul 2>&1" }    #suppress output
        when ("clean")  { $cmd .= $action }
        when ("build")  { $cmd .= 'l1' }
        when ("link")   { $cmd .= '' }    #add nothing; default is to link
        default { die "Action: $action is unknown to me." }
    }
    print "\n\t\t*** Performing Action: \'$cmd\' on $comp ***" if $verbose;
    if ( $action eq "link" ) {
        # hack around potential race conditions -- will only be an issue during linking
        my $tries = 1;
        until ( $retCode == 0 or $tries == 0 ) {
            last if ( $retCode = system($cmd) ) == 2;    #compile error; stop trying
            $tries--;
        }
    }
    else {
        $retCode = system($cmd);
    }
    push( @retCodes, ( $retCode >> 8 ) );
    #testing
    if ( $retCode != 0 ) {
        print "\n\t\t*** ERROR IN $comp: $@ !! ***\n";
        print "\t\t*** Action: $cmd -->> Error Level: " . ( $retCode >> 8 ) . "\n";
        #exit(-1);
    }
    return $retCode;
}
The print statement I'd like to be thread-safe is: print "\n\t\t*** Performing Action: \'$cmd\' on $comp ***" if $verbose;
Ideally, I would like to have this output first, and then have each component on which the $action is being performed print its output in related chunks. However, this obviously doesn't work right now: the output is mostly interleaved, with each thread spitting out its own information.
E.g.,:
ComponentAFile1.cpp
ComponentAFile2.cpp
ComponentAFile3.cpp
ComponentBFile1.cpp
ComponentCFile1.cpp
ComponentBFile2.cpp
ComponentCFile2.cpp
ComponentCFile3.cpp
... etc.
I considered executing the system commands using backticks, capturing all of the output in a big string or something, and then outputting it all at once when the thread terminates. But the issue with this is (a) it seems super inefficient, and (b) I need to capture stderr.
Can anyone see a way to keep my output for each thread separate?
Clarification: My desired output would be:
ComponentAFile1.cpp
ComponentAFile2.cpp
ComponentAFile3.cpp
------------------- #some separator
ComponentBFile1.cpp
ComponentBFile2.cpp
------------------- #some separator
ComponentCFile1.cpp
ComponentCFile2.cpp
ComponentCFile3.cpp
... etc.
Perl's print is a built-in function that writes a list of strings to a filehandle (STDOUT by default). It is not thread-safe in the sense that matters here: nothing stops output from several threads from interleaving.
On POSIX systems, the standard C stdio functions such as printf() and scanf() lock the stream for the duration of each call, so a single call is thread-safe, but a sequence of calls is not kept together.
Perl can do asynchronous programming with modules like IO::Async or Coro, but that is single-threaded. Perl can also be compiled with threads, which provide multi-threaded computing.
To ensure your output isn't interrupted, access to STDOUT and STDERR must be mutually exclusive. That means that between the time a thread starts printing and finishes printing, no other thread can be allowed to print. This can be done using Thread::Semaphore[1].
Capturing the output and printing it all at once reduces the amount of time a thread holds the lock. If you don't do that, you'll effectively make your system single-threaded, as each thread attempts to lock STDOUT and STDERR while one thread runs.
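As a rough sketch of the capture-then-print idea, something like the following might work. It assumes a Perl built with thread support and a shell that understands 2>&1 (both sh and cmd.exe do). The helper name run_and_report and the echo commands are made up for illustration; they stand in for Build and the real make invocation. Merging stderr into the captured stream with 2>&1 addresses the "I need to capture stderr" concern. Note that a command started with system() writes straight to the inherited console, so a lock around print alone would not keep that output together.

```perl
use strict;
use warnings;
use threads;
use Thread::Semaphore;
use IO::Handle;

# One semaphore with a count of 1, created before the threads start
# so that every thread shares it.
my $print_lock = Thread::Semaphore->new(1);

# Hypothetical helper: run a command, capture stdout and stderr together,
# then print the whole chunk while holding the lock.
sub run_and_report {
    my ( $label, $cmd ) = @_;

    # 2>&1 merges stderr into the stream that the backticks capture.
    my $output = `$cmd 2>&1`;
    my $status = $?;

    # The lock is held only for the print, not while the command runs.
    $print_lock->down();
    print "------------------- $label\n", $output;
    STDOUT->flush();
    $print_lock->up();

    return $status >> 8;    # exit code of the command
}

# Placeholder commands; a real script would build its make command here.
my @threads = map {
    my $name = $_;
    threads->create( \&run_and_report, $name, "echo $name" );
} qw(ComponentA ComponentB ComponentC);

my @codes = map { $_->join() } @threads;
```

The chunks can still appear in any order, but the lines of one component stay together because each chunk is printed in one locked step.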
Other options include:
In both of those cases, you only need to lock it for a very short time span.
# Once
my $mutex = Thread::Semaphore->new(); # Shared by all threads.
# When you want to print.
$mutex->down();
print ...;
STDOUT->flush();
STDERR->flush();
$mutex->up();
or
# Once
my $mutex = Thread::Semaphore->new(); # Shared by all threads.
STDOUT->autoflush();
STDERR->autoflush();
# When you want to print.
$mutex->down();
print ...;
$mutex->up();
You can utilize the blocking behavior of $sem->down when it attempts to decrease the semaphore counter below zero, as mentioned in perldoc perlthrtut:
If down() attempts to decrement the counter below zero, it blocks until the counter is large enough.
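To see that rule without actually blocking, here is a small sketch that uses down_nb, the non-blocking variant from Thread::Semaphore (available in reasonably recent versions of the module). It returns false in exactly the cases where down would have waited. The variable names are only illustrative.

```perl
use strict;
use warnings;
use threads;
use Thread::Semaphore;

my $sem = Thread::Semaphore->new(1);

# Count is 1: taking 1 succeeds and leaves the count at 0.
my $first = $sem->down_nb();

# Count is 0: taking 1 more would go below zero, so down() would block here.
my $second = $sem->down_nb();

# Give the unit back; the count is 1 again.
$sem->up();

# Count is 1: asking for 2 at once would also go below zero.
my $third = $sem->down_nb(2);

print "first: ",  ( $first  ? "acquired" : "would block" ), "\n";
print "second: ", ( $second ? "acquired" : "would block" ), "\n";
print "third: ",  ( $third  ? "acquired" : "would block" ), "\n";
```

The last case matters when choosing how much to decrement by: a request larger than the counter can ever become will never be satisfied.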
So here's what one could do:
my $sem = Thread::Semaphore->new( 1 );
Pass a thread counter to worker and Build:
for my $thr_counter ( 1 .. NUM_WORKERS ) {
    async {
        while ( defined( my $job = $q->dequeue() ) ) {
            worker( $job, $thr_counter );
        }
    };
}
sub worker {
    my ( $job, $counter ) = @_;
    my ( $component, $action ) = @$job;    # unpack the job before using it
    Build( $component, $action, $counter );
}
Call ->down and ->up inside Build (and nowhere else):
sub Build {
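    my ( $comp, $action, $counter ) = @_;
    ... # Execute all concurrently-executed code here
    $sem->down();    # blocks while another thread holds the semaphore
    print "\n\t\t*** Performing Action: \'$cmd\' on $comp ***" if $verbose;
    # Execute all sequential 'chunks' here
    $sem->up();      # lets the next waiting thread proceed
}
Because the semaphore starts at 1 and each thread decrements it by 1, only one thread at a time can be between ->down and ->up, which guarantees that the threads won't trample on one another. Do not scale the decrement by the thread counter, for example with 1 << ( $counter - 1 ). That would request the following amounts:
+-----------+---+---+---+---+
| Thread    | 1 | 2 | 3 | 4 |
+-----------+---+---+---+---+
| Semaphore | 1 | 2 | 4 | 8 |
+-----------+---+---+---+---+
With an initial count of 1, only thread 1's request could ever be satisfied; threads 2 to 4 would block forever, because the counter never rises above 1.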