C中线程同步和条件变量的问题

发布于 2024-11-27 03:47:51 字数 1424 浏览 2 评论 0原文

我有三个线程,一个是主线程,另外两个是工作线程。当有工作要做时,第一个线程会唤醒两个线程之一。每个线程在被唤醒时都会执行一些计算,并且在执行此操作时,如果它发现有更多工作要做,则可以唤醒其他工作线程或简单地决定自行完成该工作(例如,通过将工作添加到本地队列)。 当工作线程有工作要做时,主线程必须等待工作完成。我用条件变量实现了如下(这里报告的代码隐藏了很多细节,如果有什么难以理解的地方请询问):

MAIN THREAD (pseudocode):

//this function can be called from the main several time. It blocks the main thread till the work is done.
void new_work(){

//signaling to worker threads if work is available

    //Now, the threads have been awakened, it's time to sleep till they have finished.
    pthread_mutex_lock(&main_lock);
    while (work > 0)    //work is a shared atomic integer, incremented each time there's work to do and decremented when finished executing some work unit
       pthread_cond_wait(&main_cond);
    pthread_mutex_unlock(&main_lock);

}

WORKER THREADS:

while (1){

   pthread_mutex_lock(&main_lock);
    if (work == 0)
       pthread_cond_signal(&main_cond);
    pthread_mutex_unlock(&main_lock);  

    //code to let the worker thread wait again -- PROBLEM!

   while (I have work to do, in my queue){
       do_work()
   }

}

这是问题:当一个工作线程唤醒主线程 我不确定工作线程是否调用 wait 将其自身置于等待新工作的状态。即使我使用另一个条件变量实现此等待,主线程也可能处于唤醒状态,执行一些工作,直到达到必须唤醒尚未调用等待的线程的程度...这可以导致不好的结果。我尝试了多种方法来解决这个问题,但我找不到解决方案,也许有一个明显的方法可以解决它,但我错过了。

您能提供解决此类问题的方案吗?我使用的是 C 语言,并且可以使用您认为适合的任何同步机制,例如 pthreads 或 posix 信号量。

谢谢

I have three threads, one thread is the main and the other two are worker threads. The first thread, when there is work to be done wakes up one of the two threads. Each thread when awakened perform some computation and while doing this if it finds more work to do can wake up the other working thread or simply decide to do the job by itself (By adding work to a local queue, for example).
While the worker threads have work to do, the main thread must wait for the work to be done. I have implemented this with condition variables as follows (the code reported here hides a lot of details, please ask if there's something non understandable):

MAIN THREAD (pseudocode):

//this function can be called from the main several time. It blocks the main thread till the work is done.
void new_work(){

//signaling to worker threads if work is available

    //Now, the threads have been awakened, it's time to sleep till they have finished.
    pthread_mutex_lock(&main_lock);
    while (work > 0)    //work is a shared atomic integer, incremented each time there's work to do and decremented when finished executing some work unit
       pthread_cond_wait(&main_cond);
    pthread_mutex_unlock(&main_lock);

}

WORKER THREADS:

while (1){

   pthread_mutex_lock(&main_lock);
    if (work == 0)
       pthread_cond_signal(&main_cond);
    pthread_mutex_unlock(&main_lock);  

    //code to let the worker thread wait again -- PROBLEM!

   while (I have work to do, in my queue){
       do_work()
   }

}

Here is the problem: when a worker thread wakes up the main thread I'm not sure that the worker thread calls a wait to put itself in a waiting state for new work. Even if I implement this wait with another condition variable, it can happen that the main thread is awake, does some work until reaches a point in which he has to wake up the thread that has not called a wait yet... and this can lead to bad results. I've tried several ways to solve this issue but I couldn't find a solution, maybe there is an obvious way to solve it but I'm missing it.

Can you provide a scheme to solve this kind of problem? I'm using the C language and I can use whatever synchronization mechanism you think can be suited, like pthreads or posix semaphores.

Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

鲜血染红嫁衣 2024-12-04 03:47:51

处理此问题的常用方法是使用单个工作队列并防止其溢出和下溢。像这样的东西(我省略了“pthread_”前缀):

mutex queue_mutex;
cond_t queue_not_full, queue_not_empty;

void enqueue_work(Work w) {
    mutex_lock(&queue_mutex);
    while (queue_full())
        cond_wait(&queue_not_full, &queue_mutex);
    add_work_to_queue(w);
    cond_signal(&queue_not_empty);
    mutex_unlock(&queue_mutex);
}

Work dequeue_work() {
    mutex_lock(&queue_mutex);
    while (queue_empty())
        cond_wait(&queue_not_empty, &queue_mutex);
    Work w = remove_work_from_queue();
    cond_signal(&queue_not_full);
    mutex_unlock(&queue_mutex);
}

注意这些函数之间的对称性:enqueue <->出队,清空<->满,not_empty <->不满。

这为任意数量的产生工作的线程和任意数量的消耗工作的线程提供了一个线程安全的有界大小队列。 (实际上,这是使用条件变量的典型示例。)如果您的解决方案看起来与此不完全一样,那么它可能应该非常接近......

The usual way to handle this is to have a single work queue and protect it from overflow and underflow. Something like this (where I have left off the "pthread_" prefixes):

mutex queue_mutex;
cond_t queue_not_full, queue_not_empty;

void enqueue_work(Work w) {
    mutex_lock(&queue_mutex);
    while (queue_full())
        cond_wait(&queue_not_full, &queue_mutex);
    add_work_to_queue(w);
    cond_signal(&queue_not_empty);
    mutex_unlock(&queue_mutex);
}

Work dequeue_work() {
    mutex_lock(&queue_mutex);
    while (queue_empty())
        cond_wait(&queue_not_empty, &queue_mutex);
    Work w = remove_work_from_queue();
    cond_signal(&queue_not_full);
    mutex_unlock(&queue_mutex);
}

Note the symmetry between these functions: enqueue <-> dequeue, empty <-> full, not_empty <-> not full.

This provides a thread-safe bounded-size queue for any number of threads producing work and any number of threads consuming work. (Actually, it is sort of the canonical example for the use of condition variables.) If your solution does not look exactly like this, it should probably be pretty close...

尝蛊 2024-12-04 03:47:51

如果您希望主线程将工作分配给其他两个线程,然后等到两个线程都完成其工作后再继续,您也许可以通过屏障来完成此任务。

屏障是一种同步构造,您可以使用它使线程在代码中的某个点等待,直到一定数量的线程准备好继续前进。本质上,您初始化一个 pthread 屏障,表示 x 个线程必须等待它,然后才允许任何线程继续。当每个线程完成其工作并准备继续时,它将在屏障上等待,一旦x个线程到达屏障,它们都被允许继续。

在您的情况下,您可能可以执行以下操作:

pthread_barrier_t barrier;
pthread_barrier_init(&barrier, 3);

master()
{
  while (work_to_do) {
    put_work_on_worker_queues();
    pthread_barrier_wait(&barrier);
  }
}

worker()
{
  while(1) {
    while (work_on_my_queue()) {
      do_work();
    }
    pthread_barrier_wait(&barrier);
  }
}

这应该使您的主线程完成工作,然后等待两个工作线程完成它们所分配的工作(如果有),然后再继续。

If you want the main thread to distribute work to the other two, then wait until both threads have completed their work before moving on, you might be able to accomplish this with a barrier.

A barrier is a synchronization construct that you can use to make threads wait at a certain point in your code until a set number of threads are all ready to move on. Essentially, you initialize a pthread barrier, saying that x number of threads must wait on it before any are allowed to continue. As each thread finishes its work and is ready to go on, it will wait on the barrier, and once x number of threads have reached the barrier, they are all allowed to continue.

In your case, you might be able to do something like:

pthread_barrier_t barrier;
pthread_barrier_init(&barrier, 3);

master()
{
  while (work_to_do) {
    put_work_on_worker_queues();
    pthread_barrier_wait(&barrier);
  }
}

worker()
{
  while(1) {
    while (work_on_my_queue()) {
      do_work();
    }
    pthread_barrier_wait(&barrier);
  }
}

This should make your main thread give out work, then wait both worker threads to complete the work they were given (if any) before moving on.

美男兮 2024-12-04 03:47:51

你能有一个由主线程管理的“新作业”队列吗?主线程可以一次向每个工作线程分发 1 个作业。主线程还将侦听工作人员完成的作业。如果工作线程发现需要执行的新作业,只需将其添加到“新作业”队列中,主线程将分发它。

伪代码:

JobQueue NewJobs;
Job JobForWorker[NUM_WORKERS];

workerthread()
{
  while(wait for new job)
  {
    do job (this may include adding new jobs to NewJobs queue)
    signal job complete to main thread
  }
}

main thread()
{
  while(whatever)
  {
    wait for job completion on any worker thread
    now a worker thread is free put a new job on it
  }
}

Could you have "new job" queue, which is managed by the main thread? The main thread could dish out 1 job at a time to each worker thread. The main thread would also listen for completed jobs by the workers. If a worker thread finds a new job that needs doing just add it to the "new job" queue and the main thread will distribute it.

Pseudocode:

JobQueue NewJobs;
Job JobForWorker[NUM_WORKERS];

workerthread()
{
  while(wait for new job)
  {
    do job (this may include adding new jobs to NewJobs queue)
    signal job complete to main thread
  }
}

main thread()
{
  while(whatever)
  {
    wait for job completion on any worker thread
    now a worker thread is free put a new job on it
  }
}
想你只要分分秒秒 2024-12-04 03:47:51

我相信您所遇到的是生产者-消费者问题的变体。您正在做的是编写计数信号量的临时实现(用于提供的不仅仅是互斥)。

如果我正确地阅读了您的问题,那么您要做的就是让工作线程阻塞,直到有可用的工作单元,然后在工作单元可用后执行该工作单元。您的问题是有太多可用工作并且主线程尝试解锁已经在工作的工作人员的情况。我将按如下方式构建您的代码。

sem_t main_sem;
sem_init(&main_sem, 0, 0);

void new_work() {
    sem_post(&main_sem);
    pthread_cond_wait(&main_cond);
}

void do_work() {
    while (1) {
        sem_wait(&main_sem);
        // do stuff
        // do more stuff
        pthread_cond_signal(&main_sem);
    }
}

现在,如果工作线程生成更多工作,那么它们可以简单地将 sem_post 发送到信号量,并简单地推迟 pthread_cond_signal 直到所有工作完成。

但请注意,如果您实际上需要主线程在工作线程工作时始终阻塞,那么当您只需调用执行该工作的函数时,将工作推送到另一个线程是没有用的。

I believe that what you have here is a variation on the producer-consumer problem. What you are doing is writing up an ad-hoc implementation of a counting semaphore (one that is used to provide more than just mutual exclusion).

If I've read your question right, what you are trying to do is have the worker threads block until there is a unit of work available and then perform a unit of work once it becomes available. Your issue is with the case where there is too much work available and the main thread tries to unblock a worker that is already working. I would structure your code as follows.

sem_t main_sem;
sem_init(&main_sem, 0, 0);

void new_work() {
    sem_post(&main_sem);
    pthread_cond_wait(&main_cond);
}

void do_work() {
    while (1) {
        sem_wait(&main_sem);
        // do stuff
        // do more stuff
        pthread_cond_signal(&main_sem);
    }
}

Now, if the worker threads generate more work then they can simply sem_post to the semaphore and simply defer the pthread_cond_signal till all the work is done.

Note however, if you actually need the main thread to always block when the worker is working, it's not useful to push the work to another thread when you could just call a function that does the work.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文