
How to do asynchronous WWW::Mechanize using AnyEvent

I've done a fair amount of research on this topic, and while some existing questions are related, I'm having a hard time understanding how to properly do asynchronous programming with AnyEvent and WWW::Mechanize. I'd like to stick with Mechanize because it has a clean interface and built-in functions for what I need (like getting all the images on a page). If there is no reliable way to do what I want, I'll start looking at AnyEvent::HTTP, but I figured I'd ask before moving in that direction.

I'm new to AnyEvent programming, but I've done a fair amount of Perl, as well as JavaScript/jQuery async calls with callbacks. Those make a lot of sense to me, but it just isn't clicking with AnyEvent + Mechanize.

Here is the code I'm working on. It pulls URLs from an upstream queue. Given a URL, I want to do one GET that pulls in all the image links on the page, and then asynchronously fetch all of those images.

So pseudo-code would look something like this:

  • grab url from queue
  • get page
  • get all img url links
  • do many async calls on the img urls (store the imgs for example in a backend)
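A minimal sketch of those four steps, using AnyEvent::HTTP (the fallback mentioned above) and HTML::LinkExtor for the image links instead of WWW::Mechanize. Assumptions: URLs come from the command line rather than the queue, and store_image is a hypothetical placeholder for the storage backend.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;    # http_get: non-blocking HTTP client
use HTML::LinkExtor;   # ships with the HTML-Parser distribution

# Absolute URLs of every <img src="..."> in $html, resolved against $base.
# Pure function: no network access, no event loop.
sub image_urls {
    my ($html, $base) = @_;
    my @imgs;
    my $parser = HTML::LinkExtor->new(sub {
        my ($tag, %attr) = @_;
        push @imgs, "$attr{src}" if $tag eq 'img' && defined $attr{src};
    }, $base);
    $parser->parse($html);
    $parser->eof;
    return @imgs;
}

# Hypothetical storage hook: replace with your real backend.
sub store_image {
    my ($url, $body) = @_;
    printf "stored %s (%d bytes)\n", $url, length $body;
}

# Get one page, then get all of its images in parallel. Nothing here blocks:
# each step continues in a callback, and begin/end count open requests.
sub process_page {
    my ($page_url, $cv) = @_;
    $cv->begin;
    http_get $page_url, sub {
        my ($html, $hdr) = @_;
        if ($hdr->{Status} =~ /^2/) {
            for my $img (image_urls($html, $page_url)) {
                $cv->begin;
                http_get $img, sub {
                    my ($body, $ihdr) = @_;
                    store_image($img, $body) if $ihdr->{Status} =~ /^2/;
                    $cv->end;
                };
            }
        }
        else {
            warn "GET $page_url failed: $hdr->{Status} $hdr->{Reason}\n";
        }
        $cv->end;
    };
    return;    # keep http_get in void context (no cancel-on-destroy guard)
}

# Command-line URLs stand in for the upstream queue in this sketch.
if (@ARGV) {
    my $cv = AE::cv;
    process_page($_, $cv) for @ARGV;
    $cv->recv;    # the only blocking wait: top level, outside every callback
}
```

AnyEvent::HTTP limits concurrent connections per host ($AnyEvent::HTTP::MAX_PER_HOST, 4 by default) and queues the rest, so firing all image requests at once does not open an unbounded number of connections.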

From what I've read (and from the errors I've run into), I cannot block inside an AnyEvent callback. How do I structure my program to make the async calls without blocking?
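The general rule behind that error: an AnyEvent callback may start work and record results, but the one blocking wait (->recv) happens once, at the top level. A minimal sketch of the condvar begin/end counting pattern, with AE::timer standing in for real network requests (the delays and job names are made up for illustration):

```perl
use strict;
use warnings;
use AnyEvent;

my @results;
my $cv = AE::cv;

# Start a simulated async job that completes after $delay seconds.
# Its callback never blocks: it records a result and calls end.
sub start_job {
    my ($id, $delay) = @_;
    $cv->begin;                          # one more job outstanding
    my $w; $w = AE::timer $delay, 0, sub {
        undef $w;                        # drop the one-shot timer
        push @results, "job $id done";
        $cv->end;                        # this job is finished
    };
    return;
}

start_job($_, 0.01 * (4 - $_)) for 1 .. 3;
$cv->recv;    # wait here, at top level, until every begin has a matching end
print "$_\n" for @results;
```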

AE events can only be processed when AE-aware functions block, so I'm using LWP::Protocol::AnyEvent::http. It replaces LWP's normal HTTP backend (Net::HTTP) with AnyEvent::HTTP, which is AE-aware.
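With LWP::Protocol::AnyEvent::http loaded, a blocking-looking WWW::Mechanize call can run inside a Coro coroutine and yield to the event loop while it waits; combining the module with Coro is the usual way to get parallel requests out of it. A sketch under that assumption, where fetch_all is a hypothetical helper and the joins happen in the main program rather than in a callback:

```perl
use strict;
use warnings;
use Coro;                           # async, ->join
use Coro::AnyEvent;                 # lets AnyEvent waits in a coroutine yield (recent Coro loads it automatically)
use LWP::Protocol::AnyEvent::http;  # swap LWP's http backend for AnyEvent::HTTP
use WWW::Mechanize;

# One coroutine per URL. While a coroutine waits on the network, the
# AE-aware backend lets the event loop and the other coroutines run.
sub fetch_all {
    my @urls = @_;
    my @threads = map {
        my $url = $_;
        async {
            my $mech = WWW::Mechanize->new(autocheck => 0);
            $mech->get($url);
            my @imgs = $mech->success ? $mech->find_all_images : ();
            return [ $url, $mech->status, scalar @imgs ];
        };
    } @urls;
    # ->join is a blocking wait, so it belongs here in the main coroutine,
    # never inside an AnyEvent callback.
    return map { $_->join } @threads;
}

if (@ARGV) {
    printf "%s => %s, %d images\n", @$_ for fetch_all(@ARGV);
}
```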

The worker gets created like this:

my $worker = Worker->new(upstream_job_url => URI->new('tcp://127.0.0.1:5555'), run_on_create => 1);

The async part is _recv_msg, which calls _proc_msg.

I already have an AnyEvent loop watching the ZeroMQ socket, as described in the ZeroMQ Perl binding docs...

Any help much appreciated!

Code:

package Worker;

use 5.12.0;

use Moose;
use AnyEvent;
use LWP::Protocol::AnyEvent::http;

use ZMQ::LibZMQ3;
use ZMQ::Constants qw/ZMQ_PUSH ZMQ_PULL ZMQ_POLLIN ZMQ_FD/;

use JSON;
use WWW::Mechanize;
use Carp;
use Coro;


has 'max_children' => (
    is => 'rw',
    isa => 'Int',
    required => 1,
    default => sub { 0 }
);

has 'upstream_job_url' => (
    is => 'rw',
    isa => 'URI',
    required => 1,
);

has ['uri','sink_url'] => (
    is => 'rw',
    isa => 'URI',
    required => 0,
);

has 'run_on_create' => (
    is => 'rw',
    isa => 'Bool',
    required => 1,
    default => sub { 1 }
);

has '_receiver' => (
    is => 'rw',
    isa => 'ZMQ::LibZMQ3::Socket',
    required => 0
);

sub BUILD {
    my $self = shift;
    $self->start if $self->run_on_create;
}

sub start
{
    my $self = shift;
    $self->_init_zmq();

    my $fh = zmq_getsockopt( $self->_receiver, ZMQ_FD );
    my $w; $w = AnyEvent->io( fh => $fh, poll => "r", cb => sub { $self->_recv_msg } );
    AnyEvent->condvar->recv;
}

sub _init_zmq
{   
    my $self = shift;
    my $c = zmq_init() or die "zmq_init: $!\n";
    my $recv = zmq_socket($c, ZMQ_PULL) or die "zmq_socket: $!\n";
    if( zmq_connect($recv, $self->upstream_job_url) != 0 ) {
        croak "zmq_connect: $!\n";
    }
    $self->_receiver($recv);
}

sub _recv_msg
{
    my $self = shift;
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

sub _proc_msg
{
    my $self = shift;
    my $c = async { 
        my $ua = WWW::Mechanize->new;
        $ua->protocols_allowed(['http']); 
        print "$$ processing " . $self->uri->as_string . "... ";
        $ua->get($self->uri->as_string);
        if ($ua->success()) {
            say $ua->status . " OK";
        } else { 
            say $ua->status . " NOT OK";
        }
    }; 
    $c->join;
}

1;

As you can see, I was trying Coro in _proc_msg. I've also tried making the Mechanize calls directly, but then I get this error:

AnyEvent::CondVar: recursive blocking wait attempted at lib/Worker.pm line 91.

This happens because $mech still blocks inside the callback. I'm not sure how to make the Mechanize calls in my callback properly.


At ikegami's request, I've added the driver program that sends the URLs. For testing, it just reads an RSS feed and sends the links to the workers to process. I was mainly curious about the basic structure of AnyEvent with callbacks, but I'm more than happy to get help on the program in general. Here is the driver code:

#!/usr/local/bin/perl

use strict;
use warnings;
use v5.12.0;

use lib './lib';

use Config::General;
use Getopt::Long;
use Carp;
use AnyEvent;
use AnyEvent::Feed;
use Parallel::ForkManager;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PUSH ZMQ_PULL);
use Worker;

# Debug
use Data::Dumper;
$Data::Dumper::Deparse = 1;

my $config_file = "feeds.cfg";

GetOptions(
    "config|c=s" => \$config_file,   # =s: takes a value; without it $config_file becomes 1
    "help|h"     => sub { usage(); exit(0); }
);

sub usage() 
{
    say "TODO";
}

$SIG{INT} = sub { croak; }; $SIG{TERM} = sub { croak; };
$SIG{CHLD} = 'IGNORE';

my $conf = Config::General->new($config_file) or croak "Couldn't open config file '$config_file' $!\n";

my %config = $conf->getall();
my @readers = ();
my @feeds = load_feeds(\%config);

my $mgr = Parallel::ForkManager->new( $config{'max_download_children'} ) or croak "Can't create fork manager: $!\n";
my $context = zmq_init() or croak "zmq_init: $!\n";
my $sender = zmq_socket($context, ZMQ_PUSH) or die "zmq_socket: $!\n";

foreach my $feed_cfg (@feeds) {
    my $reader = AnyEvent::Feed->new(url => delete $feed_cfg->{url}, %$feed_cfg);
    push(@readers, $reader); # save, don't go out of scope
}

# Fork Downloader children. These processes will look for incoming data
# in the img_queue and download the images, storing them in nosql
for ( 1 .. $config{'max_download_children'} ) {
    my $pid = $mgr->start; 
    if (!$pid) {
        # Child
        my $worker = Worker->new({
            upstream_job_url => URI->new('tcp://127.0.0.1:5555')
        });
        $mgr->finish;
        say "$$ exiting.";
        exit(0);
    } else {
        # Parent
        say "[forked child $pid] my pid is $$";
    }
}

if (zmq_bind($sender, 'tcp://127.0.0.1:5555') < 0) {
    croak "zmq_bind: $!\n";
}

# Event loop 
AnyEvent->condvar->recv;

sub load_feeds
{
    my $conf = shift;
    my @feeds = ();
    foreach my $feed ( keys %{$conf->{'feeds'}} ) {
        my $feed_ref = $conf->{'feeds'};
        $feed_ref->{$feed}->{'name'} = $feed;
        $feed_ref->{$feed}->{'on_fetch'} = \&fetch_feed_cb;
        push(@feeds, $feed_ref->{$feed});   
    }
    return @feeds;
}

sub fetch_feed_cb
{
    my ($feed_reader, $new_entries, $feed, $error) = @_;
    if (defined $error) {
        say "Error fetching feed: $error";
        return;
    }
    say "$$ checking for new feeds";
    for (@$new_entries) {
        my ($hash, $entry) = @$_;
        say "$$ sending " . $entry->link;
        zmq_send($sender, JSON::to_json( { url => $entry->link }, { pretty => 1, utf8 => 1 } ));
    }
}

Here is a sample run:

[forked child 40790] my pid is 40789
[forked child 40791] my pid is 40789
[forked child 40792] my pid is 40789
40789 checking for new feeds
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/mWprjBD3UhM/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/NngMs9pCQew/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/wiUsvafLGFU/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/QMp6gnZpFcA/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/kqUb_rpU5dE/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/tHItKqKhGXg/
40789 sending http://feedproxy.google.com/~r/PerlNews/~3/7LleQbVnPmE/
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
FATAL: $Coro::IDLE blocked itself - did you try to block inside an event loop callback? Caught at lib/Worker.pm line 99.
40791 processing http://feedproxy.google.com/~r/PerlNews/~3/Ay9V5pIpFBA/... 
40790 processing http://feedproxy.google.com/~r/PerlNews/~3/f5nNM3zYBt0/... 
40792 processing http://feedproxy.google.com/~r/PerlNews/~3/5XCVvt75ppU/... ^C at /usr/local/perls/perl5162/lib/perl5/site_perl/darwin-thread-multi-2level/AnyEvent/Loop.pm line 231.

If I don't explicitly do 'use Coro;' in Worker.pm, the Coro FATAL errors don't appear. I don't know how async was working before that without further runtime errors.

Sample config file (feeds.cfg):

max_download_children = 3
<feeds>
    <feed1>
        url="http://feeds.feedburner.com/PerlNews?format=xml"   
        interval=60
    </feed1>
</feeds>

I spent a little more time on this today. My mistake was calling $c->join: I shouldn't do that, because I can't block in the callback. Coro will schedule the async block, and it will finish when it finishes. The only thing I need to make sure of is knowing when all the async blocks are done, which I think I can figure out. Now the tricky part is this little mystery:

sub _recv_msg
{
    my $self = shift;
    while(my $message = zmq_msg_data(zmq_recvmsg($self->_receiver)) ) {
        my $msg = JSON::from_json($message, {utf8 => 1});
        $self->uri(URI->new($msg->{url}));
        $self->_proc_msg;
    }
}

This while loop causes my async { } threads in _proc_msg to NOT RUN. If I remove the while loop and handle only the first message, the coros run; if I leave the loop in place, they never run. That's strange to me, and I haven't figured out why yet.
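On knowing when all the async blocks are done once $c->join is gone: one option is to pair each async with a condvar begin/end, so the callback only schedules work and the main program waits once. A minimal sketch, with an AE::timer standing in for the ZMQ io callback and Coro::AnyEvent::sleep standing in for a Mechanize request (both are assumptions for illustration; on_message is a hypothetical name):

```perl
use strict;
use warnings;
use AnyEvent;
use Coro;
use Coro::AnyEvent;    # provides Coro::AnyEvent::sleep

my @done;
my $all_done = AE::cv;

# Stand-in for the worker's event callback: it must not block,
# so it only schedules a coroutine per job and returns at once.
sub on_message {
    my ($job) = @_;
    $all_done->begin;
    async {
        Coro::AnyEvent::sleep(0.01);   # simulated network wait; yields to others
        push @done, $job;
        $all_done->end;
    };
    return;                            # no ->join here
}

$all_done->begin;    # guard: keep the condvar open while jobs are being dispatched
my $t = AE::timer 0, 0, sub {
    on_message($_) for qw(a b c);
    $all_done->end;  # drop the guard; recv returns once every job has ended
};
$all_done->recv;     # blocking wait in the main coroutine, outside any callback
print join(",", sort @done), "\n";
```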


Further updates:

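zmq_msg_recv was blocking: without a non-blocking flag, the receive call waits inside libzmq instead of returning to the event loop, so the scheduled coros never got a chance to run. zmq_send in the parent can block too. The fix is to pass ZMQ_NOBLOCK. I also split the worker and the main program into entirely separate programs.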
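A sketch of a non-blocking receive along the lines of this update, assuming the same ZMQ::LibZMQ3 API, endpoint, and ZMQ_NOBLOCK flag as above. It also copies each URL into a lexical before scheduling the coroutine: in the original _proc_msg the async block reads $self->uri, which the loop may already have overwritten with the next message by the time the coroutine runs. job_url, process_url, drain, and run_worker are hypothetical names.

```perl
use strict;
use warnings;
use AnyEvent;
use Coro;
use JSON;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(ZMQ_PULL ZMQ_FD ZMQ_NOBLOCK);

# Decode one job message and return its URL.
sub job_url { return from_json($_[0], { utf8 => 1 })->{url} }

# Hypothetical per-URL work (e.g. WWW::Mechanize over LWP::Protocol::AnyEvent::http).
sub process_url {
    my ($url) = @_;
    print "$$ processing $url\n";
}

# ZMQ_FD only signals that the socket's state changed, so every wakeup must
# read until the queue is empty. With ZMQ_NOBLOCK, zmq_recvmsg returns undef
# when nothing is left instead of blocking the whole process.
sub drain {
    my ($sock) = @_;
    while (my $msg = zmq_recvmsg($sock, ZMQ_NOBLOCK)) {
        my $url = job_url(zmq_msg_data($msg));  # a lexical per message, not a shared attribute
        async { process_url($url) };            # scheduled, never joined here
    }
    return;
}

sub run_worker {
    my ($endpoint) = @_;
    my $ctx  = zmq_init() or die "zmq_init: $!\n";
    my $sock = zmq_socket($ctx, ZMQ_PULL) or die "zmq_socket: $!\n";
    zmq_connect($sock, $endpoint) == 0 or die "zmq_connect: $!\n";
    my $fh = zmq_getsockopt($sock, ZMQ_FD);
    my $w  = AnyEvent->io(fh => $fh, poll => 'r', cb => sub { drain($sock) });
    drain($sock);              # messages queued before the watcher existed
    AnyEvent->condvar->recv;   # run the event loop forever
}

run_worker($ARGV[0]) if @ARGV;    # e.g. tcp://127.0.0.1:5555
```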

asked Nov 02 '22 13:11 by mikew


1 Answer

You could use https://metacpan.org/pod/AnyEvent::HTTP::LWP::UserAgent for async calls:

  use AnyEvent::HTTP::LWP::UserAgent;
  use AnyEvent;

  my $ua = AnyEvent::HTTP::LWP::UserAgent->new;
  my @urls = (...);
  my $cv = AE::cv;
  $cv->begin;
  foreach my $url (@urls) {
      $cv->begin;
      $ua->get_async($url)->cb(sub {
          my $r = shift->recv;
          print "url $url, content " . $r->content . "\n";
          $cv->end;
      });
  }
  $cv->end;
  $cv->recv;
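The same pattern can be called from inside the worker's ZMQ callback, as long as the final $cv->recv stays out of the callback. A sketch assuming AnyEvent::HTTP::LWP::UserAgent as above; fetch_async and its $on_response argument are hypothetical:

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP::LWP::UserAgent;

my $ua = AnyEvent::HTTP::LWP::UserAgent->new;

# Safe to call from inside an event callback (such as the worker's ZMQ
# io watcher): it starts the request and returns at once. The response
# is handled in the ->cb callback; $done->begin/end only count requests.
sub fetch_async {
    my ($url, $done, $on_response) = @_;
    $done->begin;
    $ua->get_async($url)->cb(sub {
        my $res = shift->recv;    # condvar is already ready, so this does not wait
        $on_response->($url, $res);
        $done->end;
    });
    return;                       # no ->recv on $done in here
}

if (@ARGV) {
    my $done = AE::cv;
    $done->begin;
    fetch_async($_, $done, sub {
        my ($url, $res) = @_;
        printf "%s => %s\n", $url, $res->status_line;
    }) for @ARGV;
    $done->end;
    $done->recv;    # the one blocking wait, at top level
}
```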
answered Nov 15 '22 12:11 by user1126070