Blocking queue race condition?

Posted on 2024-12-25 03:20:02 · 4565 characters · 3 views · 0 comments

I'm trying to implement a high-performance blocking queue backed by a circular buffer, built on pthreads, semaphore.h and the gcc atomic builtins. The queue needs to handle multiple simultaneous readers and writers from different threads.

I've isolated some sort of race condition, and I'm not sure if it's a faulty assumption about the behavior of some of the atomic operations and semaphores, or whether my design is fundamentally flawed.

I've extracted and simplified it into the standalone example below. I would expect this program never to return. However, it does return after a few hundred thousand iterations, with corruption detected in the queue.

In the example below (for exposition) nothing is actually stored: a cell that would hold the actual data is simply set to 1, and 0 represents an empty cell. There is a counting semaphore (vacancies) representing the number of vacant cells, and another counting semaphore (occupants) representing the number of occupied cells.

Writers do the following:

  1. decrement vacancies
  2. atomically get next head index (mod queue size)
  3. write to it
  4. increment occupants

Readers do the opposite:

  1. decrement occupants
  2. atomically get next tail index (mod queue size)
  3. read from it
  4. increment vacancies

Given the above, I would expect that at most one thread can be reading or writing any given cell at any one time.

Any ideas about why it doesn't work or debugging strategies appreciated. Code and output below...

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    int cell[QUEUE_CAPACITY];
    // (cell[x] == 1) means occupied
    // (cell[x] == 0) means vacant

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // atomic post increment
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // atomic post increment
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    {
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
} 

int main()
{
    pthread_t id;

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Sink, 0))
            abort();

    while (true);
}

Compile the above as follows:

    $ g++ -pthread AboveCode.cpp
    $ ./a.out

The output is different every time, but here is one example:

    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

My system is Ubuntu 11.10 on Intel Core 2:

    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP \
      Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks,
Andrew.

Comments (3)

月下客 2025-01-01 03:20:02

One possible scenario, traced step by step for two writer threads (W0, W1) and one reader thread (R0): W0 entered put() earlier than W1, was interrupted by the OS or hardware, and finished later.

         w0 (core 0)              w1 (core 1)              r0
t0       ---                      ---                      blocked on occupants.wait() in take()
t1       entered put()            ---                      ---
t2       vacancies.wait()         entered put()            ---
t3       got new_head = 1         vacancies.wait()         ---
t4       <interrupted by OS>      got new_head = 2         ---
t5                                written 1 at cell[2]     ---
t6                                occupants.post()         ---
t7                                exited put()             woken up
t8                                ---                      got new_tail = 1
t9       <still interrupted>      ---                      read 0 from cell[1]  !! corruption !!
t10      written 1 at cell[1]
t11      occupants.post()
t12      exited put()
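
To make the trace concrete, here is a minimal single-threaded replay of the same interleaving. It is only a sketch: the function name `replay_trace_detects_corruption` is hypothetical, a plain `int` stands in for the `occupants` semaphore, and the indices start at 1 purely to match the numbering used in the trace. It suggests that the semaphore only tells the reader that some cell has been written, not that the cell at its own tail index has.

```cpp
#include <cassert>

// Single-threaded replay of the trace above (hypothetical helper, not part
// of the original program). Indices start at 1 only to match the trace.
bool replay_trace_detects_corruption()
{
    int cell[8] = {0};       // 0 = vacant, 1 = occupied
    unsigned head = 1;
    unsigned tail = 1;
    int occupants = 0;       // stands in for the counting semaphore

    unsigned w0 = head++;    // t3: W0 claims index 1, then stalls
    unsigned w1 = head++;    // t4: W1 claims index 2
    cell[w1] = 1;            // t5: W1 writes its cell
    ++occupants;             // t6: W1 posts occupants

    assert(occupants == 1);  // t7: R0 may proceed, because *some* cell is full
    --occupants;
    unsigned r0 = tail++;    // t8: R0 claims index 1, W0's unfinished cell
    bool corrupt = (cell[r0] != 1);  // t9: R0 finds the cell still vacant

    cell[w0] = 1;            // t10: W0 finally writes, too late for R0
    ++occupants;             // t11: W0 posts occupants
    return corrupt;
}
```

Calling `replay_trace_detects_corruption()` returns `true`: the reader reaches cell[1] before W0 has written it, which is the same failure the `get` check reports in the question.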

薄情伤 2025-01-01 03:20:02

From a design point of view, I would consider the whole queue as a shared resource and protect it with a single mutex.

Writers do the following:

  1. take the mutex
  2. write to the queue (including handling of indexes)
  3. free the mutex

Readers do the following:

  1. take the mutex
  2. read from the queue (including handling of indexes)
  3. free the mutex
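
The steps above could be sketched roughly as follows. This is an illustration under assumptions, not the answerer's code: it uses C++11 `std::mutex` instead of raw pthreads, the class name `MutexQueue` is made up, and the two condition variables (which make `put` and `take` block when the queue is full or empty) are an addition that the answer does not mention.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical bounded queue guarded by one mutex. The indices and the cells
// are only ever touched while the mutex is held.
template <typename T, std::size_t Capacity>
class MutexQueue
{
    T cell_[Capacity];
    std::size_t head_ = 0;   // next position to write
    std::size_t tail_ = 0;   // next position to read
    std::size_t count_ = 0;  // number of occupied cells
    std::mutex m_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;

public:
    void put(const T& v)
    {
        std::unique_lock<std::mutex> lock(m_);              // 1. take the mutex
        not_full_.wait(lock, [this] { return count_ < Capacity; });
        cell_[head_] = v;                                   // 2. write + index
        head_ = (head_ + 1) % Capacity;
        ++count_;
        lock.unlock();                                      // 3. release it
        not_empty_.notify_one();
    }

    T take()
    {
        std::unique_lock<std::mutex> lock(m_);              // 1. take the mutex
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T v = cell_[tail_];                                 // 2. read + index
        tail_ = (tail_ + 1) % Capacity;
        --count_;
        lock.unlock();                                      // 3. release it
        not_full_.notify_one();
        return v;
    }
};
```

Because claiming an index and touching the cell happen inside the same critical section, a stalled thread cannot leave a claimed but unwritten cell visible to others. The trade-off is that all readers and writers serialize on one lock.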

风铃鹿 2025-01-01 03:20:02

I have a theory. It's a circular queue so one reading thread may be getting lapped. Say a reader takes index 0. Before it does anything it loses the CPU. Another reader thread takes index 1, then 2, then 3 ... then 7, then 0. The first reader wakes up and both threads think they have exclusive access to index 0. Not sure how to prove it. Hope that helps.
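
One way to guard against both lapping and the unfinished-writer case is to give every cell its own sequence number, so that a thread touches a cell only when the cell itself says it is that thread's turn. The sketch below follows the general shape of Dmitry Vyukov's bounded MPMC queue; it is a different technique from the semaphore design in the question, it assumes C++11 `std::atomic`, and the names `SeqQueue`, `try_put` and `try_take` are hypothetical. It is non-blocking: the calls return `false` when the queue is full or empty, so blocking behaviour would have to be layered on top.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical bounded queue with a per-cell sequence number.
template <typename T, std::size_t Capacity>
class SeqQueue
{
    static_assert((Capacity & (Capacity - 1)) == 0, "capacity must be a power of 2");

    struct Cell
    {
        std::atomic<std::size_t> seq;  // whose turn it is for this cell
        T value;
    };

    Cell cells_[Capacity];
    std::atomic<std::size_t> head_{0};
    std::atomic<std::size_t> tail_{0};

public:
    SeqQueue()
    {
        for (std::size_t i = 0; i < Capacity; ++i)
            cells_[i].seq.store(i, std::memory_order_relaxed);
    }

    bool try_put(const T& v)
    {
        std::size_t pos = head_.load(std::memory_order_relaxed);
        for (;;)
        {
            Cell& c = cells_[pos & (Capacity - 1)];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos);
            if (diff == 0)
            {
                // The cell is vacant for this lap; try to claim the index.
                if (head_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    c.value = v;
                    c.seq.store(pos + 1, std::memory_order_release);  // now readable
                    return true;
                }
                // A failed CAS reloads pos; retry with the new value.
            }
            else if (diff < 0)
                return false;  // cell not yet released by its reader: full
            else
                pos = head_.load(std::memory_order_relaxed);
        }
    }

    bool try_take(T& out)
    {
        std::size_t pos = tail_.load(std::memory_order_relaxed);
        for (;;)
        {
            Cell& c = cells_[pos & (Capacity - 1)];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0)
            {
                // The cell has been written for this lap; try to claim it.
                if (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    out = c.value;
                    c.seq.store(pos + Capacity, std::memory_order_release);  // vacant next lap
                    return true;
                }
            }
            else if (diff < 0)
                return false;  // cell not yet written by its writer: empty
            else
                pos = tail_.load(std::memory_order_relaxed);
        }
    }
};
```

In this sketch a reader that would otherwise lap a stalled thread sees a sequence number that does not match its position and backs off instead of touching the cell. A caller that needs blocking semantics would have to retry (for example by yielding) when a call returns `false`.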
