[线程同步]让一个打印线程等待n个线程完成一个周期的执行,然后通知n个线程完成另一个周期,重复

发布于 2025-01-11 12:45:33 字数 2402 浏览 0 评论 0原文

如何让单个线程等待 n 个线程完成单个执行周期,然后重复 i 个周期。该解决方案需要使用信号量向单个线程发出信号,表明所有 n 个线程都已完成一个执行周期。然后,单个线程应该向所有 n 个线程发出信号,表明它们可以继续执行一次,...重复。

我无法解决一些挑战:

  • 控制信号量,以便 n 个线程之一不会消耗超过一个周期(信号量发布/等待)。
  • n 个工作线程只有在运行时才知道,因此我们无法初始化 sem_t 数组[n]。

//these would have been read in at run time
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    printf("Worker id: %d\n", num);
    int proceed = 5; // simulates "5" jobs in this workers queue
    while(proceed > 0){
        sem_wait(&execute);
        printf("Report from: %d\n", num);
        sem_post(&reports);
        proceed--;
    }
    control--;
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute);
        }
    }
}

void main(){
    // control/n would have been scanned and passed to threads, the global control var 
    would be set after reading in n
    sem_init(&execute,0,n);    //initialization of the first semaphore
    sem_init(&reports,0,0);    //initialization of the second semaphore

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
}

输出:

[rnitz]$ gcc -Wall -Werror -w -g example2.c -std=gnu99 -lpthread 
[rnitz]$ ./a.out
Worker id: 0
Worker id: 3
Report from: 3
Report from: 3
Worker id: 2
Worker id: 1
Report from: 3
Report from: 0
All reported
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 0
Report from: 1
Report from: 3
Report from: 2
All reported
Report from: 0
Report from: 0
Report from: 1
Report from: 2
All reported
Report from: 2
Report from: 1
Report from: 1
Report from: 2
All reported
[rnitz]$ 

非常明显的问题,一些工作人员正在吃掉信号量中所有可用的“循环”空间。

注意:此问题基于模拟 cpu,其中 n 是将在自己独立的作业队列上工作的 cpu 线程数;而单个打印线程负责打印给定 cpu 周期内正在处理的当前作业。打印线程将:

  1. 等待所有 cpu 完成一个周期
  2. 在每个 cpu 上打印当前作业
  3. 向 n 个 cpu 发出信号以完成另一个周期。

How can I have a single thread wait for n threads to complete a single execution cycle, then repeat this for i cycles. The solution needs to use semaphores to signal the single thread that all n threads have completed an execution cycle. The single thread should then signal ALL n threads that they can carry on with one more execution, ...repeat.

Some challenges I can't fix:

  • Controlling the semaphores so that one of the n threads doesn't eat up more than one cycle (semaphore post/wait).
  • The n number of workers is only known at run time, so we can't initialize a sem_t array[n].

:

//these would have been read in at run time
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    printf("Worker id: %d\n", num);
    int proceed = 5; // simulates "5" jobs in this workers queue
    while(proceed > 0){
        sem_wait(&execute);
        printf("Report from: %d\n", num);
        sem_post(&reports);
        proceed--;
    }
    control--;
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute);
        }
    }
}

void main(){
    // control/n would have been scanned and passed to threads, the global control var 
    would be set after reading in n
    sem_init(&execute,0,n);    //initialization of the first semaphore
    sem_init(&reports,0,0);    //initialization of the second semaphore

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
}

Output:

[rnitz]$ gcc -Wall -Werror -w -g example2.c -std=gnu99 -lpthread 
[rnitz]$ ./a.out
Worker id: 0
Worker id: 3
Report from: 3
Report from: 3
Worker id: 2
Worker id: 1
Report from: 3
Report from: 0
All reported
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 0
Report from: 1
Report from: 3
Report from: 2
All reported
Report from: 0
Report from: 0
Report from: 1
Report from: 2
All reported
Report from: 2
Report from: 1
Report from: 1
Report from: 2
All reported
[rnitz]$ 

Pretty obvious issue where some workers are eating up all the available "cycle" room in the semaphore.

Note: This question is based on simulating a cpu, where n is the number of cpu threads that will work on an their own independent job queue; while a single print thread handles printing the current job being processed for a given cpu cycle. The print thread will:

  1. Wait for all cpus to finish a cycle
  2. Print the current job on each cpu
  3. Signal the n cpus to complete another cycle.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

迷迭香的记忆 2025-01-18 12:45:33

解决方案:sem_t* 在运行时进行 malloc 的信号量数组。我在任何地方都找不到这样的示例,但最终尝试并让它工作:)

我保留了问题,因为我在 SOF 上找不到任何类似的东西,可以理解的是,等待 n 个线程在每个周期同步可能并不常见

为此,您可以使用信号量数组,而不是递增/递减单个信号量信号量。至少您需要至少一个信号量数组,此解决方案使用 2 个。

如果每个工作线程要完成不同数量的作业,则可以扩展此解决方案,您可以使用一个全局 int,该全局 int 在线程完成时递减,这然后将在打印机线程的 for 循环中使用。这样,如果 n 个线程中的 1 个由于工作量较少而提前完成,则可以 sem_wait()/sem_post n-1 次。

sem_t* execute_arr;
sem_t* reports_arr;

//this both need to be the same value
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    //printf("init arg: %d\n", num);
    int proceed = 5;
    while(proceed > 0){
        sem_wait(&execute_arr[num]);
        printf("Report from: %d\n", num);
        sem_post(&reports_arr[num]);
        proceed--;
    }
    control--;
    //free memory and/or destroy sems if needed
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports_arr[i]);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute_arr[i]);
        }
    }
}

void main(){
    execute_arr = (sem_t*)malloc(n * sizeof(sem_t));
    reports_arr = (sem_t*)malloc(n * sizeof(sem_t));
    for(int i = 0; i < n; i++) {
        sem_init(&execute_arr[i], 0, 1);
        sem_init(&reports_arr[i], 0, 0);
    }

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
    //free memory and/or destroy sems if needed
}

和固定输出:

[rnitz]$ ./a.out
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
[rnitz]$ 

Solution: sem_t* array of semaphores that are malloc'd at run time.. I couldn't find an example of this anywhere but ended up trying, and having it work :)

I'm keeping the question because I can't find anything similar on SOF, understandably waiting for n threads to synch on every cycle probably isn't common

For this, you can use arrays of semaphores instead of incrementing/decrementing a single semaphore. At minimum you will need at least one semaphore array, this solution uses 2.

This solution can be extended if the worker threads each have a different amount of jobs to complete, you can have a global int that decrements when a thread is finished, which would be then be used in the for-loops of the printer thread. That way if 1 of the n threads is done early due to less work, you can sem_wait()/sem_post n-1 times.

sem_t* execute_arr;
sem_t* reports_arr;

//this both need to be the same value
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    //printf("init arg: %d\n", num);
    int proceed = 5;
    while(proceed > 0){
        sem_wait(&execute_arr[num]);
        printf("Report from: %d\n", num);
        sem_post(&reports_arr[num]);
        proceed--;
    }
    control--;
    //free memory and/or destroy sems if needed
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports_arr[i]);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute_arr[i]);
        }
    }
}

void main(){
    execute_arr = (sem_t*)malloc(n * sizeof(sem_t));
    reports_arr = (sem_t*)malloc(n * sizeof(sem_t));
    for(int i = 0; i < n; i++) {
        sem_init(&execute_arr[i], 0, 1);
        sem_init(&reports_arr[i], 0, 0);
    }

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
    //free memory and/or destroy sems if needed
}

And the fixed output:

[rnitz]$ ./a.out
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
[rnitz]$ 
蒲公英的约定 2025-01-18 12:45:33

对于此类问题,不使用系统信号量,而是使用互斥体和几个条件变量。

我将在代码示例中留下 main 未完成的初始化全局互斥体和条件变量的工作,作为您的练习。

pthread_mutex mutex;
pthread_cond_t cv_thread;
pthread_cond_t cv_main;

int job_number = 0;
int completions = 0;
int exit_condition = 0;
int n = 4; //number of worker threads

void *worker(void* args){
    int num = (int)args;
    int lastJobNumber = 0;
    int must_exit = 0;

    while (1) {    

        // wait for the main thread to indicate a new job is ready
        pthread_mutex_lock(&mutex);

            while ((lastJobNumber >= job_number) && !exit_condition) {
              pthread_cond_wait(&cv_thread, &mut);  // wait for job_number to change or for exit_conditon to be set
            }

            must_exit = exit_condition;

            if (!must_exit) {
                lastJobNumber = job_number;  // take on the new job!
            }

        pthread_mutex_unlock(&mutex);

        if (must_exit) {
            break;
        }

        printf("Report from: %d.  This thread is executing job %d\n", num, lastJobNumber);

        pthread_mutex_lock(&mutex);
            completions++;
        pthread_mutex_unlock(&mutex);
        pthread_cond_broadcast(&cv_main); // signal to main to wake up
    }
} 


void *print() {
   for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&mutex);
            job_number++;
            completions = 0;

            // signal to threads a new job is ready
            pthread_cond_broadcast(&cv_thread);

            // wait for all threads to indicate completion
            while (completions < n) {
                pthread_cond_wait(&cv_main, &mutex); 
            }

        pthread_mutex_unlock(&mutex);

        printf("All reported\n");
   }

    // signal all threads to exit
    pthread_mutex_lock(&mutex);
        exit_condition = 1;
    pthread_cond_broadcast(&cv_thread);
    pthread_mutex_unlock(&mutex);
}

Rather than use a system semaphore, a mutex and a couple of condition variables would work well here for this type of problem.

I'll leave the unfinished work of main to initialize the global mutex and condition variables in the code sample as an exercise for you.

pthread_mutex mutex;
pthread_cond_t cv_thread;
pthread_cond_t cv_main;

int job_number = 0;
int completions = 0;
int exit_condition = 0;
int n = 4; //number of worker threads

void *worker(void* args){
    int num = (int)args;
    int lastJobNumber = 0;
    int must_exit = 0;

    while (1) {    

        // wait for the main thread to indicate a new job is ready
        pthread_mutex_lock(&mutex);

            while ((lastJobNumber >= job_number) && !exit_condition) {
              pthread_cond_wait(&cv_thread, &mut);  // wait for job_number to change or for exit_conditon to be set
            }

            must_exit = exit_condition;

            if (!must_exit) {
                lastJobNumber = job_number;  // take on the new job!
            }

        pthread_mutex_unlock(&mutex);

        if (must_exit) {
            break;
        }

        printf("Report from: %d.  This thread is executing job %d\n", num, lastJobNumber);

        pthread_mutex_lock(&mutex);
            completions++;
        pthread_mutex_unlock(&mutex);
        pthread_cond_broadcast(&cv_main); // signal to main to wake up
    }
} 


void *print() {
   for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&mutex);
            job_number++;
            completions = 0;

            // signal to threads a new job is ready
            pthread_cond_broadcast(&cv_thread);

            // wait for all threads to indicate completion
            while (completions < n) {
                pthread_cond_wait(&cv_main, &mutex); 
            }

        pthread_mutex_unlock(&mutex);

        printf("All reported\n");
   }

    // signal all threads to exit
    pthread_mutex_lock(&mutex);
        exit_condition = 1;
    pthread_cond_broadcast(&cv_thread);
    pthread_mutex_unlock(&mutex);
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文