Perl: how to fetch data from URLs in parallel?

I need to fetch some data from many web data providers which do not expose any service, so I have to write something like the following, using, for example, WWW::Mechanize:

use WWW::Mechanize;
my @urls = (
    'http://www.first.data.provider.com',
    'http://www.second.data.provider.com',
    'http://www.third.data.provider.com',
);
my %results;
foreach my $url (@urls) {
    my $mech = WWW::Mechanize->new();
    $mech->get($url);
    $mech->form_number(1);
    $mech->set_fields('user' => 'myuser', 'pass' => 'mypass');
    my $resp = $mech->submit();
    $results{$url} = parse($resp->content());
}
consume(%results);

Is there some (possibly simple ;-) way to fetch the data into a common %results variable simultaneously, i.e. in parallel, from all the providers?

甜扑 2024-12-03 06:51:09

Threads are to be avoided in Perl. use threads is mostly for
emulating UNIX-style fork on Windows; beyond that, it's pointless.

(If you care, the implementation makes this fact very clear. In perl,
the interpreter is a PerlInterpreter object. The way threads
works is by making a bunch of threads, and then creating a brand-new
PerlInterpreter object in each thread. Threads share absolutely
nothing, even less than child processes do; fork gets you
copy-on-write, but with threads, all the copying is done in Perl
space! Slow!)

If you'd like to do many things concurrently in the same process, the
way to do that in Perl is with an event loop, like
EV,
Event, or
POE, or by using Coro. (You can
also write your code in terms of the AnyEvent API, which will let
you use any event loop. This is what I prefer.) The difference
between the two is how you write your code.

AnyEvent (and EV, Event,
POE, and so on) forces you to write your code in a callback-oriented
style. Instead of control flowing from top to bottom, control is in a
continuation-passing style. Functions don't return values, they call
other functions with their results. This allows you to run many IO
operations in parallel -- when a given IO operation has yielded
results, your function to handle those results will be called. When
another IO operation is complete, that function will be called. And
so on.

The disadvantage of this approach is that you have to rewrite your
code. So there's a module called Coro that gives Perl real
(user-space) threads that will let you write your code top-to-bottom,
but still be non-blocking. (The disadvantage of this is that it
heavily modifies Perl's internals. But it seems to work pretty well.)

So, since we don't want to rewrite
WWW::Mechanize
tonight, we're going to use Coro. Coro comes with a module called
Coro::LWP that will make
all calls to LWP be
non-blocking. It will block the current thread ("coroutine", in Coro
lingo), but it won't block any other threads. That means you can make
a ton of requests all at once, and process the results as they become
available. And Coro will scale better than your network connection;
each coroutine uses just a few k of memory, so it's easy to have tens
of thousands of them around.

With that in mind, let's see some code. Here's a program that starts
three HTTP requests in parallel, and prints the length of each
response. It's similar to what you're doing, minus the actual
processing; but you can just put your code in where we calculate the
length and it will work the same.

We'll start off with the usual Perl script boilerplate:

#!/usr/bin/env perl

use strict;
use warnings;
use 5.010;   # enables "say", which the code below uses

Then we'll load the Coro-specific modules:

use Coro;
use Coro::LWP;
use EV;

Coro uses an event loop behind the scenes; it will pick one for you if
you want, but we'll just specify EV explicitly. It's the best event
loop.

Then we'll load the modules we need for our work, which is just:

use WWW::Mechanize;

Now we're ready to write our program. First, we need a list of URLs:

my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
    'http://stackoverflow.com/',
);

Then we need a function to spawn a thread and do our work. To make a
new thread on Coro, you call async like async { body of the thread
goes here }. This will create a thread, start it, and continue with
the rest of the program.

sub start_thread($) {
    my $url = shift;
    return async {
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes\n", length $mech->content;
    };
}

So here's the meat of our program. We just put our normal LWP program
inside async, and it will be magically non-blocking. get blocks,
but the other coroutines will run while waiting for it to get the data
from the network.

Now we just need to start the threads:

start_thread $_ for @urls;

And finally, we want to start handling events:

EV::loop;

And that's it. When you run this, you'll see some output like:

Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes

As you can see, the requests are made in parallel, and you didn't have
to resort to threads!

Update

You mentioned in your original post that you want to limit the number of HTTP requests that run in parallel. One way to do that is with a semaphore,
Coro::Semaphore in Coro.

A semaphore is like a counter. When you want to use the resource that a semaphore protects, you "down" the semaphore. This decrements the counter and continues running your program. But if the counter is at zero when you try to down the semaphore, your thread/coroutine will go to sleep until it is non-zero. When the count goes up again, your thread will wake up, down the semaphore, and continue. Finally, when you're done using the resource that the semaphore protects, you "up" the semaphore and give other threads the chance to run.

This lets you control access to a shared resource, like "making HTTP requests".

All you need to do is create a semaphore that your HTTP request threads will share:

my $sem = Coro::Semaphore->new(5);

The 5 means "let us call 'down' 5 times before we block", or, in other words, "let there be 5 concurrent HTTP requests".

Before we add any code, let's talk about what can go wrong. Something bad that could happen is a thread "down"-ing the semaphore, but never "up"-ing it when it's done. Then nothing can ever use that resource, and your program will probably end up doing nothing. There are lots of ways this could happen. If you wrote some code like $sem->down; do something; $sem->up, you might feel safe, but what if "do something" throws an exception? Then the semaphore will be left down, and that's bad.
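
For reference, that fragile pattern looks like this in code (do_something is a hypothetical stand-in for your request logic):

async {
    $sem->down;        # take a slot; sleeps here while the count is zero
    do_something();    # hypothetical work -- if this dies, up() is never reached
    $sem->up;          # release the slot so another coroutine can proceed
};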

Fortunately, Perl makes it easy to have scope-guard objects that will automatically run code when the variable holding the object goes out of scope. We can make the code be $sem->up, and then we'll never have to worry about holding a resource when we don't intend to.

Coro::Semaphore integrates the concept of guards, meaning you can say my $guard = $sem->guard, and that will automatically down the semaphore and up it when control flows away from the scope where you called guard.

With that in mind, all we have to do to limit the number of parallel requests is guard the semaphore at the top of our HTTP-using coroutines:

async {
    say "Waiting for semaphore";
    my $guard = $sem->guard;
    say "Starting";
    ...;
    return result;
}
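
Putting the pieces together, a sketch of the earlier start_thread with the concurrency limit added (same preamble and assumptions as above) might look like:

my $sem = Coro::Semaphore->new(5);   # at most 5 requests in flight at once

sub start_thread($) {
    my $url = shift;
    return async {
        my $guard = $sem->guard;     # released automatically, even if get() dies
        say "Starting $url";
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        printf "Done with $url, %d bytes\n", length $mech->content;
    };
}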

Addressing the comments:

If you don't want your program to live forever, there are a few options. One is to run the event loop in another thread, and then join on each worker thread. This lets you pass results from the thread to the main program, too:

async { EV::loop };

# start all threads
my @running = map { start_thread $_ } @urls;

# wait for each one to return
my @results = map { $_->join } @running;

for my $result (@results) {
    say $result->[0], ': ', $result->[1];
}

Your threads can return results like:

sub start_thread($) {
    return async {
        ...;
        return [$url, length $mech->content];
    }
}

This is one way to collect all your results in a data structure. If you don't want to return things, remember that all the coroutines share state. So you can put:

my %results;

at the top of your program, and have each coroutine update the results:

async {
    ...;
    $results{$url} = 'whatever';
};

When all the coroutines are done running, your hash will be filled with the results. You'll have to join each coroutine to know when the answer is ready, though.
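
Tying that back to the original question, a minimal sketch of the shared-hash approach (same Coro/Coro::LWP/EV preamble and @urls list as above; parse and consume are the question's own routines) might look like:

my %results;

sub start_thread($) {
    my $url = shift;
    return async {
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        $mech->form_number(1);
        $mech->set_fields('user' => 'myuser', 'pass' => 'mypass');
        my $resp = $mech->submit;
        # Coroutines are cooperative, so writing to the shared hash needs no locking.
        $results{$url} = parse($resp->content);
    };
}

async { EV::loop };

my @running = map { start_thread $_ } @urls;
$_->join for @running;    # wait until every coroutine has finished

consume(%results);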

Finally, if you are doing this as part of a web service, you should use a coroutine-aware web server like Corona. This will run each HTTP request in a coroutine, allowing you to handle multiple HTTP requests in parallel, in addition to being able to send HTTP requests in parallel. This will make very good use of memory, CPU, and network resources, and will be pretty easy to maintain!

(You can basically cut-n-paste our program from above into the coroutine that handles the HTTP request; it's fine to create new coroutines and join inside a coroutine.)
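
For illustration, a minimal PSGI sketch along those lines might look like the following (this assumes Plack, run with something like plackup -s Corona app.psgi; Corona drives the event loop itself, so there is no EV::loop call, and the URL list is just an example):

# app.psgi
use strict;
use warnings;
use Coro;
use Coro::LWP;
use WWW::Mechanize;

my @urls = (
    'http://www.google.com/',
    'http://www.jrock.us/',
);

my $app = sub {
    my $env = shift;

    # One coroutine per URL; we only need to join them before answering.
    my @threads = map {
        my $url = $_;
        async {
            my $mech = WWW::Mechanize->new;
            $mech->get($url);
            [ $url, length $mech->content ];
        };
    } @urls;

    my $body = join "\n", map { join ': ', @{ $_->join } } @threads;

    return [ 200, [ 'Content-Type' => 'text/plain' ], [ $body ] ];
};

$app;    # a .psgi file must end by returning the app coderef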

沙沙粒小 2024-12-03 06:51:09

Looks like ParallelUserAgent is what you're looking for.
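
For reference, that module lives on CPAN as LWP::Parallel::UserAgent; a rough sketch of its register/wait interface (plain GETs here, without the Mechanize form handling from the question) might look like:

use strict;
use warnings;
use HTTP::Request;
use LWP::Parallel::UserAgent;

my @urls = (
    'http://www.first.data.provider.com',
    'http://www.second.data.provider.com',
);

my $pua = LWP::Parallel::UserAgent->new;
$pua->redirect(1);    # follow redirects
$pua->timeout(30);

for my $url (@urls) {
    # register() queues the request; on failure it returns an error response
    if ( my $err = $pua->register( HTTP::Request->new( GET => $url ) ) ) {
        warn "Could not register $url: ", $err->error_as_HTML;
    }
}

# wait() runs all registered requests in parallel and returns the entries
my $entries = $pua->wait;

my %results;
for my $key ( keys %$entries ) {
    my $resp = $entries->{$key}->response;
    $results{ $resp->request->url } = $resp->content;
}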

梦里人 2024-12-03 06:51:09

Well, you could create threads to do it--specifically see perldoc perlthrtut and Thread::Queue. So, it might look something like this.

use WWW::Mechanize;
use threads;
use threads::shared;
use Thread::Queue;
my @urls=(#whatever
);
my %results :shared;
my $queue=Thread::Queue->new();
foreach(@urls)
{
   $queue->enqueue($_);
}

my @threads=();
my $num_threads=16; #Or whatever...a pre-specified number of threads.

foreach(1..$num_threads)
{
    push @threads,threads->create(\&mechanize);
}

foreach(@threads)
{
   $queue->enqueue(undef);
}

foreach(@threads)
{
   $_->join();
}

consume(\%results);

sub mechanize
{
    while(my $url=$queue->dequeue)
    {
        my $mech=WWW::Mechanize->new();
        $mech->get($url);
        $mech->form_number(1);
        $mech->set_fields('user' => 'myuser', pass => 'mypass');
        my $resp = $mech->submit();
        $results{$url} = parse($resp->content());
    }
}

Note that since you're storing your results in a hash (instead of writing stuff to a file), you shouldn't need any kind of locking unless there's a danger of overwriting values. In which case, you'll want to lock %results by replacing

$results{$url} = parse($resp->content());

with

{
    lock(%results);
    $results{$url} = parse($resp->content());
}

秉烛思 2024-12-03 06:51:09

Try https://metacpan.org/module/Parallel::Iterator -- saw a very good presentation about it last week, and one of the examples was parallel retrieval of URLs -- it's also covered in the pod example. It's simpler than using threads manually (although it uses fork underneath).

As far as I can tell, you'd still be able to use WWW::Mechanize, but avoid messing about with memory sharing between threads. It's a higher-level model for this task, and might be a little simpler, leaving the main logic of @Jack Maney's mechanize routine intact.
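
For example, a sketch using Parallel::Iterator's iterate (which forks worker processes and hands back (index, result) pairs; parse and consume are the question's own routines, and whatever parse returns must be serializable, since it is passed back from a child process) might look like:

use strict;
use warnings;
use WWW::Mechanize;
use Parallel::Iterator qw( iterate );

my @urls = (
    'http://www.first.data.provider.com',
    'http://www.second.data.provider.com',
);

my $it = iterate(
    sub {
        my ( $index, $url ) = @_;
        my $mech = WWW::Mechanize->new;
        $mech->get($url);
        $mech->form_number(1);
        $mech->set_fields('user' => 'myuser', 'pass' => 'mypass');
        my $resp = $mech->submit;
        return parse( $resp->content );    # must return something serializable
    },
    \@urls,
);

my %results;
while ( my ( $index, $value ) = $it->() ) {
    $results{ $urls[$index] } = $value;
}

consume(%results);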
