I want to process a list of files in a subtask in my script, and I'm using Proc::Async to spawn the subprocesses doing the work. The downside is that if I have a large list of files to process, it will spawn many subprocesses. How can I limit the number of concurrent subprocesses that Proc::Async spawns?
You can explicitly limit the number of Proc::Async processes using this react block technique, which Jonathan Worthington demonstrated in his concurrency/parallelism/asynchrony talk at the 2019 German Perl Workshop (see slide 39, for example). I'm using the Linux echo command as my "external process" in the code below.
#!/usr/bin/env perl6
my @items = <foo bar baz>;
for @items -> $item {
    start { say "Planning on processing $item" }
}

# Run 2 processes at a time
my $degree = 2;

react {
    # Start $degree processes at first
    run-one-process for 1..$degree;

    # Run one, run-one again when it ends, thus maintaining $degree active processes at a time
    sub run-one-process {
        my $item = @items.shift // return;
        my $proc = Proc::Async.new('echo', "processing $item");
        my @output;

        # Capture output
        whenever $proc.stdout.lines { push @output, $_; }

        # Print all the output, then start the next process
        whenever $proc.start {
            @output.join("\n").say;
            run-one-process
        }
    }
}
Old Answer:
Based on Jonathan Worthington's talk Parallelism, Concurrency, and Asynchrony in Perl 6 (video, slides), this sounds most like parallelism (i.e. choosing to do multiple things at once; see slide 18). Asynchrony is reacting to things in the future, the timing of which we cannot control; see slides 39 and 40. As @raiph pointed out in his comment, you can have one, the other, or both.
If you care about the order of results, then use hyper, but if the order isn't important, then use race.
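To make the difference concrete, here is a minimal sketch. The input range, the doubling step, and the batch and degree values are arbitrary choices for illustration, not something taken from the talk:

```raku
my @items = 1..20;

# hyper: the work is spread over up to 4 workers,
# but results are delivered in the same order as the input
my @ordered = @items.hyper(batch => 2, degree => 4).map(* * 2);
say @ordered;

# race: the same parallel work, but results may arrive in any order,
# so sort them if a stable order is needed afterwards
my @unordered = @items.race(batch => 2, degree => 4).map(* * 2);
say @unordered.sort;
```

In both cases degree caps the number of workers and batch sets how many items each worker picks up at a time.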
In this example, adapted from Jonathan Worthington's slides, you build a pipeline of steps in which data is processed in batches of 32 filenames using 4 workers:
sub MAIN($data-dir) {
    my $filenames = dir($data-dir).race(batch => 32, degree => 4);
    my $data = $filenames.map(&slurp);
    my $parsed = $data.map(&parse-climate-data);
    my $european = $parsed.grep(*.continent eq 'Europe');
    my $max = $european.max(by => *.average-temp);
    say "$max.place() is the hottest!";
}
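Applied to the original question, the same degree setting should also cap how many external commands run at once. This is only a sketch under some assumptions: it swaps Proc::Async for the blocking run routine, reuses echo as the stand-in external command, and the @items list and message text are made up for illustration:

```raku
my @items = <foo bar baz>;

# degree => 2 allows at most two workers, so at most two echo processes at a time.
# batch => 1 hands each worker a single item at a time.
my @output = @items.race(batch => 1, degree => 2).map: -> $item {
    # run blocks this worker until the external command has finished
    my $proc = run 'echo', "processing $item", :out;
    $proc.out.slurp(:close).chomp;
};

.say for @output;
```

Because this uses race, the lines may come back in a different order from @items; switch to hyper if the order matters.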