Forking multiple children in Perl and using pipes for two-way communication

Posted 2024-12-20 00:09:49

I am trying to write a small Perl program with multiprocessing capabilities. Because my requirements differ in a few small ways from the usual cases, I have not been able to find a similar sample script anywhere.

I need to read a big log file from STDIN and give the first N lines (N is also a large number) to the first child process, the next N lines to the second child process, and so on. I have also defined a constant for the maximum number of child processes allowed to run concurrently. Once that maximum is reached, the parent waits for a child to finish its job and then gives out the next N lines.
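
A minimal sketch of the chunk-and-throttle loop described above, using only core Perl (`fork`, `wait`, `POSIX::_exit`). It assumes a fresh child is forked for each chunk of `$n` lines when a slot frees up, rather than reusing the finished process. The name `dispatch_chunks` and the `$work` callback are hypothetical, and no results are sent back to the parent yet.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: read $fh $n lines at a time and fork one child per
# chunk, never letting more than $max_kids children run at once.
# $work is called in the child with the chunk's lines.
# Returns the number of chunks dispatched.
sub dispatch_chunks {
    my ($fh, $n, $max_kids, $work) = @_;
    my ($running, $chunks) = (0, 0);
    while (1) {
        my @chunk;
        while (@chunk < $n) {
            my $line = <$fh>;
            last unless defined $line;
            push @chunk, $line;
        }
        last unless @chunk;                  # input exhausted

        if ($running >= $max_kids) {         # at the limit:
            wait;                            # block until any child exits
            $running--;
        }

        my $pid = fork;
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {                     # child: handle exactly one chunk
            my $ok = eval { $work->(@chunk); 1 };
            POSIX::_exit($ok ? 0 : 1);       # never fall back into the parent's loop
        }
        $running++;
        $chunks++;
    }
    1 while wait != -1;                      # reap the remaining children
    return $chunks;
}
```

With real input this would be called as `dispatch_chunks(\*STDIN, $N, $MAX_KIDS, \&handle_chunk)`, where `handle_chunk` is a hypothetical per-chunk sub. `POSIX::_exit` is used in the child so that it skips Perl's exit-time cleanup of handles it inherited from the parent, such as STDIN.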

The parent process also collects the multi-line (5-10 lines) output that each child process returns when it finishes, and stores it in an array. The parent then processes the contents of this array and finally displays the results.
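
For collecting each child's multi-line output as it finishes, one option is a separate pipe per child plus `IO::Select` in the parent. This is a sketch under assumptions: `collect_outputs` is a hypothetical name, each job is a code ref that returns the child's output as a string, and throttling is left out to keep it short.

```perl
use strict;
use warnings;
use IO::Select;
use POSIX ();

# Hypothetical helper: fork one child per job. Each child writes its output
# to its own pipe; the parent reads whichever pipe has data, so outputs are
# gathered in the order the children finish, not the order they started.
sub collect_outputs {
    my @jobs = @_;                          # code refs; each returns a string
    my $sel = IO::Select->new;
    my (%buf, %pid_of, @results);
    for my $job (@jobs) {
        pipe(my $r, my $w) or die "pipe: $!";
        my $pid = fork;
        die "fork: $!" unless defined $pid;
        if ($pid == 0) {                    # child: write output, then leave
            close $r;
            my $ok = eval { print {$w} $job->(); 1 };
            close $w;
            POSIX::_exit($ok ? 0 : 1);
        }
        close $w;                           # the parent only reads
        $pid_of{ fileno $r } = $pid;
        $buf{ fileno $r }    = '';
        $sel->add($r);
    }
    while ($sel->count) {
        for my $r ($sel->can_read) {
            my $fd  = fileno $r;
            # append whatever is available to this child's buffer
            my $got = sysread $r, $buf{$fd}, 4096, length $buf{$fd};
            die "sysread: $!" unless defined $got;
            next if $got;                   # more data may follow
            $sel->remove($r);               # EOF: the child closed its pipe
            close $r;
            waitpid $pid_of{$fd}, 0;
            push @results, [ split /^/m, $buf{$fd} ];  # one element per line
        }
    }
    return @results;                        # list of array refs of lines
}
```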

Is there a good sample script I can modify and use, or could someone share one here? I prefer to use only pipes for interprocess communication and to keep things as simple as possible.

Edit:
Can someone show an example of how this can be done using only pipes, with the IO::Handle module?
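
A sketch of the pipes-only approach asked for in the edit, using the `pipe` and `fork` builtins plus `IO::Handle` methods (`autoflush`, `flush`). Each worker gets two pipes: the parent writes the N-line chunk down one and closes it (the EOF marks the end of the chunk), and the child writes its result back up the other. `start_worker` and `run_pool` are hypothetical names. To stay simple, when the limit is reached the parent waits for the oldest worker rather than whichever finishes first; selecting on the result pipes with `IO::Select` would lift that restriction.

```perl
use strict;
use warnings;
use IO::Handle;   # provides autoflush() and flush() on handles
use POSIX ();

# Hypothetical helper: fork one worker connected by two pipes.
sub start_worker {
    my ($chunk, $work) = @_;
    pipe(my $from_parent, my $to_child)  or die "pipe: $!";
    pipe(my $from_child,  my $to_parent) or die "pipe: $!";
    STDOUT->flush;                        # child must not inherit unflushed output
    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {                      # child
        close $to_child;
        close $from_child;
        my @lines = <$from_parent>;       # read until the parent closes its end
        close $from_parent;
        $to_parent->autoflush(1);         # IO::Handle method (close would flush anyway)
        my $ok = eval { print {$to_parent} $work->(@lines); 1 };
        close $to_parent;
        POSIX::_exit($ok ? 0 : 1);        # skip exit-time cleanup of inherited handles
    }
    close $from_parent;                   # parent keeps $to_child and $from_child
    close $to_parent;
    print {$to_child} @$chunk;
    close $to_child;                      # EOF tells the child the chunk is complete
    return [ $pid, $from_child ];
}

# Hypothetical helper: hand $n lines at a time from $fh to at most $max_kids
# workers and return all output lines, in the order the chunks were read.
sub run_pool {
    my ($fh, $n, $max_kids, $work) = @_;
    my (@running, @results);
    my $collect = sub {                   # drain and reap the oldest worker
        my ($pid, $from_child) = @{ shift @running };
        push @results, <$from_child>;     # blocks until that child closes its pipe
        close $from_child;
        waitpid $pid, 0;
    };
    while (1) {
        my @chunk;
        while (@chunk < $n) {
            my $line = <$fh>;
            last unless defined $line;
            push @chunk, $line;
        }
        last unless @chunk;               # input exhausted
        $collect->() if @running >= $max_kids;
        push @running, start_worker(\@chunk, $work);
    }
    $collect->() while @running;
    return @results;
}
```

With real input this would be called as `run_pool(\*STDIN, $N, $MAX_KIDS, \&summarize)`, where `summarize` is a hypothetical sub that returns the 5-10 output lines for one chunk.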

Answers (2)

罪#恶を代价 2024-12-27 00:09:49

Use Forks::Super, which makes it easy to throttle the number of simultaneous processes and handle the interprocess communication. For example,

use strict;
use warnings;
use Forks::Super MAX_PROC => 10,     # allow 10 simultaneous processes
                 ON_BUSY => 'queue'; # don't block when >=10 jobs are active

my $N = 1000;            # lines per job (example value)
my @loglines = <>;
my @results;

# set up all the background jobs
while (@loglines > 0) {
    my $pid = fork {
        args => [ splice @loglines, 0, $N ],  # to pass to sub, below
        child_fh => "out",    # make child STDOUT readable by parent
        sub => sub {
            my @lines = @_;
            # placeholder: replace with the real per-chunk work
            my @job_results = (scalar(@lines) . " lines processed\n");
            print @job_results;   # the parent reads this with $pid->read_stdout()
        }
    };
}

# get the results
while (1) {
    my $pid = waitpid -1, 0;
    last if $pid == -1;
    my @results_from_job = $pid->read_stdout();
    push @results, @results_from_job;
}
哭了丶谁疼 2024-12-27 00:09:49

I have found threads to be much simpler for this sort of process (this requires a perl built with ithreads support). You need the threads and Thread::Queue modules. The approach is to set up one queue to feed the worker threads and another for them to return their results. Each worker thread is just a function that reads a record, processes it and sends a result back. I just put this code together and have not tested it, so it may contain bugs, but I think it shows the general idea:

use strict;
use warnings;
use threads;          # exports async()
use Thread::Queue;
#
#            Set limit on number of workers
#
my $MAX_THREADS = 5;
my $EOD = "\n\n";     # terminator: with the default $/, a line read by <> never equals this
#
#            Need a queue to feed the workers
#            and one for them to return results
#
my $Qresult = Thread::Queue->new();
#
#            load STDIN into the input queue
#
my $Qin = Thread::Queue->new(<>);
#
#            start worker threads
#
for my $n (1 .. $MAX_THREADS)
{
    async { ProcessRecord($n); };
    $Qin->enqueue($EOD);            #    need terminator for each
}
#
#            Wait for the results to come in; every worker
#            sends $EOD on the result queue when it is done
#
my $done = 0;
while ($done < $MAX_THREADS)
{
    my $rec = $Qresult->dequeue();  #    blocks until a result arrives
    if ($rec eq $EOD)
    {
        ++$done;
        next;
    }

    #-- process result (placeholder: just print it) --#
    print $rec;
}
$_->join() for threads->list();     #    reap the finished workers
exit;
######################################
#
#            Worker threads draw from the queue;
#            when a "terminator" is read, quit
#
sub ProcessRecord
{
    my ($id) = @_;

    while (1)
    {
        my $rec = $Qin->dequeue();
        last if $rec eq $EOD;

        #-- process record (placeholder: tag it with the worker id) --#
        my $result = "worker $id: $rec";

        $Qresult->enqueue($result);
    }

    $Qresult->enqueue($EOD);        #    tell the main thread this worker is done
    return;
}