Cause of a race condition in a producer/consumer stack

I've tried to create a producer-consumer stack, based on notification events, that would allow a single thread to push data, and another thread to pop data.

When the buffer is full or empty, one thread waits for the other until it can continue.

I'm detecting a race condition (the program breaks where I have marked ***ERROR HERE***) but I don't understand why it can happen.

How can size go higher than capacity in this program?

#include <process.h>
#include <cstdlib>
#include <vector>
#include <windows.h>

// Forward declarations: the member functions below call these helpers,
// which are defined after the class.
static size_t InterlockedIncrementSizeT(size_t volatile *p);
static size_t InterlockedDecrementSizeT(size_t volatile *p);

template<typename T, typename Ax = std::allocator<T> >
class rwstack
{
    // It is assumed that only ONE thread will push data
    //   and only ONE thread will pop data.

public:
    typedef T value_type;
    typedef Ax allocator_type;
    typedef rwstack<value_type, allocator_type> this_type;
    typedef std::vector<value_type, allocator_type> container_type;

private:
    allocator_type allocator;
    value_type *items;
    size_t volatile count;
    size_t const capacity;
    HANDLE hEventNotEmpty, hEventNotFull;
    rwstack(const this_type &other) { __debugbreak(); /*Don't allow*/ }

public:
    rwstack(const size_t capacity = 4096)
        : allocator(allocator_type()),
        items(allocator.allocate(capacity, NULL)),
        count(0), capacity(capacity),
        hEventNotEmpty(CreateEvent(NULL, TRUE, FALSE, NULL)),
        hEventNotFull(CreateEvent(NULL, TRUE, TRUE, NULL)) { }

    virtual ~rwstack()  // Not actually used in the example
    {
        CloseHandle(hEventNotEmpty);
        CloseHandle(hEventNotFull);
        for (size_t i = 0; i < count; i++)
        { allocator.destroy(&items[InterlockedDecrementSizeT(&count) - i]); }
        allocator.deallocate(items, capacity);
    }

    value_type &push(const value_type &value)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotFull, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedIncrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize >= capacity) { ResetEvent(hEventNotFull); }
            allocator.construct(&items[newSize - 1], value);
            SetEvent(hEventNotEmpty);
            return items[newSize - 1];
        }
        catch (...) { InterlockedDecrementSizeT(&count); throw; }
    }

    void pop(value_type *pValue = NULL)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotEmpty, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedDecrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize <= 0) { ResetEvent(hEventNotEmpty); }
            if (pValue != NULL) { *pValue = items[newSize]; }
            allocator.destroy(&items[newSize]);
            SetEvent(hEventNotFull);
        }
        catch (...) { InterlockedIncrementSizeT(&count); throw; }
    }
};

static size_t InterlockedIncrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedIncrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedIncrement(reinterpret_cast<long volatile *>(p));
#endif
}

static size_t InterlockedDecrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedDecrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedDecrement(reinterpret_cast<long volatile *>(p));
#endif
}

Test code:

typedef rwstack<int> TTestStack;

void __cdecl testPush(void *context)
{
    TTestStack::value_type v;
    for (;;)
        static_cast<TTestStack *>(context)->pop(&v);
}

void __cdecl testPop(void *context)
{
    for (TTestStack::value_type v = 0; ; v++)
        static_cast<TTestStack *>(context)->push(v);
}

int main()
{
    TTestStack rw;
    HANDLE hThreads[2] = {
        reinterpret_cast<HANDLE>(_beginthread(&testPush, 0, &rw)),
        reinterpret_cast<HANDLE>(_beginthread(&testPop,  0, &rw)),
    };
    const ULONG nThreads = sizeof(hThreads) / sizeof(*hThreads);
    WaitForMultipleObjects(nThreads, hThreads, TRUE, INFINITE);
    return 0;
}

稚然 2024-12-27 11:10:46

You aren't locking the correct operation

The key here is that while you are disabling the hEventNotFull event in Thread A, you are also enabling it in Thread B.

Threads Run Concurrently

So here's what is happening:

  1. The queue is full at 4096 items.

  2. Thread B obtains the lock and decrements the count to 4095. You need to hold this lock until you decide whether or not to enable hEventNotFull, but you immediately release it. The OS pauses Thread B for a moment.

  3. Thread A obtains the lock and increments count to 4096. You need to hold this lock until you decide whether or not to reset hEventNotFull, but you immediately release it.

  4. The OS decides that Thread B is more important than Thread A.

  5. So you wind up calling ResetEvent in Thread A followed by SetEvent in Thread B. The net result is that count == 4096 (full) while hEventNotFull is still signaled, so Thread A's next push sails straight through the wait, increments count to 4097, and trips your ***ERROR HERE*** check.

Flow of Execution:

Thread B: Get count and decrement it to 4095.  # Queue not full
Thread A: Get count and increment it to 4096.  # Queue full
Thread A: ResetEvent on `hEventNotFull`        # A thinks it will block since queue is full
Thread B: SetEvent on `hEventNotFull`          # B is using stale info and unblocks A
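
One way to apply this advice, sketched below under the question's single-producer/single-consumer assumption: perform the count update and the Set/Reset decision inside one critical section, so neither thread can act on stale information. This is a minimal illustration rather than the original code; the class name rwstack_fixed, the raw-buffer allocation via operator new, and the trimmed error handling are all illustrative choices.

#include <cstddef>
#include <new>
#include <windows.h>

template<typename T>
class rwstack_fixed
{
    // Single producer, single consumer, as in the question.
    T *items;
    size_t count;                  // only touched inside cs; Interlocked* no longer needed
    size_t const capacity;
    CRITICAL_SECTION cs;
    HANDLE hEventNotEmpty, hEventNotFull;

public:
    explicit rwstack_fixed(size_t cap = 4096)
        : items(static_cast<T *>(::operator new(cap * sizeof(T)))),
          count(0), capacity(cap),
          hEventNotEmpty(CreateEvent(NULL, TRUE, FALSE, NULL)),  // manual-reset, starts unsignaled
          hEventNotFull(CreateEvent(NULL, TRUE, TRUE, NULL))     // manual-reset, starts signaled
    { InitializeCriticalSection(&cs); }

    ~rwstack_fixed()
    {
        while (count > 0) { items[--count].~T(); }
        ::operator delete(items);
        DeleteCriticalSection(&cs);
        CloseHandle(hEventNotFull);
        CloseHandle(hEventNotEmpty);
    }

    void push(const T &value)
    {
        // With a single producer, a signaled hEventNotFull cannot go stale here:
        // only pops can run between the wait and the lock, and pops only add room.
        WaitForSingleObject(hEventNotFull, INFINITE);
        EnterCriticalSection(&cs);
        new (&items[count]) T(value);
        ++count;
        if (count == capacity) { ResetEvent(hEventNotFull); }  // decided under the same lock
        SetEvent(hEventNotEmpty);
        LeaveCriticalSection(&cs);
    }

    void pop(T *pValue = NULL)
    {
        WaitForSingleObject(hEventNotEmpty, INFINITE);
        EnterCriticalSection(&cs);
        --count;
        if (pValue != NULL) { *pValue = items[count]; }
        items[count].~T();
        if (count == 0) { ResetEvent(hEventNotEmpty); }        // decided under the same lock
        SetEvent(hEventNotFull);
        LeaveCriticalSection(&cs);
    }
};

Because ResetEvent and SetEvent now happen under the same lock as the count change, Thread B can no longer re-signal hEventNotFull on stale information after Thread A has reset it.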
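
Another sketch, again with illustrative names (semstack, hFreeSlots, hStoredItems): replace the two manual-reset events with counting semaphores, the textbook bounded-buffer shape. The semaphore counts track free slots and stored items directly, so there is no Set/Reset decision to make on stale information; the critical section remains only because both threads move the same stack top.

#include <cstddef>
#include <new>
#include <windows.h>

template<typename T>
class semstack
{
    T *items;
    size_t top;              // both threads move the stack top, so guard it
    CRITICAL_SECTION cs;
    HANDLE hFreeSlots;       // counts empty slots, starts at capacity
    HANDLE hStoredItems;     // counts stored items, starts at 0

public:
    explicit semstack(size_t cap = 4096)
        : items(static_cast<T *>(::operator new(cap * sizeof(T)))), top(0),
          hFreeSlots(CreateSemaphore(NULL, static_cast<LONG>(cap),
                                     static_cast<LONG>(cap), NULL)),
          hStoredItems(CreateSemaphore(NULL, 0, static_cast<LONG>(cap), NULL))
    { InitializeCriticalSection(&cs); }

    ~semstack()
    {
        while (top > 0) { items[--top].~T(); }
        ::operator delete(items);
        DeleteCriticalSection(&cs);
        CloseHandle(hStoredItems);
        CloseHandle(hFreeSlots);
    }

    void push(const T &value)
    {
        WaitForSingleObject(hFreeSlots, INFINITE);    // blocks while the stack is full
        EnterCriticalSection(&cs);
        new (&items[top]) T(value);
        ++top;
        LeaveCriticalSection(&cs);
        ReleaseSemaphore(hStoredItems, 1, NULL);      // one more item available
    }

    void pop(T *pValue = NULL)
    {
        WaitForSingleObject(hStoredItems, INFINITE);  // blocks while the stack is empty
        EnterCriticalSection(&cs);
        --top;
        if (pValue != NULL) { *pValue = items[top]; }
        items[top].~T();
        LeaveCriticalSection(&cs);
        ReleaseSemaphore(hFreeSlots, 1, NULL);        // one more slot available
    }
};

A side benefit of the semaphore version is that it no longer depends on the single-producer/single-consumer assumption: any number of pushing and popping threads can share it safely.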