多个线程之一的 pthread_join

发布于 2024-12-03 10:09:57 字数 333 浏览 1 评论 0原文

我的问题类似于 我该如何使用 pthread 时检查线程是否终止?。但我没有完全得到答案。

我的问题是......我创建了一定数量的线程,比如n。一旦 main 检测到任何一个线程的退出,它就会创建另一个线程,从而保持并发度为 n 等等。

主线程如何检测线程的退出。 pthread_join 等待特定线程退出,但在我的例子中,它可以是 n 个线程中的任何一个。

谢谢

My question is similar to How do I check if a thread is terminated when using pthread?. but i did not quite get an answer.

My problem is...I create a certain number of threads say n. As soon as main detects the exit of any one thread it creates another thread thus keeping the degree of concurrency as n and so on.

How does the main thread detect the exit of a thread. pthread_join waits for a particular thread to exit but in my case it can be any one of the n threads.

Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

八巷 2024-12-10 10:09:57

最明显的是,无需像 aix 建议的那样重构代码,而是让每个线程设置一些内容来指示它已完成(可能是所有线程之间共享的数组中的值,每个工作线程一个槽),然后向条件变量发出信号。主线程等待条件变量,每次醒来时,处理所有已指示自己已完成的线程:可能有多个。

当然,这意味着如果线程被取消,您永远不会收到信号,因此请使用取消处理程序或不要取消线程。

Most obvious, without restructuring your code as aix suggests, is to have each thread set something to indicate that it has finished (probably a value in an array shared between all threads, one slot per worker thread), and then signal a condition variable. Main thread waits on the condition variable and each time it wakes up, handle all threads that have indicated themselves finished: there may be more than one.

Of course that means that if the thread is cancelled you never get signalled, so use a cancellation handler or don't cancel the thread.

撕心裂肺的伤痛 2024-12-10 10:09:57

有几种方法可以解决这个问题。

一种自然的方法是拥有一个固定大小n的线程池,并拥有一个队列,主线程将任务放入其中,工作人员将从队列中拾取任务并处理它们。这将保持恒定的并发度。

另一种方法是将信号量的初始值设置为n。每次创建工作线程时,信号量的值都需要递减。每当工作人员即将终止时,它都需要增加(“发布”)信号量。现在,在主线程中等待信号量将被阻塞,直到剩下的工作线程少于 n 个;然后将生成一个新的工作线程并恢复等待。由于您不会在工作线程上使用 pthread_join,因此应该分离它们 (pthread_detach)。

There are several ways to solve this.

One natural way is to have a thread pool of fixed size n and have a queue into which the main thread would place tasks and from which the workers would pick up tasks and process them. This will maintain a constant degree of concurrency.

An alternative is to have a semaphore with the initial value set to n. Every time a worker thread is created, the value of the semaphore would need to be decremented. Whenever a worker is about to terminate, it would need to increment ("post") the semaphore. Now, waiting on the semaphore in the main thread will block until there's fewer than n workers left; a new worker thread would then be spawned and the wait resumed. Since you won't be using pthread_join on the workers, they should be detached (pthread_detach).

你好,陌生人 2024-12-10 10:09:57

如果您想了解线程退出(通过 pthread_exit 或取消),您可以使用带有 pthread_cleanup_push 通知子线程退出的主线程(通过条件变量,信号量或类似信号量),因此它可以等待它,或者简单地启动一个新信号量(假设子进程首先被分离)。

或者,我建议让线程等待更多工作(如 @aix 所建议),而不是结束。

If you want to be informed of a thread exiting (via pthread_exit or cancellation), you can use a handler with pthread_cleanup_push to inform the main thread of the child exiting (via a condition variable, semaphore or similar) so it can either wait on it, or simply start a new one (assuming the child is detached first).

Alternately, I'd suggest having the threads wait for more work (as suggested by @aix), rather than ending.

昇り龍 2024-12-10 10:09:57

如果你的父线程需要做其他事情,那么它不能一直阻塞在pthread_join上,你需要一种方法从子线程向主线程发送消息来告诉它调用pthread_join。为此,您可以使用多种 IPC 机制。

当子线程完成工作后,它会通过 IPC 向主线程发送某种消息,说“我完成了我的工作”,并传递自己的线程 id,然后主线程知道调用 pthread_join< /code> 在该线程 id 上。

If your parent thread needs to do other other things, then it can't just constantly be blocking on pthread_join, You will need a way to send a message to the main thread from the child thread to tell it to call pthread_join. There are a number of IPC mechanisms that you could use for this.

When a child thread has done it's work, it would then send some sort of message to the main thread through IPC saying "I completed my job" and also pass its own thread id, then the main thread knows to call pthread_join on that thread id.

财迷小姐 2024-12-10 10:09:57

一种简单的方法是使用管道作为(工作)线程和主线程之间的通信通道。当线程终止时,它将其结果(以下示例中的线程 id)写入管道。主线程等待管道并在管道可用时立即从中读取线程结果。

与互斥体或信号量不同,管道文件描述符可以由应用程序主事件循环(例如 libevent)轻松处理。不同线程对同一管道的写入是原子的,只要它们写入 < code>PIPE_BUF 或更少字节(在我的 Linux 上为 4096)。

下面是一个创建十个线程的演示,每个线程都有不同的生命周期。然后主线程等待任何线程终止并打印其线程 ID。当所有十个线程都完成时它终止。

$ cat test.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

void* thread_fun(void* arg) {
    // do something
    unsigned delay = rand() % 10;
    usleep(delay * 1000000);

    // notify termination
    int* thread_completed_fd = static_cast<int*>(arg);
    pthread_t thread_id = pthread_self();
    if(sizeof thread_id != write(*thread_completed_fd, &thread_id, sizeof thread_id))
        abort();

    return 0;
}

int main() {
    int fd[2];
    if(pipe(fd))
        abort();

    enum { THREADS = 10 };

    time_t start = time(NULL);

    // start threads
    for(int n = THREADS; n--;) {
        pthread_t thread_id;
        if(pthread_create(&thread_id, NULL, thread_fun, fd + 1))
            abort();
        std::cout << time(NULL) - start << " sec: started thread " << thread_id << '\n';
    }

    // wait for the threads to finish
    for(int n = THREADS; n--;) {
        pthread_t thread_id;
        if(sizeof thread_id != read(fd[0], &thread_id, sizeof thread_id))
            abort();
        if(pthread_join(thread_id, NULL)) // detached threads don't need this call
            abort();
        std::cout << time(NULL) - start << " sec: thread " << thread_id << " has completed\n";
    }

    close(fd[0]);
    close(fd[1]);
}

$ g++ -o test -pthread -Wall -Wextra -march=native test.cc
$ ./test
0 sec: started thread 140672287479552
0 sec: started thread 140672278759168
0 sec: started thread 140672270038784
0 sec: started thread 140672261318400
0 sec: started thread 140672252598016
0 sec: started thread 140672243877632
0 sec: started thread 140672235157248
0 sec: started thread 140672226436864
0 sec: started thread 140672217716480
0 sec: started thread 140672208996096
1 sec: thread 140672208996096 has completed
2 sec: thread 140672226436864 has completed
3 sec: thread 140672287479552 has completed
3 sec: thread 140672243877632 has completed
5 sec: thread 140672252598016 has completed
5 sec: thread 140672261318400 has completed
6 sec: thread 140672278759168 has completed
6 sec: thread 140672235157248 has completed
7 sec: thread 140672270038784 has completed
9 sec: thread 140672217716480 has completed

One easy way is to use a pipe as a communication channel between the (worker) threads and your main thread. When a thread terminates it writes its result (thread id in the following example) to the pipe. The main thread waits on the pipe and reads the thread result from it as soon as it becomes available.

Unlike mutex or semaphore, a pipe file descriptor can be easily handled by the application main event loop (such as libevent). The writes from different threads to the same pipe are atomic as long as they write PIPE_BUF or less bytes (4096 on my Linux).

Below is a demo that creates ten threads each of which has a different life span. Then the main thread waits for any thread to terminate and prints its thread id. It terminates when all ten threads have completed.

$ cat test.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

void* thread_fun(void* arg) {
    // do something
    unsigned delay = rand() % 10;
    usleep(delay * 1000000);

    // notify termination
    int* thread_completed_fd = static_cast<int*>(arg);
    pthread_t thread_id = pthread_self();
    if(sizeof thread_id != write(*thread_completed_fd, &thread_id, sizeof thread_id))
        abort();

    return 0;
}

int main() {
    int fd[2];
    if(pipe(fd))
        abort();

    enum { THREADS = 10 };

    time_t start = time(NULL);

    // start threads
    for(int n = THREADS; n--;) {
        pthread_t thread_id;
        if(pthread_create(&thread_id, NULL, thread_fun, fd + 1))
            abort();
        std::cout << time(NULL) - start << " sec: started thread " << thread_id << '\n';
    }

    // wait for the threads to finish
    for(int n = THREADS; n--;) {
        pthread_t thread_id;
        if(sizeof thread_id != read(fd[0], &thread_id, sizeof thread_id))
            abort();
        if(pthread_join(thread_id, NULL)) // detached threads don't need this call
            abort();
        std::cout << time(NULL) - start << " sec: thread " << thread_id << " has completed\n";
    }

    close(fd[0]);
    close(fd[1]);
}

$ g++ -o test -pthread -Wall -Wextra -march=native test.cc
$ ./test
0 sec: started thread 140672287479552
0 sec: started thread 140672278759168
0 sec: started thread 140672270038784
0 sec: started thread 140672261318400
0 sec: started thread 140672252598016
0 sec: started thread 140672243877632
0 sec: started thread 140672235157248
0 sec: started thread 140672226436864
0 sec: started thread 140672217716480
0 sec: started thread 140672208996096
1 sec: thread 140672208996096 has completed
2 sec: thread 140672226436864 has completed
3 sec: thread 140672287479552 has completed
3 sec: thread 140672243877632 has completed
5 sec: thread 140672252598016 has completed
5 sec: thread 140672261318400 has completed
6 sec: thread 140672278759168 has completed
6 sec: thread 140672235157248 has completed
7 sec: thread 140672270038784 has completed
9 sec: thread 140672217716480 has completed
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文