Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Perl, how to fetch data from urls in parallel?

I need to fetch some data from many web data providers, who do not expose any service, so I have to write something like this, using for example WWW::Mechanize:

use WWW::Mechanize;
@urls = ('http://www.first.data.provider.com', 'http://www.second.data.provider.com', 'http://www.third.data.provider.com');
%results = {};
foreach my $url (@urls) {
 $mech = WWW::Mechanize->new();
 $mech->get($url);
 $mech->form_number(1);
 $mech->set_fields('user' => 'myuser', pass => 'mypass');
 $resp = $mech->submit();
 $results{$url} = parse($resp->content());
}
consume(%results);

Is there some (possibly simple ;-) way to fetch data to a common %results variable, simultaneously, i.e: in parallel, from all the providers?

like image 458
MarcoS Avatar asked Jul 25 '11 09:07

MarcoS


2 Answers

Looks like ParallelUserAgent is what you're looking for.

like image 25
Dave Cross Avatar answered Oct 01 '22 06:10

Dave Cross


threads are to be avoided in Perl. use threads is mostly for emulating UNIX-style fork on Windows; beyond that, it's pointless.

(If you care, the implementation makes this fact very clear. In perl, the interpreter is a PerlInterpreter object. The way threads works is by making a bunch of threads, and then creating a brand-new PerlInterpreter object in each thread. Threads share absolutely nothing, even less than child processes do; fork gets you copy-on-write, but with threads, all the copying is done in Perl space! Slow!)

If you'd like to do many things concurrently in the same process, the way to do that in Perl is with an event loop, like EV, Event, or POE, or by using Coro. (You can also write your code in terms of the AnyEvent API, which will let you use any event loop. This is what I prefer.) The difference between the two is how you write your code.

AnyEvent (and EV, Event, POE, and so on) forces you to write your code in a callback-oriented style. Instead of control flowing from top to bottom, control is in a continuation-passing style. Functions don't return values, they call other functions with their results. This allows you to run many IO operations in parallel -- when a given IO operation has yielded results, your function to handle those results will be called. When another IO operation is complete, that function will be called. And so on.

The disadvantage of this approach is that you have to rewrite your code. So there's a module called Coro that gives Perl real (user-space) threads that will let you write your code top-to-bottom, but still be non-blocking. (The disadvantage of this is that it heavily modifies Perl's internals. But it seems to work pretty well.)

So, since we don't want to rewrite WWW::Mechanize tonight, we're going to use Coro. Coro comes with a module called Coro::LWP that will make all calls to LWP be non-blocking. It will block the current thread ("coroutine", in Coro lingo), but it won't block any other threads. That means you can make a ton of requests all at once, and process the results as they become available. And Coro will scale better than your network connection; each coroutine uses just a few k of memory, so it's easy to have tens of thousands of them around.

With that in mind, let's see some code. Here's a program that starts three HTTP requests in parallel, and prints the length of each response. It's similar to what you're doing, minus the actual processing; but you can just put your code in where we calculate the length and it will work the same.

We'll start off with the usual Perl script boilerplate:

#!/usr/bin/env perl

use strict;
use warnings;

Then we'll load the Coro-specific modules:

use Coro;
use Coro::LWP;
use EV;

Coro uses an event loop behind the scenes; it will pick one for you if you want, but we'll just specify EV explicitly. It's the best event loop.

Then we'll load the modules we need for our work, which is just:

use WWW::Mechanize;

Now we're ready to write our program. First, we need a list of URLs:

my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
    'http://stackoverflow.com/',
);

Then we need a function to spawn a thread and do our work. To make a new thread on Coro, you call async like async { body; of the thread; goes here }. This will create a thread, start it, and continue with the rest of the program.

sub start_thread($) {
    my $url = shift;
    return async {
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes\n", length $mech->content;
    };
}

So here's the meat of our program. We just put our normal LWP program inside async, and it will be magically non-blocking. get blocks, but the other coroutines will run while waiting for it to get the data from the network.

Now we just need to start the threads:

start_thread $_ for @urls;

And finally, we want to start handling events:

EV::loop;

And that's it. When you run this, you'll see some output like:

Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes

As you can see, the requests are made in parallel, and you didn't have to resort to threads!

Update

You mentioned in your original post that you want to limit the number of HTTP requests that run in parallel. One way to do that is with a semaphore, Coro::Semaphore in Coro.

A semaphore is like a counter. When you want to use the resource that a semaphore protects, you "down" the semaphore. This decrements the counter and continues running your program. But if the counter is at zero when you try to down the semaphore, your thread/coroutine will go to sleep until it is non-zero. When the count goes up again, your thread will wake up, down the semaphore, and continue. Finally, when you're done using the resource that the semaphore protects, you "up" the semaphore and give other threads the chance to run.

This lets you control access to a shared resource, like "making HTTP requests".

All you need to do is create a semaphore that your HTTP request threads will share:

my $sem = Coro::Semaphore->new(5);

The 5 means "let us call 'down' 5 times before we block", or, in other words, "let there be 5 concurrent HTTP requests".

Before we add any code, let's talk about what can go wrong. Something bad that could happen is a thread "down"-ing the semaphore, but never "up"-ing it when it's done. Then nothing can ever use that resource, and your program will probably end up doing nothing. There are lots of ways this could happen. If you wrote some code like $sem->down; do something; $sem->up, you might feel safe, but what if "do something" throws an exception? Then the semaphore will be left down, and that's bad.

Fortunately, Perl makes it easy to have scope Guard objects, that will automatically run code when the variable holding the object goes out of scope. We can make the code be $sem->up, and then we'll never have to worry about holding a resource when we don't intend to.

Coro::Semaphore integrates the concept of guards, meaning you can say my $guard = $sem->guard, and that will automatically down the semaphore and up it when control flows away from the scope where you called guard.

With that in mind, all we have to do to limit the number of parallel requests is guard the semaphore at the top of our HTTP-using coroutines:

async {
    say "Waiting for semaphore";
    my $guard = $sem->guard;
    say "Starting";
    ...;
    return result;
}

Addressing the comments:

If you don't want your program to live forever, there are a few options. One is to run the event loop in another thread, and then join on each worker thread. This lets you pass results from the thread to the main program, too:

async { EV::loop };

# start all threads
my @running = map { start_thread $_ } @urls;

# wait for each one to return
my @results = map { $_->join } @running;

for my $result (@results) {
    say $result->[0], ': ', $result->[1];
}

Your threads can return results like:

sub start_thread($) {
    return async {
        ...;
        return [$url, length $mech->content];
    }
}

This is one way to collect all your results in a data structure. If you don't want to return things, remember that all the coroutines share state. So you can put:

my %results;

at the top of your program, and have each coroutine update the results:

async {
    ...;
    $results{$url} = 'whatever';
};

When all the coroutines are done running, your hash will be filled with the results. You'll have to join each coroutine to know when the answer is ready, though.

Finally, if you are doing this as part of a web service, you should use a coroutine-aware web server like Corona. This will run each HTTP request in a coroutine, allowing you to handle multiple HTTP requests in parallel, in addition to being able to send HTTP requests in parallel. This will make very good use of memory, CPU, and network resources, and will be pretty easy to maintain!

(You can basically cut-n-paste our program from above into the coroutine that handles the HTTP request; it's fine to create new coroutines and join inside a coroutine.)

like image 56
jrockway Avatar answered Oct 01 '22 07:10

jrockway