I have to write a script that get some URLs in parallel and do some work. In the past I have always used Parallel::ForkManager
for such things, but now I wanted to learn something new and try asynchronous programming with AnyEvent
(and AnyEvent::HTTP
or AnyEvent::Curl::Multi
) ... but I'm having problem understanding AnyEvent and writing a script that should:
I have read many manuals, tutorials, but its still hard for me to understand differences between blocking and non-blocking code. I have found similar script at http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent, where Mr. Szabo explains the basics, but I still cant understand how to implement something like:
...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh
...
... and add a concurrency limit in this case.
I would be very grateful for help ;)
Following Ikegami's advice I gave Net::Curl::Multi
a try. I'm very pleased with results. After years of using Parallel::ForkManager
just for concurrent grabbing thousands of URLs, Net::Curl::Multi
seems to be awesome.
Here is my code with while
loop on filehandle. It seems to work as it should, but considering it's my first time writing something like this I would like to ask more experienced Perl users to take a look and tell me if there are some potential bugs, something I missed, etc.
Also, if I may ask: as I don't fully understand how Net::Curl::Multi
's concurrency works, please tell me whether I should expect any problems with putting MySQL UPDATE command (via DBI
) inside RESPONSE
loop (besides higher server load obviously - I expect final script to run with about 50 concurrent N::C::M
workers, maybe more).
#!/usr/bin/perl
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
$easy->setopt( CURLOPT_FILE, \$easy->{body} );
return $easy;
}
my $maxWorkers = 10;
my $multi = Net::Curl::Multi->new();
my $workers = 0;
my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
chomp( $url );
$url .= "?$i";
print "($i) $url\n";
my $easy = make_request( $url );
$multi->add_handle( $easy );
$workers++;
my $running = 0;
do {
my ($r, $w, $e) = $multi->fdset();
my $timeout = $multi->timeout();
select $r, $w, $e, $timeout / 1000
if $timeout > 0;
$running = $multi->perform();
RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
$workers--;
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
# dont max CPU while waiting
select( undef, undef, undef, 0.01 );
} while ( $workers == $maxWorkers || ( eof && $running ) );
$i++;
}
close $fh;
Net::Curl is a rather good library that's extremely fast. Furthermore, it can handle parallel requests too! I'd recommend using this instead of AnyEvent.
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, \$easy->{head} );
$easy->setopt( CURLOPT_FILE, \$easy->{body} );
return $easy;
}
my $max_running = 10;
my @urls = ( 'http://www.google.com/' );
my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
while ( @urls && $running < $max_running ) {
my $easy = make_request( shift( @urls ) );
$multi->add_handle( $easy );
++$running;
}
last if !$running;
my ( $r, $w, $e ) = $multi->fdset();
my $timeout = $multi->timeout();
select( $r, $w, $e, $timeout / 1000 )
if $timeout > 0;
$running = $multi->perform();
while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
}
This does exactly what you want, in an asynchronous fashion, and it does that by wrapping Net::Curl
in a safe fashion:
#!/usr/bin/env perl
package MyDownloader;
use strict;
use warnings qw(all);
use Moo;
extends 'YADA::Worker';
has '+use_stats'=> (default => sub { 1 });
has '+retry' => (default => sub { 10 });
after init => sub {
my ($self) = @_;
$self->setopt(
encoding => '',
verbose => 1,
);
};
after finish => sub {
my ($self, $result) = @_;
if ($self->has_error) {
print "ERROR: $result\n";
} else {
# do the interesting stuff here
printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
}
};
around has_error => sub {
my $orig = shift;
my $self = shift;
return 1 if $self->$orig(@_);
return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};
1;
package main;
use strict;
use warnings qw(all);
use Carp;
use YADA;
my $q = YADA->new(
max => 8,
timeout => 30,
);
open(my $fh, '<', 'file_with_urls_per_line.txt')
or croak "can't open queue: $!";
while (my $url = <$fh>) {
chomp $url;
$q->append(sub {
MyDownloader->new($url)
});
}
close $fh;
$q->wait;
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With