Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiplexing callbacks

Suppose I have a number of tasks in one application which may finish in any order. And I need to run some code when all of the tasks have finished. If that matters, the application is running under AnyEvent, but without Coro.

To some extent, AnyEvent's $cv->begin/$cv->end allow what I want. However, I'd like to have more fine-grained control. For instance, I'd like to be unable to "finish" a task twice. Ability to gather data from all tasks would also be nice.

Of course, this can be done. Set up lots of callbacks that share a hash; delete keys from that hash whenever a task finishes; call the megacallback when the hash is empty. I wonder if there's a more civilized way of doing it, maybe some CPAN module?

For instance, here's an imaginary API that would fill my need.

#!/usr/bin/perl -w 
use strict;

use Some::Module;

# Set goals
my $cb = Some::Module->new( sub { say 'BOOM!' } );
$cb->begin( qw(foo bar) );

# Much later, as tasks start getting done
$cb->end( foo => 42 );       # "return" value from task 'foo'
$cb->begin( 'baz' );         # can add more tasks, why not
$cb->end( 'bar' );           # just finish task 'bar'
# still waiting for 'baz' to finish at this point

# Finally, last hanging task is done
$cb->end( baz => 137 );      # BOOM!
# at this point, sub {}->( { foo=>42, bar=>undef, baz=>137 } ) 
#     has been called

See also my perlmonks question.

It there something like this?

like image 727
Dallaylaen Avatar asked Apr 05 '13 11:04

Dallaylaen


2 Answers

You might want to consider Future.

Specifically, for waiting on many things to complete, you can use Future->needs_all or similar:

my @things = ... # generate Futures to represent each thing

Future->needs_all( @things )
  ->on_done( sub {
     # some code here when all the things are done 
  });

Alternatively, you could also try Async::MergePoint which gives an API much closer to what you had in mind:

my $mp = Async::MergePoint->new( needs => [qw( foo bar )] );
$mp->close( on_done => sub {
   # some code here when all the things are done
});

$mp->done( foo => $value );
$mp->done( bar => );
like image 174
LeoNerd Avatar answered Nov 19 '22 05:11

LeoNerd


I'm certainly no expert in async stuff, but I think that Mojo::IOLoop::Delay (part of the Mojolicious suite) has a similar interface. Note that Mojo::IOLoop can be used with EV and thus AnyEvent.

Here is an example from the cookbook:

use Mojo::UserAgent;
use Mojo::IOLoop;

# Synchronize non-blocking requests portably
my $ua    = Mojo::UserAgent->new;
my $delay = Mojo::IOLoop->delay(sub {
  my ($delay, $tx, $tx2) = @_;
  ...
});
$ua->get('http://mojolicio.us'         => $delay->begin);
$ua->get('http://mojolicio.us/perldoc' => $delay->begin);
$delay->wait unless Mojo::IOLoop->is_running;

Also, note that $delay->begin returns a callback that is essentially the end method.

Other examples, like a cool 'steps' concept are shown in the ::Delay documentation.

EDIT

Here is a quick example. Note that a small syntax change has JUST happened in the delay class so this only works with Mojolicious 3.93+, not that this wasn't possible before, but the syntax was slightly different.

#!/usr/bin/env perl

use strict;
use warnings;
use v5.10;

use Mojo::IOLoop;

my $delay = Mojo::IOLoop->delay(sub{shift; say for @_});

my $end_foo = $delay->begin(0);
Mojo::IOLoop->timer( 0 => sub { $end_foo->('hi') } );

my $end_bar = $delay->begin(0);
Mojo::IOLoop->timer( 0 => sub { $end_bar->('bye') } );

$delay->wait unless $delay->ioloop->is_running; #start loop if necessary

Here we create the delay object, the argument is a finish event callback. For each async action I call begin which returns an end callback. By default these callbacks strip their first argument to remove a superfluous invocant (see example above), but we don't have those, so we pass 0 to indicate not to do that. For each action, I simply defer with a zero-wait timer. Arguments to the end callback are then queued for the end event, in order. Tada!

like image 42
Joel Berger Avatar answered Nov 19 '22 07:11

Joel Berger