This is a follow-up question to "Is whenever signal() in react block order dependent?".
The following code, which uses the default scheduler $*SCHEDULER, lets the user exit immediately by pressing CTRL-C in its event loop:
use v6;
my %scheduler;
my $use-default-scheduler = True;
if $use-default-scheduler {
    %scheduler = scheduler => $*SCHEDULER;
}
react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1;
    }
}
I am interested in what happens if I use two different thread schedulers in the same react loop. Suppose I use the default thread scheduler of Supply.from-list() instead of $*SCHEDULER, by setting $use-default-scheduler = False in the above code. Now the user cannot exit the react block immediately by pressing CTRL-C. If they press CTRL-C, the program simply hangs until Enter is pressed.
So what actually happens here? Does the react focus on only one event loop at a time? (I imagine two event loops here: one for the default scheduler used in the first whenever for the SIGINT signal, and the other for the $*IN.lines supply.) Is the react now focusing on the from-list() scheduler for $*IN.lines, with SIGINT somehow being ignored while in this event loop, so that pressing CTRL-C does not change the state of the react block?
To see what actually happens, let's rewrite the program to (sort of) cover what react does. I'm going to ignore numerous details that don't matter too much to the question at hand.
To keep things a bit more compact, I'll just rewrite this segment of the provided program:
react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1;
    }
}
First of all, a react { ... } is really just like await supply { ... }; that is, it taps the supply { } block and awaits its end.
await supply {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1;
    }
}
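But what is a supply block? At its heart, supply (and so react) offers:
- Concurrency control, so that only one message is processed at a time (Lock::Async is used for this)
- Completion and error conveyance (here we use a Promise to implement this, because we only really want the react part; the real thing produces a resulting Supply that we can emit values into)
- Tracking of which subscriptions are still active, so we know when we're done overall (a SetHash)
Thus, we could rewrite the program as something like this: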
await do {
    # Concurrency control
    my $lock = Lock::Async.new;
    # Completion/error conveyance
    my $done = Promise.new;
    # What's active?
    my %active is SetHash;
    # An implementation a bit like that behind the `whenever` keyword, but with
    # plenty of things that don't matter for this question missing.
    sub whenever-impl(Supply $s, &block) {
        # Tap the Supply
        my $tap;
        $s.tap:
            # When it gets tapped, add the tap to our active set.
            tap => {
                $tap = $_;
                %active{$_} = True;
            },
            # Run the handler for any events
            { $lock.protect: { block($_) } },
            # When this one is done, remove it from the %active list; if it's
            # the last thing, we're done overall.
            done => {
                $lock.protect: {
                    %active{$tap}:delete;
                    $done.keep() unless %active;
                }
            },
            # If there's an async error, close all taps and pass it along.
            quit => -> $err {
                $lock.protect: {
                    .close for %active.keys;
                    $done.break($err);
                }
            }
    }
    # We hold the lock while doing initial setup, so you can rely on having
    # done all initialization before processing a first message.
    $lock.protect: {
        whenever-impl signal(SIGINT, |%scheduler), {
            say "Got signal";
            exit;
        }
        whenever-impl Supply.from-list($*IN.lines, |%scheduler), {
            say "Got line";
            exit if $++ == 1;
        }
    }
    $done
}
Notice that there's nothing whatsoever about schedulers or event loops in here; a supply or react doesn't care where a message comes from, it just cares about its own integrity, enforced through the Lock::Async. Also notice that it doesn't introduce any concurrency either: it really is just a concurrency control construct.
Typically, one uses supply and react with data sources where you tap them and immediately get back control. We then proceed to set up further whenever blocks, fall out of the setup phase, and the lock is available for any messages we receive. This behavior is what you get with just about all supplies that you usually encounter. It's the case with signal(...). It's also the case when you give Supply.from-list(...) an explicit scheduler, passing in $*SCHEDULER; in such a case, it schedules the loop that reads from $*IN on the pool and immediately hands back control.
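As a minimal sketch of that "tap and immediately get back control" behavior, the following taps a Supply.from-list with an explicit $*SCHEDULER and records which thread does the emitting. The variable names (@seen, $finished, $tapping-thread, $emitting-thread) are made up for illustration, and a fixed list stands in for $*IN.lines so that the snippet does not wait for input.

```raku
my @seen;
my $finished        = Promise.new;
my $tapping-thread  = $*THREAD.id;
my $emitting-thread;

# With an explicit scheduler the values are emitted from the thread pool,
# so .tap hands back control without running the handler first.
Supply.from-list(1, 2, 3, scheduler => $*SCHEDULER).tap:
    -> $value {
        $emitting-thread = $*THREAD.id;
        @seen.push: $value;
    },
    done => { $finished.keep };

# Control is already back here; wait for the pool thread to finish emitting.
await $finished;
say @seen;
say $emitting-thread == $tapping-thread ?? 'same thread' !! 'different threads';
```

On a typical run the last line should report different threads, since the emitting happens on a pool worker rather than on the thread that called tap.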
The problem comes when we encounter something that does not behave like that. If we tap Supply.from-list($*IN.lines), it defaults to doing the read from $*IN on the current thread to produce a value to emit, because Supply.from-list uses CurrentThreadScheduler as its default. And what does that do? Just run the code it's asked to run immediately!
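To illustrate the synchronous case, here is a small sketch that records the order of events when no scheduler is passed. The @events array and the fixed list of values are assumptions made for the example; the original program reads from $*IN.lines instead.

```raku
# Record the order in which things happen; no other threads are involved.
my @events;

# No :scheduler is given, so CurrentThreadScheduler is used and the values
# are emitted synchronously, inside the call to .tap.
Supply.from-list(1, 2, 3).tap: { @events.push: "value $_" };
@events.push: 'tap returned';

# CurrentThreadScheduler.cue simply runs the code it is given right away.
CurrentThreadScheduler.cue: { @events.push: 'cued code ran' };
@events.push: 'cue returned';

.say for @events;
```

All three values show up before "tap returned", which is why a whenever on such a supply keeps the setup phase busy until the source is exhausted or blocks.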
This leaves us with one more mystery, however. Given Lock::Async is not reentrant, then if we:
1. Acquire the lock to do the setup
2. Call tap on the Supply.from-list(...), which runs synchronously and tries to emit a value
3. Try to acquire the lock again in order to process that value
Then we'd just get a deadlock, because we're trying to acquire a non-reentrant lock that is already held, by us. Indeed, if you run my desugar of the program here, that is precisely what happens: it hangs. However, the original code does not hang; it just behaves a bit awkwardly. What gives?
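The non-reentrancy can be observed without actually hanging by using Lock::Async's lock method, which returns a Promise, instead of protect. This is only a sketch; the names $setup and $emit are hypothetical labels for the two acquisitions described above.

```raku
my $lock = Lock::Async.new;

# First acquisition (the setup phase): the lock is free, so it is granted.
my $setup = $lock.lock;
say $setup.status;

# Second acquisition from the same code (the synchronous emit): the lock is
# already held, so this Promise stays Planned.
my $emit = $lock.lock;
say $emit.status;

# Awaiting $emit at this point would never return, which is the deadlock.
# Releasing the first hold lets the second acquisition go through.
$lock.unlock;
await $emit;
say $emit.status;
$lock.unlock;
```

With $lock.protect the wait on the second acquisition is implicit, so the nested call never gets the chance to release the outer hold.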
One of the things the real implementation does is detect such cases during the setup phase; it then takes a continuation, and resumes it after the setup phase is completed. This means we can do things like:
my $primes = supply {
    .emit for ^Inf .grep(*.is-prime);
}
react {
    whenever $primes { .say }
    whenever Promise.in(3) { done }
}
And have it work out. I won't reproduce that fun here, but it should be possible with sufficiently cunning use of gather/take.