使用队列将缓冲区数据存储在生产者 - 消费者多线程程序中时的怪异行为

发布于 2025-02-11 11:54:17 字数 7336 浏览 0 评论 0原文

我创建了一个小程序,其中生产者线程创建一个随机数并将其推到堆栈中,然后将消费者线程弹出堆栈并将值打印到控制台上。我创建了2个信号量,一个空间插槽为空时发出信号的信号(初始化为max_buffer_slots),当缓冲区插槽满满时发出信号(初始化为0)。消费者在fullemaphore上等待,生产者在emptysemaphore上等待。在发出消费者/生产者线程发出信号之后,它们使用fuffermutex相应地修改缓冲区。使用信号量,除非至少有一个完整的插槽,否则消费者绝对不应尝试“消费”,而且除非至少有一个空插槽,否则生产者绝对不应尝试“生产”。使用堆栈作为我的缓冲区,这似乎正常工作。

问题是当我尝试将缓冲区作为队列实现时。我总是在printqueue()尝试打印currentslot-> num时,总是在printqueue()中点击一个异常。基于印刷输出(请参阅下面的输出),当消费者消耗22时,我认为它应该将大小设置为1,而不是2再次?因此,我认为这将buffersize变为比实际要大的一个,后来导致无效访问。不太确定。我无法确定是否有一个错误(我经历了几次逻辑,并且看不到任何错误)。还是信号量的某种程度上无法正常工作?我尝试使用不同的信号库库,但这似乎没有什么不同。不确定它是否会影响,但是我正在通过Mac上的Xcode运行程序。

这是我的程序:

#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

static const char* loggingPrefix = "Multithreading";

#define RAND_OUTPUT_MAX 50
#define MAX_BUFFER_SLOTS 15

// Buffer, implemented as queue
typedef struct slot {
    int num;
    struct slot *next;
}slot_t;
static slot_t* startSlot = NULL;
static slot_t* endSlot = NULL;
static int bufferSize = 0;

// Buffer mutex to make sure only one thread accesses buffer variables at a time
static pthread_mutex_t bufferMutex;

// Buffer empty/full semaphores to signal to producer/consumer when to run
static sem_t *emptySemaphore;
static sem_t *fullSemaphore;

static int shouldProduce(void) {
    return bufferSize < MAX_BUFFER_SLOTS;
}

static int shouldConsume(void) {
    return bufferSize > 0;
}

static void printQueue(void) {
    slot_t *currentSlot = startSlot;
    printf("%s: [ ", loggingPrefix);
    while (currentSlot != NULL) {
        printf("%d ", currentSlot->num);
        currentSlot = currentSlot->next;
    }
    printf("]\n");
}

static void produce(int num) {
    if (shouldProduce()) {
        slot_t *newSlot = (slot_t *)malloc(sizeof(slot_t));
        newSlot->num = num;
        newSlot->next = NULL;
        if (bufferSize == 0) {
            startSlot = newSlot;
            endSlot = newSlot;
        } else {
            endSlot->next = newSlot;
            endSlot = newSlot;
        }
        bufferSize++;
    } else {
        printf("%s: ERROR: Tried to produce while buffer is full!!!\n");
    }
    printQueue();
}

static int consume(void) {
    int ret;
    if (shouldConsume()) {
        slot_t *deletedSlot = startSlot;
        startSlot = deletedSlot->next;
        int deletedNum = deletedSlot->num;
        free(deletedSlot);
        deletedSlot = NULL;
        bufferSize--;
        ret = deletedNum;
    } else {
        printf("%s: ERROR: Tried to consume from an empty buffer!!!\n");
        ret = -1;
    }
    printQueue();
    return ret;
}

static void* produceThread(void* args) {
    while (1) {
        int num = rand() % RAND_OUTPUT_MAX;

        sem_wait(emptySemaphore);
        
        pthread_mutex_lock(&bufferMutex);
        if (shouldProduce()) {
            produce(num);
            printf("%s: Produced %d, size: %d\n", loggingPrefix, num, bufferSize);
        } else {
            printf("%s: Producer skipping %d, size: %d\n", loggingPrefix, num, bufferSize);
        }
        pthread_mutex_unlock(&bufferMutex);
        
        sem_post(fullSemaphore);
    }
}

static void* consumeThread(void* args) {
    while (1) {
        sem_wait(fullSemaphore);

        pthread_mutex_lock(&bufferMutex);
        if (shouldConsume()) {
            int num = consume();
            printf("%s: Consumed %d, size: %d\n", loggingPrefix, num, bufferSize);
        } else {
            printf("%s: Consumer waiting, buffer empty, size: %d\n", loggingPrefix, bufferSize);
        }
        pthread_mutex_unlock(&bufferMutex);

        sem_post(emptySemaphore);
    }
}

#define EMPTY_SEMAPHORE_ID "/emptySemaphore"
#define FULL_SEMAPHORE_ID "/fullSemaphore"

int main(int argc, const char * argv[]) {
    srand((uint32_t)time(NULL));
    pthread_t producer;
    pthread_t consumer;
    
    // Create the semaphore
    if((emptySemaphore = sem_open(EMPTY_SEMAPHORE_ID, O_CREAT, 0644, MAX_BUFFER_SLOTS)) == SEM_FAILED ||
       (fullSemaphore = sem_open(FULL_SEMAPHORE_ID, O_CREAT, 0644, 0)) == SEM_FAILED) {
        perror("sem_open");
        exit(EXIT_FAILURE);
    }
    
    // Create the threads
    if (pthread_create(&producer, NULL, &produceThread, NULL) != 0) {
        printf("%s: Failed to create producer\n", loggingPrefix);
    }
    if (pthread_create(&consumer, NULL, &consumeThread, NULL) != 0) {
        printf("%s: Failed to create consumer\n", loggingPrefix);
    }
    
    // Wait until the threads finish to exit this function
    if (pthread_join(producer, NULL) != 0) {
        printf("%s: Failed to join producer thread\n", loggingPrefix);
    }
    if (pthread_join(consumer, NULL) != 0) {
        printf("%s: Failed to join consumer thread\n", loggingPrefix);
    }
    
    // Destroy the threads
    if (sem_close(emptySemaphore) == -1 || sem_close(fullSemaphore) == -1) {
        perror("sem_close");
        exit(EXIT_FAILURE);
    }
    
    if (sem_unlink(EMPTY_SEMAPHORE_ID) == -1 || sem_unlink(FULL_SEMAPHORE_ID) == -1) {
        perror("sem_unlink");
        exit(EXIT_FAILURE);
    }

    return 0;
}

这是使用Smahaphore.h库的输出的片段,并且是缓冲区的队列:

Multithreading: [ 2 ]
Multithreading: Produced 2, size: 1
Multithreading: [ 2 2 ]
Multithreading: Produced 2, size: 2
Multithreading: [ 2 2 22 ]
Multithreading: Produced 22, size: 2
Multithreading: [ 2 22 0 ]
Multithreading: Produced 0, size: 3
Multithreading: [ 2 22 0 ]
Multithreading: Consumed 2, size: 3
Multithreading: [ 22 0 ]
Multithreading: Consumed 2, size: 2
Multithreading: [ -559120240 Multithreading: [ 1953394534 0 35 ]
Multithreading: Consumed 22, size: 2
Multithreading: [ 35 ]
Multithreading: Consumed 0, size: 1

编辑:我从字面上复制和粘贴了支持乌克兰的解决方案( ideOne.com/apxf6z )进入我的Xcode,它仍然在exit(1)中击中printquequequeue ()函数。可能是由于Xcode造成的,或者我在Mac(而不是Windows/linux)上运行它的事实导致该程序的表现很奇怪?

输出:

Multithreading: [ 8 ]
Multithreading: Produced 8, size: 1 cnt: 1
Multithreading: [ 8 8 ]
Multithreading: Produced 8, size: 2 cnt: 2
Multithreading: [ 8 8 37 ]
Multithreading: Produced 37, size: 2 cnt: 3
Multithreading: [ 8 37 42 ]
Multithreading: Produced 42, size: 3 cnt: 4
Multithreading: [ 8 37 42 34 ]
Multithreading: Produced 34, size: 4 cnt: 5
Multithreading: [ 8 37 42 34 33 ]
Multithreading: Produced 33, size: 5 cnt: 6
Multithreading: [ 8 37 42 34 33 15 ]
Multithreading: Consumed 8, size: 6 cnt: 1
Multithreading: [ 37 42 34 33 15 ]
Multithreading: Consumed 8, size: 5 cnt: 2
Multithreading: [ -559054752 Multithreading: [ 42 34 33 15 ]
Multithreading: Consumed 37, size: 4 cnt: 3
Multithreading: [ 34 33 15 ]
Multithreading: Consumed 42, size: 3 cnt: 4
Multithreading: [ 33 15 ]
Multithreading: Consumed 34, size: 2 cnt: 5
Multithreading: [ 15 ]
Multithreading: Consumed 33, size: 1 cnt: 6
Program ended with exit code: 1

I created a small program where a producer thread creates a random number and pushes it to a stack, then a consumer thread pops the stack and prints out the value to the console. I created 2 semaphores, one semaphore to signal when buffer slots are empty (initialized to MAX_BUFFER_SLOTS) and another one to signal when buffer slots are full (initialized to 0). The consumer waits on the fullSemaphore, and the producer waits on the emptySemaphore. After the consumer/producer threads are signaled, they modify the buffer accordingly using a bufferMutex. With the semaphores, the consumer should never try to "consume" unless there is at least one full slot, and the producer should never try to "produce" unless there is at least one empty slot. This seems to be working fine using a stack as my buffer.

The issue is when I try to implement the buffer as a queue. I always hit an exception in printQueue() when it tries to print currentSlot->num. Based on the printed output (see output below), when the consumer consumes 22 from the buffer, I think it should be setting the size to 1, not 2 again? And so I think this is messing up the bufferSize to be one larger than it actually is, which later causes an invalid access. Not too sure though. I can't tell if I have a bug (I went through the logic several times and don't see anything wrong with it). Or maybe the semaphores somehow are not working properly? I tried using different semaphore libraries, but it doesn't seem to make a difference. Not sure if it affects, but I am running my program via Xcode on my Mac.

Here is my program:

#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

static const char* loggingPrefix = "Multithreading";

#define RAND_OUTPUT_MAX 50
#define MAX_BUFFER_SLOTS 15

// Buffer, implemented as queue
typedef struct slot {
    int num;
    struct slot *next;
}slot_t;
static slot_t* startSlot = NULL;
static slot_t* endSlot = NULL;
static int bufferSize = 0;

// Buffer mutex to make sure only one thread accesses buffer variables at a time
static pthread_mutex_t bufferMutex;

// Buffer empty/full semaphores to signal to producer/consumer when to run
static sem_t *emptySemaphore;
static sem_t *fullSemaphore;

static int shouldProduce(void) {
    return bufferSize < MAX_BUFFER_SLOTS;
}

static int shouldConsume(void) {
    return bufferSize > 0;
}

static void printQueue(void) {
    slot_t *currentSlot = startSlot;
    printf("%s: [ ", loggingPrefix);
    while (currentSlot != NULL) {
        printf("%d ", currentSlot->num);
        currentSlot = currentSlot->next;
    }
    printf("]\n");
}

static void produce(int num) {
    if (shouldProduce()) {
        slot_t *newSlot = (slot_t *)malloc(sizeof(slot_t));
        newSlot->num = num;
        newSlot->next = NULL;
        if (bufferSize == 0) {
            startSlot = newSlot;
            endSlot = newSlot;
        } else {
            endSlot->next = newSlot;
            endSlot = newSlot;
        }
        bufferSize++;
    } else {
        printf("%s: ERROR: Tried to produce while buffer is full!!!\n");
    }
    printQueue();
}

static int consume(void) {
    int ret;
    if (shouldConsume()) {
        slot_t *deletedSlot = startSlot;
        startSlot = deletedSlot->next;
        int deletedNum = deletedSlot->num;
        free(deletedSlot);
        deletedSlot = NULL;
        bufferSize--;
        ret = deletedNum;
    } else {
        printf("%s: ERROR: Tried to consume from an empty buffer!!!\n");
        ret = -1;
    }
    printQueue();
    return ret;
}

static void* produceThread(void* args) {
    while (1) {
        int num = rand() % RAND_OUTPUT_MAX;

        sem_wait(emptySemaphore);
        
        pthread_mutex_lock(&bufferMutex);
        if (shouldProduce()) {
            produce(num);
            printf("%s: Produced %d, size: %d\n", loggingPrefix, num, bufferSize);
        } else {
            printf("%s: Producer skipping %d, size: %d\n", loggingPrefix, num, bufferSize);
        }
        pthread_mutex_unlock(&bufferMutex);
        
        sem_post(fullSemaphore);
    }
}

static void* consumeThread(void* args) {
    while (1) {
        sem_wait(fullSemaphore);

        pthread_mutex_lock(&bufferMutex);
        if (shouldConsume()) {
            int num = consume();
            printf("%s: Consumed %d, size: %d\n", loggingPrefix, num, bufferSize);
        } else {
            printf("%s: Consumer waiting, buffer empty, size: %d\n", loggingPrefix, bufferSize);
        }
        pthread_mutex_unlock(&bufferMutex);

        sem_post(emptySemaphore);
    }
}

#define EMPTY_SEMAPHORE_ID "/emptySemaphore"
#define FULL_SEMAPHORE_ID "/fullSemaphore"

int main(int argc, const char * argv[]) {
    srand((uint32_t)time(NULL));
    pthread_t producer;
    pthread_t consumer;
    
    // Create the semaphore
    if((emptySemaphore = sem_open(EMPTY_SEMAPHORE_ID, O_CREAT, 0644, MAX_BUFFER_SLOTS)) == SEM_FAILED ||
       (fullSemaphore = sem_open(FULL_SEMAPHORE_ID, O_CREAT, 0644, 0)) == SEM_FAILED) {
        perror("sem_open");
        exit(EXIT_FAILURE);
    }
    
    // Create the threads
    if (pthread_create(&producer, NULL, &produceThread, NULL) != 0) {
        printf("%s: Failed to create producer\n", loggingPrefix);
    }
    if (pthread_create(&consumer, NULL, &consumeThread, NULL) != 0) {
        printf("%s: Failed to create consumer\n", loggingPrefix);
    }
    
    // Wait until the threads finish to exit this function
    if (pthread_join(producer, NULL) != 0) {
        printf("%s: Failed to join producer thread\n", loggingPrefix);
    }
    if (pthread_join(consumer, NULL) != 0) {
        printf("%s: Failed to join consumer thread\n", loggingPrefix);
    }
    
    // Destroy the threads
    if (sem_close(emptySemaphore) == -1 || sem_close(fullSemaphore) == -1) {
        perror("sem_close");
        exit(EXIT_FAILURE);
    }
    
    if (sem_unlink(EMPTY_SEMAPHORE_ID) == -1 || sem_unlink(FULL_SEMAPHORE_ID) == -1) {
        perror("sem_unlink");
        exit(EXIT_FAILURE);
    }

    return 0;
}

Here is a snippet of the output using the semaphore.h library and a queue for the buffer:

Multithreading: [ 2 ]
Multithreading: Produced 2, size: 1
Multithreading: [ 2 2 ]
Multithreading: Produced 2, size: 2
Multithreading: [ 2 2 22 ]
Multithreading: Produced 22, size: 2
Multithreading: [ 2 22 0 ]
Multithreading: Produced 0, size: 3
Multithreading: [ 2 22 0 ]
Multithreading: Consumed 2, size: 3
Multithreading: [ 22 0 ]
Multithreading: Consumed 2, size: 2
Multithreading: [ -559120240 Multithreading: [ 1953394534 0 35 ]
Multithreading: Consumed 22, size: 2
Multithreading: [ 35 ]
Multithreading: Consumed 0, size: 1

EDIT: I literally copied and pasted Support Ukraine's solution (ideone.com/apXf6Z) into my Xcode and it still hit the exit(1) in the printQueue() function. Could it be due to Xcode, or the fact that I am running this on a Mac (not Windows/Linux) that is causing the program to behave weird like this?

Output:

Multithreading: [ 8 ]
Multithreading: Produced 8, size: 1 cnt: 1
Multithreading: [ 8 8 ]
Multithreading: Produced 8, size: 2 cnt: 2
Multithreading: [ 8 8 37 ]
Multithreading: Produced 37, size: 2 cnt: 3
Multithreading: [ 8 37 42 ]
Multithreading: Produced 42, size: 3 cnt: 4
Multithreading: [ 8 37 42 34 ]
Multithreading: Produced 34, size: 4 cnt: 5
Multithreading: [ 8 37 42 34 33 ]
Multithreading: Produced 33, size: 5 cnt: 6
Multithreading: [ 8 37 42 34 33 15 ]
Multithreading: Consumed 8, size: 6 cnt: 1
Multithreading: [ 37 42 34 33 15 ]
Multithreading: Consumed 8, size: 5 cnt: 2
Multithreading: [ -559054752 Multithreading: [ 42 34 33 15 ]
Multithreading: Consumed 37, size: 4 cnt: 3
Multithreading: [ 34 33 15 ]
Multithreading: Consumed 42, size: 3 cnt: 4
Multithreading: [ 33 15 ]
Multithreading: Consumed 34, size: 2 cnt: 5
Multithreading: [ 15 ]
Multithreading: Consumed 33, size: 1 cnt: 6
Program ended with exit code: 1

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文