Consider this code, where a tap takes a while to complete. All the blocks run simultaneously (each printing its output immediately) and then sleep. Most don't finish because the program ends sooner than they do:
my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1. $^a"; sleep 5; };
sleep 5;
The output (elided) has 25 lines (one for each tick of 0.2 in 5 seconds):
1. 0
1. 1
...
1. 24
Then I change that supply to use .share:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;
I only see one line of output, but I expected the same output as before:
1. 1
The .share makes it possible for multiple taps to get the same values.
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
The output still comes only from the first tap, and there is still only one line. I expected 25 lines for each tap:
1. 1
The basic rules for Supply are:
1. No introduction of concurrency without it being explicitly asked for
2. Back-pressure through a sender-pays model
3. A message is processed in full before the next one (so .map({ ...something with state... }) can be trusted not to cause conflicts over the state)
Rule 3 doesn't really apply to share since there are separate downstream operation chains after that point, but rules 1 and 2 do. The purpose of share is to allow publish/subscribe, and also to provide for re-use of a chunk of processing by multiple downstream message processors. Introducing parallel message processing is a separate concern from this.
There are various options. One is to have the messages for parallel processing stuck into a Channel. This explicitly introduces a place for the messages to be buffered (well, until you run out of memory... which is exactly why Supply comes with a sender-pays back-pressure model). Coercing a Channel back into a Supply gets the values pulled from the Channel and emitted on that Supply on a pool thread. That way looks like:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Note that since whenever automatically coerces the thing it's asked to react to into a Supply, that would look like whenever $supply.Channel { }, which makes it a pretty short solution - but at the same time nicely explicit in that it indicates how the normal back-pressure mechanism is being side-stepped. The other property of this solution is that it retains the order of the messages and still gives one-at-a-time processing downstream of the Channel.
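As a rough illustration of the whenever $supply.Channel form, here is a minimal sketch. The names @seen and $fast-count are made up for this example, and the slow consumer stops itself after three messages so the program terminates. It assumes the slow work is simulated with sleep.

```raku
# Illustrative sketch; @seen and $fast-count are names made up for this example.
my $supply = Supply.interval(0.2).share;

my @seen;            # values handled by the slow consumer, in arrival order
my $fast-count = 0;  # number of values the unbuffered tap has received

# This tap stays on the normal path and is not held up by the slow consumer.
my $fast = $supply.tap: { $fast-count++ };

react {
    # whenever coerces the Channel back into a Supply; the Channel is the buffer.
    whenever $supply.Channel -> $value {
        @seen.push($value);
        sleep 0.5;               # slow work, handled one message at a time
        done if @seen.elems >= 3;
    }
}
$fast.close;

say "slow consumer handled: @seen[]";
say "fast tap received $fast-count values in the meantime";
```

The slow consumer should see consecutive values in order, while the fast tap keeps receiving ticks because the Channel absorbs the backlog instead of blocking the sender.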
The alternative is to react to each message by instead starting some asynchronous piece of work to handle it. The start operation on a Supply schedules the block it is passed to run on the thread pool for each message that is received, thus not blocking the arrival of the next message. The result is a Supply of Supply. This forces one to tap each inner Supply to actually make anything happen, which seems slightly counter-intuitive at first, but actually is for the good of the programmer: it makes it clear there's an extra bit of async work to keep track of. I very strongly suggest using this in combination with the react/whenever syntax, which does subscription management and error propagation automatically. The most direct transformation of the code in the question is:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Although it's also possible to instead write it as:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Which points to the possibility of writing a parallelize Supply combinator:
my $supply = Supply.interval(0.2).share;
my $tap = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
    }
}
The output of this approach is rather different from the Channel one, since the operations are all kicked off as soon as the message comes in. Also it doesn't retain message order. There's still an implicit queue (unlike the explicit one with the Channel approach), it's just that now it's the thread pool scheduler's work queue and the OS scheduler that has to keep track of the in-progress work. And again, there's no back-pressure, but notice that it would be entirely possible to implement that by keeping track of outstanding Promises and blocking further incoming messages with an await Promise.anyof(@outstanding).
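One way the back-pressure idea could look is sketched below. This is only an assumption about how to apply await Promise.anyof(@outstanding), not a tested library routine: the name parallelize-bounded and the :max parameter are hypothetical, and the source here is a plain Supplier driven from the main thread so that the sender is the one that waits.

```raku
# Hypothetical combinator: like parallelize, but at most $max operations in flight.
sub parallelize-bounded(Supply $messages, &operation, Int :$max = 2) {
    supply {
        my @outstanding;   # promises for work that may still be running
        whenever $messages -> $value {
            # Forget work that has already finished.
            @outstanding .= grep({ .status ~~ Planned });
            if @outstanding.elems >= $max {
                # Hold up the sender until at least one operation completes.
                await Promise.anyof(@outstanding);
                @outstanding .= grep({ .status ~~ Planned });
            }
            my $work = start operation($value);
            @outstanding.push($work);
            whenever $work { emit $_ }
        }
    }
}

my $source   = Supplier.new;
my $finished = Promise.new;
my @results;

parallelize-bounded($source.Supply, -> $n { sleep 0.2; $n * 10 }, :max(2)).tap:
    { @results.push($_) },
    done => { $finished.keep };

$source.emit($_) for 1..6;   # emit blocks while two operations are outstanding
$source.done;
await $finished;

say @results.sort;
```

As with parallelize, results can arrive out of order, which is why the example sorts them before printing.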
Finally, I'll note that there is some consideration of hyper whenever and race whenever constructs to provide some language-level mechanism for dealing with parallel processing of Supply messages. However the semantics of such, and how they play into the supply-block design goals and safety properties, represent significant design challenges.
The taps of a Supply are run sequentially within a single thread. So the code of the second tap will only run after the first tap (which sleeps for 5 seconds) has finished. This shows in the following code:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4
So the answer is currently: no