通过套接字对的多个进程有时会挂起

发布于 2024-11-29 14:03:52 字数 6895 浏览 0 评论 0原文

我正在尝试实现一些可以为我提供解决方案的东西:

       | --> cmd3 --> cmd4 -->
cmd2-->|
       | --> cmd5 --> cmd6 -->

等等...

也就是执行多个进程,并把一个进程的结果通过管道传给由其他进程组成的若干命令链,每条命令链应在各自的线程中运行。我选择用 socketpair 来实现 IPC,因为 pipe 存在瓶颈,其缓冲区大小限制为 64K。当我用单条链测试程序时,它按预期工作;但当我运行主命令,并把它的输出通过 socketpair 发送到每个线程中多个进程的读取端时,程序就卡住了(看起来像死锁)。

我做错了什么:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <sys/socket.h>

/*
 * One node of the process tree.
 *
 * A node is either a stage of a linear pipeline (argv + next) or a fan-out
 * point (master_cmd + chains + num_children).
 */
typedef struct command {
    char **argv;                /* NULL-terminated argument vector passed to execvp() */
    int num_children;           /* number of entries in chains; main() stores -1 when there are none */
    struct command *master_cmd; /* command whose output is copied to every chain, or NULL */
    struct command **chains;    /* first stage of each pipeline fed by master_cmd, or NULL */
    struct command *next;       /* following stage of the same pipeline, or NULL for the last one */
    int fd;                     /* socket a chain reads its input from; assigned before its thread starts */
} command;

/*
 * Forward declarations for the functions defined after main().
 * handle_segfault() is declared here because main() calls it before its
 * definition; without a prototype the call is an implicit declaration that
 * conflicts with the later `void handle_segfault(void)` definition.
 */
void handle_segfault(void);
void be_child(command* cmd);
int execute_master_command_and_pipe_to_childs(command* cmd, int input);
int run_pipeline_sockets(command *cmd, int input);
void waitfor(int fd);

int main(int argc, char* argv[]) {

    handle_segfault();

    command* cmd1 = (command*) malloc(sizeof(command));
    command* cmd2 = (command*) malloc(sizeof(command));
    command* cmd3 = (command*) malloc(sizeof(command));
    command* cmd4 = (command*) malloc(sizeof(command));
    command* cmd5 = (command*) malloc(sizeof(command));
    command* cmd6 = (command*) malloc(sizeof(command));

    command* chains1[2];

    chains1[0] = cmd3;
    chains1[1] = cmd5;

    char* args1[] = { "cat", "/tmp/test.log", NULL };
    char* args3[] = { "sort", NULL, NULL };
    char* args4[] = { "wc", "-l", NULL };
    char* args5[] = { "wc", "-l", NULL };
    char* args6[] = { "wc", "-l", NULL };

    cmd1->argv = args1;
    cmd2->argv = NULL;
    cmd3->argv = args3;
    cmd4->argv = args4;
    cmd5->argv = args5;
    cmd6->argv = args6;

    cmd1->master_cmd = NULL;
    cmd1->next = NULL;
    cmd1->chains = NULL;
    cmd1->num_children = -1;

    cmd2->master_cmd = cmd1;
    cmd2->chains = chains1;
    cmd2->next = NULL;
    cmd2->num_children = 2;

    cmd3->master_cmd = NULL;
    cmd3->next = cmd4;
    cmd3->chains = NULL;
    cmd3->num_children = -1;

    cmd4->master_cmd = NULL;
    cmd4->next = NULL;
    cmd4->chains = NULL;
    cmd4->num_children = -1;

    cmd5->master_cmd = NULL;
    cmd5->next = cmd6;
    cmd5->chains = NULL;
    cmd5->num_children = -1;

    cmd6->master_cmd = NULL;
    cmd6->next = NULL;
    cmd6->chains = NULL;
    cmd6->num_children = -1;

    int rc = execute_master_command_and_pipe_to_childs(cmd2, -1);

    return 0;
}

int execute_master_command_and_pipe_to_childs(command* cmd, int input) {

    int num_children = cmd->num_children;
    int write_pipes[num_children];
    pthread_t threads[num_children];
    command* master_cmd = cmd->master_cmd;

    pid_t pid;
    int i;

    for (i = 0; i < num_children; i++) {
        int new_pipe[2];
        if (socketpair(AF_LOCAL, SOCK_STREAM, 0, new_pipe) < 0) {
            int errnum = errno;
            fprintf(STDERR_FILENO, "ERROR (%d: %s)\n", errnum,
                    strerror(errnum));
            return EXIT_FAILURE;
        }

        if (cmd->chains[i] != NULL) {
            cmd->chains[i]->fd = new_pipe[0];

            if (pthread_create(&threads[i], NULL, (void *) be_child,
                    cmd->chains[i]) != 0) {
                perror("pthread_create"), exit(1);
            }

            write_pipes[i] = new_pipe[1];
        } else {
            perror("ERROR\n");
        }
    }

    if (input != -1) {
        waitfor(input);
    }

    int pipefd = run_pipeline_sockets(master_cmd, input);

    int buffer[1024];

    int len = 0;
    while ((len = read(pipefd, buffer, sizeof(buffer))) != 0) {
        int j;
        for (j = 0; j < num_children; j++) {
            if (write(write_pipes[j], &buffer, len) != len) {
                fprintf(STDERR_FILENO, "Write failed (child %d)\n", j);
                exit(1);
            }

        }

    }

    close(pipefd);

    for (i = 0; i < num_children; i++) {
        close(write_pipes[i]);
    }

    for (i = 0; i < num_children; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join"), exit(1);
        }
    }

}

/*
 * Waits up to half a second for `fd` to become readable and prints the
 * outcome on stdout. Nothing is read from the descriptor. A select()
 * failure is reported with perror().
 */
void waitfor(int fd) {
    struct timeval timeout = { .tv_sec = 0, .tv_usec = 500000 };
    fd_set readable;
    int ready;

    FD_ZERO(&readable);
    FD_SET(fd, &readable);

    ready = select(fd + 1, &readable, NULL, NULL, &timeout);

    if (ready == -1) {
        perror("select()");
        return;
    }
    if (ready != 0) {
        printf("Data is available now on: %d\n", fd);
        return;
    }
    /* Timed out: the caller simply carries on. */
    printf("No data on: %d\n", fd);
}

/*
 * Body of one chain thread: starts the pipeline that begins at `cmd` with
 * cmd->fd as its stdin, then copies the pipeline's output to stderr until
 * end-of-file.
 *
 * cmd  first stage of the chain; cmd->fd must already hold the input socket.
 */
void be_child(command* cmd) {
    int failed = 0;

    /* Pointers are printed with %p, and a missing argv[1] is never handed to %s. */
    printf(
            "fd = %d , argv = %s , args = %s , next = %p , master_cmd = %p , next_chain = %p\n",
            cmd->fd, cmd->argv[0],
            cmd->argv[1] != NULL ? cmd->argv[1] : "(null)",
            (void *) cmd->next, (void *) cmd->master_cmd,
            (void *) cmd->chains);

    waitfor(cmd->fd);

    /*
     * run_pipeline_sockets() closes cmd->fd in this process once the first
     * stage has been forked. It must not be closed again here: by then the
     * number may belong to a descriptor opened by another thread.
     */
    int fd = run_pipeline_sockets(cmd, cmd->fd);
    if (fd < 0) {
        fprintf(stderr, "ERROR: could not start chain '%s'\n", cmd->argv[0]);
        return;
    }

    waitfor(fd);

    char buffer[4096];

    while (!failed) {
        ssize_t len = read(fd, buffer, sizeof buffer);
        ssize_t done = 0;

        if (len == 0) {
            break; /* last stage closed its output */
        }
        if (len < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("read");
            break;
        }
        /* write() may accept fewer bytes than asked for; keep going until all are out. */
        while (done < len) {
            ssize_t n = write(STDERR_FILENO, buffer + done,
                    (size_t) (len - done));
            if (n < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("write");
                failed = 1;
                break;
            }
            done += n;
        }
    }

    close(fd);
}

/*
 * Forks one process per stage of the pipeline starting at `cmd`, connecting
 * consecutive stages with socket pairs.
 *
 * cmd    first stage to start; following stages are reached through ->next.
 * input  descriptor that becomes the first stage's stdin, or -1 to leave
 *        stdin untouched. It is always closed in the calling process before
 *        this function returns.
 *
 * Returns the descriptor from which the last stage's stdout can be read, or
 * -1 when a stage could not be started. (-1 is used because any
 * non-negative value is a valid descriptor number.)
 */
int run_pipeline_sockets(command *cmd, int input) {
    int pfds[2] = { -1, -1 };
    pid_t pid;

    if (cmd == NULL || cmd->argv == NULL || cmd->argv[0] == NULL) {
        fprintf(stderr, "run_pipeline_sockets: command has no program name\n");
        goto fail;
    }

    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, pfds) < 0) {
        int errnum = errno;
        fprintf(stderr, "socketpair failed (%d: %s)\n", errnum,
                strerror(errnum));
        pfds[0] = pfds[1] = -1;
        goto fail;
    }

    /*
     * Close-on-exec keeps these sockets out of programs started concurrently
     * by other threads. The copies made with dup2() below do not carry the
     * flag, so the intended child still receives them as stdin/stdout.
     */
    if (fcntl(pfds[0], F_SETFD, FD_CLOEXEC) < 0
            || fcntl(pfds[1], F_SETFD, FD_CLOEXEC) < 0) {
        perror("fcntl(FD_CLOEXEC)");
        goto fail;
    }

    pid = fork();
    if (pid < 0) {
        perror("fork");
        goto fail;
    }

    if (pid == 0) { /* child */
        if (input != -1) {
            if (dup2(input, STDIN_FILENO) < 0) {
                _exit(1);
            }
            close(input);
        }
        if (dup2(pfds[1], STDOUT_FILENO) < 0) {
            _exit(1);
        }
        close(pfds[1]);
        close(pfds[0]);
        execvp(cmd->argv[0], cmd->argv);
        /* _exit(): a forked child must not flush the parent's stdio buffers. */
        _exit(1);
    }

    /* parent */
    if (input != -1) {
        close(input);
    }
    close(pfds[1]);

    if (cmd->next != NULL) {
        /* The read end just created is the next stage's stdin. */
        return run_pipeline_sockets(cmd->next, pfds[0]);
    }
    return pfds[0];

fail:
    if (pfds[0] != -1) {
        close(pfds[0]);
    }
    if (pfds[1] != -1) {
        close(pfds[1]);
    }
    if (input != -1) {
        close(input);
    }
    return -1;
}

/*
 * SIGSEGV handler: reports the faulting address and si_errno on stdout, then
 * terminates the process with a failure status.
 *
 * The text is formatted into a local buffer and emitted with write(), and
 * the process leaves through _exit(), so no stdio stream is touched from
 * signal context. NOTE(review): snprintf() is not on the POSIX list of
 * async-signal-safe functions; it is used here only with %p and %d.
 */
void segfault_sigaction(int signal, siginfo_t *si, void *arg) {
    char msg[128];
    int n;

    (void) signal;
    (void) arg;

    /* si_errno is an int, so it is printed with %d. */
    n = snprintf(msg, sizeof msg,
            "Caught segfault at address %p\nCaught segfault errno %d\n",
            si->si_addr, si->si_errno);
    if (n > 0) {
        size_t len = (size_t) n < sizeof msg ? (size_t) n : sizeof msg - 1;
        ssize_t ignored = write(STDOUT_FILENO, msg, len);
        (void) ignored;
    }

    /* A crash must not be reported as a successful exit. */
    _exit(EXIT_FAILURE);
}

/*
 * Installs segfault_sigaction() as the SIGSEGV handler. A failure to install
 * it is reported with perror() and otherwise ignored.
 */
void handle_segfault(void) {
    struct sigaction sa;

    /*
     * sizeof sa, not sizeof(sigaction): the bare identifier names the
     * function, so the old expression did not cover the structure.
     */
    memset(&sa, 0, sizeof sa);
    sigemptyset(&sa.sa_mask);
    sa.sa_sigaction = segfault_sigaction;
    sa.sa_flags = SA_SIGINFO; /* handler takes the three-argument form */

    if (sigaction(SIGSEGV, &sa, NULL) != 0) {
        perror("sigaction");
    }
}

I am trying to implement something that will give me a solution for:

       | --> cmd3 --> cmd4 -->
cmd2-->|
       | --> cmd5 --> cmd6 -->

and so on...

This runs multiple processes and pipes the output of one through chains of other processes; each command chain should run in its own thread.
I chose socketpair to implement the IPC, because a pipe has a bottleneck: its buffer size is limited to 64K.
When I test the program with a single chain, it works as expected, but when I run the master command and send its output via socketpair to the read ends of multiple processes in each thread, the program gets stuck (it looks like a deadlock).

What am I doing wrong:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <sys/socket.h>

/*
 * One node of the process tree.
 *
 * A node is either a stage of a linear pipeline (argv + next) or a fan-out
 * point (master_cmd + chains + num_children).
 */
typedef struct command {
    char **argv;                /* NULL-terminated argument vector passed to execvp() */
    int num_children;           /* number of entries in chains; main() stores -1 when there are none */
    struct command *master_cmd; /* command whose output is copied to every chain, or NULL */
    struct command **chains;    /* first stage of each pipeline fed by master_cmd, or NULL */
    struct command *next;       /* following stage of the same pipeline, or NULL for the last one */
    int fd;                     /* socket a chain reads its input from; assigned before its thread starts */
} command;

/*
 * Forward declarations for the functions defined after main().
 * handle_segfault() is declared here because main() calls it before its
 * definition; without a prototype the call is an implicit declaration that
 * conflicts with the later `void handle_segfault(void)` definition.
 */
void handle_segfault(void);
void be_child(command* cmd);
int execute_master_command_and_pipe_to_childs(command* cmd, int input);
int run_pipeline_sockets(command *cmd, int input);
void waitfor(int fd);

int main(int argc, char* argv[]) {

    handle_segfault();

    command* cmd1 = (command*) malloc(sizeof(command));
    command* cmd2 = (command*) malloc(sizeof(command));
    command* cmd3 = (command*) malloc(sizeof(command));
    command* cmd4 = (command*) malloc(sizeof(command));
    command* cmd5 = (command*) malloc(sizeof(command));
    command* cmd6 = (command*) malloc(sizeof(command));

    command* chains1[2];

    chains1[0] = cmd3;
    chains1[1] = cmd5;

    char* args1[] = { "cat", "/tmp/test.log", NULL };
    char* args3[] = { "sort", NULL, NULL };
    char* args4[] = { "wc", "-l", NULL };
    char* args5[] = { "wc", "-l", NULL };
    char* args6[] = { "wc", "-l", NULL };

    cmd1->argv = args1;
    cmd2->argv = NULL;
    cmd3->argv = args3;
    cmd4->argv = args4;
    cmd5->argv = args5;
    cmd6->argv = args6;

    cmd1->master_cmd = NULL;
    cmd1->next = NULL;
    cmd1->chains = NULL;
    cmd1->num_children = -1;

    cmd2->master_cmd = cmd1;
    cmd2->chains = chains1;
    cmd2->next = NULL;
    cmd2->num_children = 2;

    cmd3->master_cmd = NULL;
    cmd3->next = cmd4;
    cmd3->chains = NULL;
    cmd3->num_children = -1;

    cmd4->master_cmd = NULL;
    cmd4->next = NULL;
    cmd4->chains = NULL;
    cmd4->num_children = -1;

    cmd5->master_cmd = NULL;
    cmd5->next = cmd6;
    cmd5->chains = NULL;
    cmd5->num_children = -1;

    cmd6->master_cmd = NULL;
    cmd6->next = NULL;
    cmd6->chains = NULL;
    cmd6->num_children = -1;

    int rc = execute_master_command_and_pipe_to_childs(cmd2, -1);

    return 0;
}

int execute_master_command_and_pipe_to_childs(command* cmd, int input) {

    int num_children = cmd->num_children;
    int write_pipes[num_children];
    pthread_t threads[num_children];
    command* master_cmd = cmd->master_cmd;

    pid_t pid;
    int i;

    for (i = 0; i < num_children; i++) {
        int new_pipe[2];
        if (socketpair(AF_LOCAL, SOCK_STREAM, 0, new_pipe) < 0) {
            int errnum = errno;
            fprintf(STDERR_FILENO, "ERROR (%d: %s)\n", errnum,
                    strerror(errnum));
            return EXIT_FAILURE;
        }

        if (cmd->chains[i] != NULL) {
            cmd->chains[i]->fd = new_pipe[0];

            if (pthread_create(&threads[i], NULL, (void *) be_child,
                    cmd->chains[i]) != 0) {
                perror("pthread_create"), exit(1);
            }

            write_pipes[i] = new_pipe[1];
        } else {
            perror("ERROR\n");
        }
    }

    if (input != -1) {
        waitfor(input);
    }

    int pipefd = run_pipeline_sockets(master_cmd, input);

    int buffer[1024];

    int len = 0;
    while ((len = read(pipefd, buffer, sizeof(buffer))) != 0) {
        int j;
        for (j = 0; j < num_children; j++) {
            if (write(write_pipes[j], &buffer, len) != len) {
                fprintf(STDERR_FILENO, "Write failed (child %d)\n", j);
                exit(1);
            }

        }

    }

    close(pipefd);

    for (i = 0; i < num_children; i++) {
        close(write_pipes[i]);
    }

    for (i = 0; i < num_children; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join"), exit(1);
        }
    }

}

/*
 * Waits up to half a second for `fd` to become readable and prints the
 * outcome on stdout. Nothing is read from the descriptor. A select()
 * failure is reported with perror().
 */
void waitfor(int fd) {
    struct timeval timeout = { .tv_sec = 0, .tv_usec = 500000 };
    fd_set readable;
    int ready;

    FD_ZERO(&readable);
    FD_SET(fd, &readable);

    ready = select(fd + 1, &readable, NULL, NULL, &timeout);

    if (ready == -1) {
        perror("select()");
        return;
    }
    if (ready != 0) {
        printf("Data is available now on: %d\n", fd);
        return;
    }
    /* Timed out: the caller simply carries on. */
    printf("No data on: %d\n", fd);
}

/*
 * Body of one chain thread: starts the pipeline that begins at `cmd` with
 * cmd->fd as its stdin, then copies the pipeline's output to stderr until
 * end-of-file.
 *
 * cmd  first stage of the chain; cmd->fd must already hold the input socket.
 */
void be_child(command* cmd) {
    int failed = 0;

    /* Pointers are printed with %p, and a missing argv[1] is never handed to %s. */
    printf(
            "fd = %d , argv = %s , args = %s , next = %p , master_cmd = %p , next_chain = %p\n",
            cmd->fd, cmd->argv[0],
            cmd->argv[1] != NULL ? cmd->argv[1] : "(null)",
            (void *) cmd->next, (void *) cmd->master_cmd,
            (void *) cmd->chains);

    waitfor(cmd->fd);

    /*
     * run_pipeline_sockets() closes cmd->fd in this process once the first
     * stage has been forked. It must not be closed again here: by then the
     * number may belong to a descriptor opened by another thread.
     */
    int fd = run_pipeline_sockets(cmd, cmd->fd);
    if (fd < 0) {
        fprintf(stderr, "ERROR: could not start chain '%s'\n", cmd->argv[0]);
        return;
    }

    waitfor(fd);

    char buffer[4096];

    while (!failed) {
        ssize_t len = read(fd, buffer, sizeof buffer);
        ssize_t done = 0;

        if (len == 0) {
            break; /* last stage closed its output */
        }
        if (len < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("read");
            break;
        }
        /* write() may accept fewer bytes than asked for; keep going until all are out. */
        while (done < len) {
            ssize_t n = write(STDERR_FILENO, buffer + done,
                    (size_t) (len - done));
            if (n < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("write");
                failed = 1;
                break;
            }
            done += n;
        }
    }

    close(fd);
}

/*
 * Forks one process per stage of the pipeline starting at `cmd`, connecting
 * consecutive stages with socket pairs.
 *
 * cmd    first stage to start; following stages are reached through ->next.
 * input  descriptor that becomes the first stage's stdin, or -1 to leave
 *        stdin untouched. It is always closed in the calling process before
 *        this function returns.
 *
 * Returns the descriptor from which the last stage's stdout can be read, or
 * -1 when a stage could not be started. (-1 is used because any
 * non-negative value is a valid descriptor number.)
 */
int run_pipeline_sockets(command *cmd, int input) {
    int pfds[2] = { -1, -1 };
    pid_t pid;

    if (cmd == NULL || cmd->argv == NULL || cmd->argv[0] == NULL) {
        fprintf(stderr, "run_pipeline_sockets: command has no program name\n");
        goto fail;
    }

    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, pfds) < 0) {
        int errnum = errno;
        fprintf(stderr, "socketpair failed (%d: %s)\n", errnum,
                strerror(errnum));
        pfds[0] = pfds[1] = -1;
        goto fail;
    }

    /*
     * Close-on-exec keeps these sockets out of programs started concurrently
     * by other threads. The copies made with dup2() below do not carry the
     * flag, so the intended child still receives them as stdin/stdout.
     */
    if (fcntl(pfds[0], F_SETFD, FD_CLOEXEC) < 0
            || fcntl(pfds[1], F_SETFD, FD_CLOEXEC) < 0) {
        perror("fcntl(FD_CLOEXEC)");
        goto fail;
    }

    pid = fork();
    if (pid < 0) {
        perror("fork");
        goto fail;
    }

    if (pid == 0) { /* child */
        if (input != -1) {
            if (dup2(input, STDIN_FILENO) < 0) {
                _exit(1);
            }
            close(input);
        }
        if (dup2(pfds[1], STDOUT_FILENO) < 0) {
            _exit(1);
        }
        close(pfds[1]);
        close(pfds[0]);
        execvp(cmd->argv[0], cmd->argv);
        /* _exit(): a forked child must not flush the parent's stdio buffers. */
        _exit(1);
    }

    /* parent */
    if (input != -1) {
        close(input);
    }
    close(pfds[1]);

    if (cmd->next != NULL) {
        /* The read end just created is the next stage's stdin. */
        return run_pipeline_sockets(cmd->next, pfds[0]);
    }
    return pfds[0];

fail:
    if (pfds[0] != -1) {
        close(pfds[0]);
    }
    if (pfds[1] != -1) {
        close(pfds[1]);
    }
    if (input != -1) {
        close(input);
    }
    return -1;
}

/*
 * SIGSEGV handler: reports the faulting address and si_errno on stdout, then
 * terminates the process with a failure status.
 *
 * The text is formatted into a local buffer and emitted with write(), and
 * the process leaves through _exit(), so no stdio stream is touched from
 * signal context. NOTE(review): snprintf() is not on the POSIX list of
 * async-signal-safe functions; it is used here only with %p and %d.
 */
void segfault_sigaction(int signal, siginfo_t *si, void *arg) {
    char msg[128];
    int n;

    (void) signal;
    (void) arg;

    /* si_errno is an int, so it is printed with %d. */
    n = snprintf(msg, sizeof msg,
            "Caught segfault at address %p\nCaught segfault errno %d\n",
            si->si_addr, si->si_errno);
    if (n > 0) {
        size_t len = (size_t) n < sizeof msg ? (size_t) n : sizeof msg - 1;
        ssize_t ignored = write(STDOUT_FILENO, msg, len);
        (void) ignored;
    }

    /* A crash must not be reported as a successful exit. */
    _exit(EXIT_FAILURE);
}

/*
 * Installs segfault_sigaction() as the SIGSEGV handler. A failure to install
 * it is reported with perror() and otherwise ignored.
 */
void handle_segfault(void) {
    struct sigaction sa;

    /*
     * sizeof sa, not sizeof(sigaction): the bare identifier names the
     * function, so the old expression did not cover the structure.
     */
    memset(&sa, 0, sizeof sa);
    sigemptyset(&sa.sa_mask);
    sa.sa_sigaction = segfault_sigaction;
    sa.sa_flags = SA_SIGINFO; /* handler takes the three-argument form */

    if (sigaction(SIGSEGV, &sa, NULL) != 0) {
        perror("sigaction");
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

山田美奈子 2024-12-06 14:03:52

我会从一个非常不同的角度来看待这个问题:而不是想出一个大的数据结构来管理管道树,并使用线程(进程中的 io 阻塞可能会阻塞其线程),我只会使用进程。

当您只使用 1K 缓冲区时,我也不明白 64K 缓冲区如何成为您的瓶颈。

2 个简单的函数应该指导这一点:(为简洁起见,省略了错误处理,并使用伪代码 parsecmd() 函数将空格分隔的字符串转换为参数向量)

/*
 * Starts `cmd` with its stdout redirected to outfd and its stdin connected
 * to a new pipe.
 *
 * Returns the write end of that pipe (whatever is written to it becomes the
 * command's stdin), or -1 if the pipe or the process cannot be created.
 * outfd stays open in the caller.
 */
int mkproc(char *cmd, int outfd)
{
    Command c = parsecmd(cmd);
    int pipeleft[2];
    pid_t pid;

    if(pipe(pipeleft) < 0)
        return -1;

    pid = fork();
    if(pid < 0){
        close(pipeleft[0]);
        close(pipeleft[1]);
        return -1;
    }
    if(pid == 0){
        close(pipeleft[1]);
        dup2(pipeleft[0], 0);
        dup2(outfd, 1);
        /* The duplicates on 0 and 1 are enough; never close 0, 1 or 2 themselves. */
        if(pipeleft[0] > 2)
            close(pipeleft[0]);
        if(outfd > 2)
            close(outfd);
        execvp(c.name, c.argv);
        /* exec failed: leave now instead of running the caller's code a second time */
        _exit(127);
    }
    close(pipeleft[0]);
    return pipeleft[1];
}

Mkproc 获取它将写入的 fd,并且返回它将读取的内容。这样链就很容易初始化:

/*
 * Builds cat foo.txt | sort | wc -l from right to left: each mkproc() call
 * returns the pipe feeding the command it started, which becomes the stdout
 * of the command to its left. wc writes to fd 1; chain_in feeds cat's stdin.
 */
int chain_in = mkproc("cat foo.txt", mkproc("sort", mkproc("wc -l", 1)));

接下来是:

/*
 * Starts `cmd` and a forwarding process that copies everything `cmd` writes
 * to each descriptor in the argument list ofd0, ..., which must end with -1.
 *
 * Returns the descriptor feeding cmd's stdin (as returned by mkproc()), or
 * -1 if a pipe or process cannot be created. The output descriptors stay
 * open in the caller.
 */
int mktree(char *cmd, int ofd0, ...)
{
    int piperight[2];
    int cmdin;
    pid_t pid;

    if(pipe(piperight) < 0)
        return -1;

    cmdin = mkproc(cmd, piperight[1]);
    close(piperight[1]);
    if(cmdin < 0){
        close(piperight[0]);
        return -1;
    }

    pid = fork();
    if(pid == 0){
        unsigned char buf[4096];
        ssize_t n;

        /* The forwarder must not hold cmd's stdin open, or cmd never sees end-of-file. */
        close(cmdin);

        while((n=read(piperight[0], buf, sizeof buf))>0){
            va_list ap;
            int fd;
            va_start(ap, ofd0);
            for(fd=ofd0; fd!=-1; fd=va_arg(ap, int)){
                ssize_t off = 0;
                /* write() may take fewer bytes than offered; stop on this fd if it fails */
                while(off < n){
                    ssize_t w = write(fd, buf + off, (size_t)(n - off));
                    if(w <= 0)
                        break;
                    off += w;
                }
            }
            va_end(ap);
        }
        /* Done forwarding: exit here so the child never returns into the caller's code. */
        _exit(0);
    }

    /* Only the forwarder reads cmd's output. */
    close(piperight[0]);
    if(pid < 0){
        close(cmdin);
        return -1;
    }
    return cmdin;
}

在这两者之间,很容易构造任意复杂度的树,如下所示:

/*
 * cat foo.txt feeds both a rot13 sub-tree and sort (whose stdout is fd 2).
 * rot13 in turn feeds uniq | wc -l (to fd 1) and wc -l (to out.txt).
 * out.txt is created if missing and truncated, so the open() cannot fail
 * just because the file does not exist yet.
 */
int tree_in = mktree("cat foo.txt", 
                  mktree("rot13",
                      mkproc("uniq", mkproc("wc -l", 1)),
                      mkproc("wc -l", open("out.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644)), -1),
                  mkproc("sort", 2), -1);

这会将排序后的 foo.txt 输出到 stderr,将经过 rot13 处理的 foo.txt 的行数输出到 out.txt,并将经过 rot13 处理的 foo.txt 去重后的行数输出到 stdout。

I would come at this problem from a very different angle: rather than coming up with a large data structure to manage the pipe tree, and using threads (where an io blockage in a process may block in its threads) I would use only processes.

I also fail to see how a 64K buffer is your bottleneck when you're only using a 1K buffer.

2 simple functions should guide this: (error handling omitted for brevity, and using a pseudocodey parsecmd() function which turns a space separated string into an argument vector)

/*
 * Starts `cmd` with its stdout redirected to outfd and its stdin connected
 * to a new pipe.
 *
 * Returns the write end of that pipe (whatever is written to it becomes the
 * command's stdin), or -1 if the pipe or the process cannot be created.
 * outfd stays open in the caller.
 */
int mkproc(char *cmd, int outfd)
{
    Command c = parsecmd(cmd);
    int pipeleft[2];
    pid_t pid;

    if(pipe(pipeleft) < 0)
        return -1;

    pid = fork();
    if(pid < 0){
        close(pipeleft[0]);
        close(pipeleft[1]);
        return -1;
    }
    if(pid == 0){
        close(pipeleft[1]);
        dup2(pipeleft[0], 0);
        dup2(outfd, 1);
        /* The duplicates on 0 and 1 are enough; never close 0, 1 or 2 themselves. */
        if(pipeleft[0] > 2)
            close(pipeleft[0]);
        if(outfd > 2)
            close(outfd);
        execvp(c.name, c.argv);
        /* exec failed: leave now instead of running the caller's code a second time */
        _exit(127);
    }
    close(pipeleft[0]);
    return pipeleft[1];
}

Mkproc takes the fd it will write to, and returns what it will read from. This way chains are really easy to initialize:

/*
 * Builds cat foo.txt | sort | wc -l from right to left: each mkproc() call
 * returns the pipe feeding the command it started, which becomes the stdout
 * of the command to its left. wc writes to fd 1; chain_in feeds cat's stdin.
 */
int chain_in = mkproc("cat foo.txt", mkproc("sort", mkproc("wc -l", 1)));

the next is:

/*
 * Starts `cmd` and a forwarding process that copies everything `cmd` writes
 * to each descriptor in the argument list ofd0, ..., which must end with -1.
 *
 * Returns the descriptor feeding cmd's stdin (as returned by mkproc()), or
 * -1 if a pipe or process cannot be created. The output descriptors stay
 * open in the caller.
 */
int mktree(char *cmd, int ofd0, ...)
{
    int piperight[2];
    int cmdin;
    pid_t pid;

    if(pipe(piperight) < 0)
        return -1;

    cmdin = mkproc(cmd, piperight[1]);
    close(piperight[1]);
    if(cmdin < 0){
        close(piperight[0]);
        return -1;
    }

    pid = fork();
    if(pid == 0){
        unsigned char buf[4096];
        ssize_t n;

        /* The forwarder must not hold cmd's stdin open, or cmd never sees end-of-file. */
        close(cmdin);

        while((n=read(piperight[0], buf, sizeof buf))>0){
            va_list ap;
            int fd;
            va_start(ap, ofd0);
            for(fd=ofd0; fd!=-1; fd=va_arg(ap, int)){
                ssize_t off = 0;
                /* write() may take fewer bytes than offered; stop on this fd if it fails */
                while(off < n){
                    ssize_t w = write(fd, buf + off, (size_t)(n - off));
                    if(w <= 0)
                        break;
                    off += w;
                }
            }
            va_end(ap);
        }
        /* Done forwarding: exit here so the child never returns into the caller's code. */
        _exit(0);
    }

    /* Only the forwarder reads cmd's output. */
    close(piperight[0]);
    if(pid < 0){
        close(cmdin);
        return -1;
    }
    return cmdin;
}

Between the two of these, it is very easy to construct trees of arbitrary complexity, as so:

/*
 * cat foo.txt feeds both a rot13 sub-tree and sort (whose stdout is fd 2).
 * rot13 in turn feeds uniq | wc -l (to fd 1) and wc -l (to out.txt).
 * out.txt is created if missing and truncated, so the open() cannot fail
 * just because the file does not exist yet.
 */
int tree_in = mktree("cat foo.txt", 
                  mktree("rot13",
                      mkproc("uniq", mkproc("wc -l", 1)),
                      mkproc("wc -l", open("out.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644)), -1),
                  mkproc("sort", 2), -1);

This would output a sorted foo.txt to stderr, the number of lines in rot13'd foo.txt to out.txt, and the number of non-duplicate lines of rot13'd foo.txt to stdout.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文