
What happens when different thread schedulers are used in the same react block?

This is a follow-up question to "Is whenever signal() in react block order dependent?".

The following code using the default scheduler $*SCHEDULER lets the user exit immediately by pressing CTRL-C in the following event loop:

use v6;
my %scheduler;
my $use-default-scheduler = True;
if $use-default-scheduler {
    %scheduler = scheduler => $*SCHEDULER;
}    
react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

I am interested in what happens if I use two different thread schedulers in the same react block. Setting $use-default-scheduler = False in the above code makes Supply.from-list() use its own default scheduler instead of $*SCHEDULER. Now the user cannot exit the react block immediately by pressing CTRL-C: the program simply hangs until Enter is pressed.

So what actually happens here? Does the react focus on only one event loop at a time? (I imagine two event loops here: one for the default scheduler used in the first whenever for the SIGINT signal, and the other for the $*IN.lines supply.) Is the react now focusing on the from-list() scheduler for $*IN.lines, with SIGINT somehow ignored while in this event loop, so that pressing CTRL-C does not change the state of the react block?

Håkon Hægland asked May 22 '19 07:05


1 Answer

To see what actually happens, let's rewrite the program to (sort-of) cover what react does. I'm going to ignore numerous details that don't matter too much to the question at hand.

To keep things a bit more compact, I'll just rewrite this segment of the provided program:

react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

First of all, a react { ... } is really just like await supply { ... } - that is, it taps the supply { } block and awaits its end.

await supply {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}

But what is a supply block? At its heart, supply (and so react) offer:

  • Concurrency control, so it will process one message at a time (thus we need some kind of lock; it uses Lock::Async for this)
  • Completion and error propagation (in a blatant bit of cheating, we'll use a Promise to implement this, because we only really want the react part; the real thing produces a resulting Supply that we can emit values into)
  • Completing automatically when there are no outstanding subscriptions (we'll track those in a SetHash)

Thus, we could rewrite the program as something like this:

await do {
    # Concurrency control
    my $lock = Lock::Async.new;
    # Completion/error conveyance
    my $done = Promise.new;
    # What's active?
    my %active is SetHash;

    # An implementation a bit like that behind the `whenever` keyword, but with
    # plenty of things that don't matter for this question missing.
    sub whenever-impl(Supply $s, &block) {
        # Tap the Supply
        my $tap;
        $s.tap:
            # When it gets tapped, add the tap to our active set.
            tap => {
                $tap = $_; 
                %active{$_} = True;
            },
            # Run the handler for any events
            { $lock.protect: { block($_) } },
            # When this one is done, remove it from the %active list; if it's
            # the last thing, we're done overall.
            done => {
                $lock.protect: {
                    %active{$tap}:delete;
                    $done.keep() unless %active;
                }
            },
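            # If there's an async error, close all taps and pass it along.
            # The error is handed to the quit handler itself, and a Promise
            # is broken (not "quit") to convey it.
            quit => -> $err {
                $lock.protect: {
                    .close for %active.keys;
                    $done.break($err);
                }
            }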
    }

    # We hold the lock while doing initial setup, so you can rely on having
    # done all initialization before processing a first message.
    $lock.protect: {
        whenever-impl signal(SIGINT, |%scheduler), {
            say "Got signal";
            exit;
        }
        whenever-impl Supply.from-list($*IN.lines, |%scheduler), {
            say "Got line";
            exit if $++ == 1 ;
        }
    }

    $done
}

Notice that there's nothing whatsoever about schedulers or event loops in here; a supply or react doesn't care where a message comes from, it just cares about its own integrity, enforced through the Lock::Async. Also notice that it doesn't introduce any concurrency either: it really is just a concurrency control construct.
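To make the "concurrency control, not concurrency" point concrete, here is a minimal sketch of what the lock alone provides. The worker count and the $count variable are purely illustrative and not part of the original program; the concurrency comes from start, while Lock::Async only ensures the protected blocks run one at a time.

```raku
my $lock  = Lock::Async.new;
my $count = 0;

# Four workers on the thread pool; the concurrency comes from `start`,
# not from the lock.
my @workers = do for ^4 {
    start {
        for ^1000 {
            # Only one protected block runs at a time, so the
            # increment is never interleaved with another one.
            $lock.protect({ $count++ });
        }
    }
}

await @workers;
say $count;   # 4000
```

Without the protect call the increments could race and the total could come out lower.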

Typically, one uses supply and react with data sources where you tap them and immediately get back control. We then proceed through to set up further whenever blocks, fall out of the setup phase, and the lock is available for any messages we receive. This behavior is what you get with just about all supplies that you usually encounter. It's the case with signal(...). It's also the case when you give Supply.from-list(...) an explicit scheduler, passing in $*SCHEDULER; in such a case, it schedules the loop that reads from $*IN on the pool and immediately hands back control.

The problem comes when we encounter something that does not behave like that. If we tap Supply.from-list($*IN.lines), it defaults to doing the read from $*IN on the current thread to produce a value to emit, because Supply.from-list uses CurrentThreadScheduler as its default. And what does that do? Just run the code it's asked to run immediately!
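The difference between the two cases can be seen without $*IN at all. The following is a small sketch using a fixed list (an assumption made only to keep it self-contained): with the default scheduler, every value is emitted inside the call to tap; with an explicit $*SCHEDULER, tap returns straight away and the values arrive from a pool thread.

```raku
# Default scheduler (CurrentThreadScheduler): the whole list is emitted
# inside the call to .tap, before it returns.
my @order;
Supply.from-list(1..3).tap: { @order.push: "value $_" };
@order.push: 'tap returned';
.say for @order;   # the three values are listed before 'tap returned'

# Explicit $*SCHEDULER: the emitting loop is cued on the thread pool,
# so .tap hands back control without waiting for it.
my $finished = Promise.new;
Supply.from-list(1..3, scheduler => $*SCHEDULER).tap:
    { say "value $_ on thread { $*THREAD.id }" },
    done => { $finished.keep };
say "tap returned on thread { $*THREAD.id }";
await $finished;   # keep the program alive until the list is exhausted
```

In the second half the relative order of the output lines is not guaranteed, which is exactly the point: the emitting work no longer happens on the tapping thread.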

This leaves us with one more mystery, however. Given Lock::Async is not reentrant, then if we:

  1. Acquire the lock to do setup
  2. Call tap on the Supply.from-list(...), which runs synchronously and tries to emit a value
  3. Try to acquire the lock so we can process the value

Then we'd just get a deadlock, because we're trying to acquire a non-reentrant lock that is already held - by us. Indeed, if you run my desugar of the program here, that is precisely what happens: it hangs. However, the original code does not hang; it just behaves a bit awkwardly. What gives?
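As an aside, the non-reentrancy is easy to observe in isolation. This sketch uses the lower-level lock and unlock methods rather than protect, so that a second acquisition shows up as a pending Promise instead of blocking the program:

```raku
my $lock = Lock::Async.new;

my $first  = $lock.lock;   # lock is free, so this Promise is already kept
my $second = $lock.lock;   # lock is held, so this Promise has to wait

say $first.status;    # Kept
say $second.status;   # Planned

$lock.unlock;         # release the first acquisition...
await $second;        # ...which lets the second one through
$lock.unlock;
```

The lock keeps no record of who holds it, so a second request from the same code waits just like a request from anywhere else.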

One of the things the real implementation does is detect such cases during the setup phase; it then takes a continuation, and resumes it after the setup phase is completed. This means we can do things like:

my $primes = supply {
    .emit for ^Inf .grep(*.is-prime);
}
react {
    whenever $primes { .say }
    whenever Promise.in(3) { done }
}

And have it work out. I won't reproduce that fun here, but it should be possible with sufficiently cunning use of gather/take.

Jonathan Worthington answered Oct 31 '22 06:10