Cause of a race condition in a producer/consumer stack
I've tried to create a producer-consumer stack, based on notification events, that would allow a single thread to push data, and another thread to pop data.
When the buffer is full/empty, one thread waits for another until it is able to continue.
I'm detecting a race condition (the program breaks where I have marked ***ERROR HERE***), but I don't understand why it can happen.
How can size go higher than capacity in this program?
#include <process.h>
#include <cstdlib>
#include <vector>
#include <windows.h>

// Declared up front so the calls inside rwstack resolve on compilers that
// perform two-phase name lookup; the definitions appear after the class.
static size_t InterlockedIncrementSizeT(size_t volatile *p);
static size_t InterlockedDecrementSizeT(size_t volatile *p);
template<typename T, typename Ax = std::allocator<T> >
class rwstack
{
    // It is assumed that only ONE thread will push data
    // and only ONE thread will pop data.
public:
    typedef T value_type;
    typedef Ax allocator_type;
    typedef rwstack<value_type, allocator_type> this_type;
    typedef std::vector<value_type, allocator_type> container_type;
private:
    allocator_type allocator;
    value_type *items;
    size_t volatile count;
    size_t const capacity;
    HANDLE hEventNotEmpty, hEventNotFull;
    rwstack(const this_type &other) { __debugbreak(); /*Don't allow*/ }
public:
    rwstack(const size_t capacity = 4096)
        : allocator(allocator_type()),
          items(allocator.allocate(capacity, NULL)),
          count(0), capacity(capacity),
          hEventNotEmpty(CreateEvent(NULL, TRUE, FALSE, NULL)),
          hEventNotFull(CreateEvent(NULL, TRUE, TRUE, NULL)) { }
    virtual ~rwstack()  // Not actually used in the example
    {
        CloseHandle(hEventNotEmpty);
        CloseHandle(hEventNotFull);
        for (size_t i = 0; i < count; i++)
        { allocator.destroy(&items[InterlockedDecrementSizeT(&count) - i]); }
        allocator.deallocate(items, capacity);
    }
    value_type &push(const value_type &value)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotFull, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedIncrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize >= capacity) { ResetEvent(hEventNotFull); }
            allocator.construct(&items[newSize - 1], value);
            SetEvent(hEventNotEmpty);
            return items[newSize - 1];
        }
        catch (...) { InterlockedDecrementSizeT(&count); throw; }
    }
    void pop(value_type *pValue = NULL)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotEmpty, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedDecrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize <= 0) { ResetEvent(hEventNotEmpty); }
            if (pValue != NULL) { *pValue = items[newSize]; }
            allocator.destroy(&items[newSize]);
            SetEvent(hEventNotFull);
        }
        catch (...) { InterlockedIncrementSizeT(&count); throw; }
    }
};

static size_t InterlockedIncrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedIncrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedIncrement(reinterpret_cast<long volatile *>(p));
#endif
}

static size_t InterlockedDecrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedDecrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedDecrement(reinterpret_cast<long volatile *>(p));
#endif
}
Test code:
typedef rwstack<int> TTestStack;

void __cdecl testPop(void *context)   // consumer thread: pops values
{
    TTestStack::value_type v;
    for (;;)
        static_cast<TTestStack *>(context)->pop(&v);
}

void __cdecl testPush(void *context)  // producer thread: pushes values
{
    for (TTestStack::value_type v = 0; ; v++)
        static_cast<TTestStack *>(context)->push(v);
}

int main()
{
    TTestStack rw;
    HANDLE hThreads[2] = {
        reinterpret_cast<HANDLE>(_beginthread(&testPop, 0, &rw)),
        reinterpret_cast<HANDLE>(_beginthread(&testPush, 0, &rw)),
    };
    const ULONG nThreads = sizeof(hThreads) / sizeof(*hThreads);
    WaitForMultipleObjects(nThreads, hThreads, TRUE, INFINITE);
    return 0;
}
You aren't locking the correct operation
The key here is that while you are disabling the hEventNotFull event in Thread A, you are also enabling it in Thread B.
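Concretely, these are the two unsynchronized windows in the question's push() and pop(): in each one the interlocked count update and the event call are separate steps, and the other thread can run in between (lines quoted from the code above, comments added):

// Inside push() (Thread A): nothing keeps the other thread out between these two statements.
const size_t newSize = InterlockedIncrementSizeT(&count);
if (newSize >= capacity) { ResetEvent(hEventNotFull); }

// Inside pop() (Thread B): likewise, the decrement and the SetEvent call can be
// separated by an arbitrary delay, during which push() may run both lines above.
const size_t newSize = InterlockedDecrementSizeT(&count);
/* ...copy and destroy the element... */
SetEvent(hEventNotFull);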
Threads Run Concurrently
So here's what is happening:
1. The queue is full at 4096 items.
2. Thread B obtains the lock and decrements the count to 4095. You need to hold this lock until you decide whether or not to enable hEventNotFull, but you immediately release it.
3. The OS pauses Thread B for a moment.
4. Thread A obtains the lock and increments count to 4096. You need to hold this lock until you decide whether or not to reset hEventNotFull, but you immediately release it.
5. The OS decides that Thread B is more important than Thread A.
So you wind up calling ResetEvent in Thread A followed by SetEvent in Thread B. The net result is that you'll return to execution in Thread A with count == 4096 and hEventNotFull left signalled, so the next push can get past its wait and take count to 4097.
Flow of Execution:
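The flow diagram itself is not reproduced above; based on the steps described, one interleaving that ends at the ***ERROR HERE*** break might look like this (T_A is the pushing thread, T_B the popping thread, and hEventNotFull has already been left signalled by an earlier Reset/Set inversion of the same kind):

// Start: count == 4096 (full), but hEventNotFull is still signalled.
// T_B (pop):  passes WaitForSingleObject(hEventNotEmpty)
// T_B (pop):  InterlockedDecrementSizeT(&count)         -> newSize == 4095
//             ...pre-empted before it reaches SetEvent(hEventNotFull)...
// T_A (push): passes WaitForSingleObject(hEventNotFull)  (still signalled)
// T_A (push): InterlockedIncrementSizeT(&count)         -> newSize == 4096
// T_A (push): ResetEvent(hEventNotFull)                  (4096 >= capacity)
// T_B (pop):  SetEvent(hEventNotFull)                    <- undoes the reset; count == 4096
// T_A (push): passes WaitForSingleObject(hEventNotFull)  again
// T_A (push): InterlockedIncrementSizeT(&count)         -> newSize == 4097 > capacity
//             __debugbreak()                             // ****ERROR HERE****

The answer's advice amounts to holding a lock across the count change and the event decision. A minimal sketch of that idea, assuming a new CRITICAL_SECTION member named lock (initialized in the constructor, deleted in the destructor) and keeping the rest of the class as posted; with every count change and every Set/Reset made inside the same locked region, the events always match the count at the moment the lock is released:

CRITICAL_SECTION lock;  // assumed new member: InitializeCriticalSection(&lock) in the
                        // constructor, DeleteCriticalSection(&lock) in the destructor

value_type &push(const value_type &value)
{
    WaitForSingleObject(hEventNotFull, INFINITE);    // not full; only this thread pushes
    EnterCriticalSection(&lock);
    allocator.construct(&items[count], value);       // construct before publishing the new count
    const size_t newSize = ++count;                  // plain increment is safe under the lock
    if (newSize == capacity) { ResetEvent(hEventNotFull); }  // became full: block pushes
    SetEvent(hEventNotEmpty);                        // definitely not empty any more
    LeaveCriticalSection(&lock);
    return items[newSize - 1];
}

void pop(value_type *pValue = NULL)
{
    WaitForSingleObject(hEventNotEmpty, INFINITE);   // not empty; only this thread pops
    EnterCriticalSection(&lock);
    const size_t newSize = --count;
    if (pValue != NULL) { *pValue = items[newSize]; }
    allocator.destroy(&items[newSize]);
    if (newSize == 0) { ResetEvent(hEventNotEmpty); }  // became empty: block pops
    SetEvent(hEventNotFull);                           // definitely not full any more
    LeaveCriticalSection(&lock);
}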