When writing concurrent code, it's fairly common to want to spin off a separate (green or OS) thread and then ask the code in that thread to react to various thread-safe messages. Raku supports this pattern in a number of ways.
For example, many of the Channel examples in the docs show code that's similar to the code below (which prints one through ten across two threads).
my $channel = Channel.new;
start { react whenever $channel { say $_ }}
for ^10 { $channel.send($_) }
sleep 1
However, if we switch from the single-consumer world of Channel
s to the multi-consumer world of live Supply
s, the equivalent code no longer works.
my Supplier $supplier .= new;
start { react whenever $supplier { say $_ }}
for ^10 { $supplier.emit($_) }
sleep 1;
This code prints nothing. As I understand it, this is because the react
block was not listening when the values were emit
ed – it doesn't take long to start
a thread and react
to events, but it takes even less time to emit
ten values. And, logically enough, moving the sleep 1
line above the for
loop causes the values to print again.
And that's all fair enough – after all, the reason to use a live Supply
rather than an on-demand one is because you want the live semantics. That is, you want to only react
to future events, not to past ones.
But my question is whether there's a way to ask a react
block in a thread I've start
ed whether it's ready and/or to wait for it to be ready before sending data. (await
ing the start
block waits until the thread is done, rather than until it's ready, so that doesn't help here).
I'm also open to answers saying that I'm approaching this incorrectly/there's an X-Y problem – it's entirely possible that I'm straining against the direction the language is trying to push me or that live Supply
s aren't the correct concurrency abstraction here.
For this specific case (which is a relatively common one), the answer would be to use a Supplier::Preserving
:
my Supplier::Preserving $supplier .= new;
start { react whenever $supplier { say $_ }}
for ^10 { $supplier.emit($_) }
sleep 1;
Which retains sent values until $supplier
is first tapped, and then emits them.
An alternative, more general, solution is to use a Promise
:
my Supplier $supplier .= new;
# A Promise used just for synchronization
my Promise $ready .= new;
start react {
# Set up the subscriptions...
whenever $supplier { say $_ }
# ...and then signal that they are ready.
$ready.keep;
}
# Wait for the subscriptions to be set up...
await $ready;
# ...and off we go.
for ^10 { $supplier.emit($_) }
sleep 1;
The whenever
s in a react
block set up subscriptions as they are encountered, so by the time the Promise
is kept, all of the subscriptions will have been made. (Further, although not important here, no messages are processed until the body of the react
block has finished setting everything up.)
Finally I'll note that while Supplier
is often reached for, many times one would be better off writing a supply
block that emit
s the values. The example in the question is (quite reasonably enough) abstracted from a concrete application, but it's almost always worth asking, "can I do what I want by writing a supply
block" before reaching for a Supplier
or Supplier::Preserving
. If you really do need to broadcast values or need to distribute asynchronous inputs to multiple places, there's a solid case for Supplier
; if it's just a single stream of values to be produced once tapped, there probably isn't.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With