C /C++无锁(或非阻塞)环形缓冲区覆盖最旧的数据?
我正在尝试找到一种方法,以无锁或非阻塞的方式为单个消费者/单个消费者创建环形缓冲区,该缓冲区将覆盖缓冲区中最旧的数据。我读过很多无锁算法,当缓冲区已满时“返回 false”时,这些算法就会起作用——即,不添加;但我什至找不到伪代码来说明当您需要覆盖最旧的数据时如何执行此操作。
我正在使用GCC 4.1.2(工作中的限制,我无法升级版本...)并且我有Boost库,过去我制作了自己的Atomic< T>变量类型非常接近即将到来的规范(它并不完美,但它是线程安全的并且可以满足我的需要)。
当我思考时,我认为使用这些原子确实应该解决这个问题。关于我的想法的一些粗略的伪代码:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
if(writeIndex == getNextIndex(readIndex)) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNexIndex(readIndex); //5
return true;
}
};
据我所知,这里不存在死锁情况,因此我们可以避免这种情况(如果我上面的实现即使在伪代码级别上也是错误的,请提出建设性批评总是受到赞赏)。 然而,我能找到的最大竞争条件是:
假设缓冲区已满。即writeIndex +1 = readIndex; (1) 发生,就像调用consume 一样。这是真的 (4) 为 false,因此我们从缓冲区中读取 (5) 发生,并且 readIndex 提前一位(因此实际上缓冲区中有空间 (2) 发生,再次推进 readIndex,从而丢失该值。
基本上,这是一个经典问题,作者必须修改读者,从而导致竞争条件。如果每次访问它时都没有实际阻止整个列表,我想不出一种方法来防止这种情况发生。我缺少什么?
I'm trying to find a way to make a Lock Free OR Non-blocking way to make a Ring Buffer for single consumer / single consumer that will over-write the oldest data int the buffer. I've read a lot of lock-free algorithms that work when you "return false" if the buffer is full--ie, don't add; but I can't find even pseudo-code that talks about how to do it when you need to overwrite the oldest data.
I am using GCC 4.1.2 (restriction at work, i can't upgrade the version...) and I have the Boost libraries, and in the past I made my own Atomic< T > variable type that follows pretty closely to the upcomming specification (its not perfect, but it is thread-safe and does what i need).
When I thought about it, I figured using these atomics should really take care of the problem. some rough psuedo-code as to what i was thinking:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
if(writeIndex == getNextIndex(readIndex)) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNexIndex(readIndex); //5
return true;
}
};
As far as I can tell, there is no deadlock situations here, so we're safe from that (If my implementation above is wrong even on its pseudo-code leve, constructive criticism is always appreciated).
However,the BIG race condition I can find is:
lets assume the buffer is full. that is, writeIndex +1 = readIndex;
(1) occurs, just as consume is being called. and is true
(4) is false, so we move to read from the buffer
(5) occurs, and the readIndex is advanced one (so there is, in fact, space in the buffer
(2) occurs, advancing readIndex AGAIN, thus LOSING the value.
Basically, its a classic problem of the writter must modify the reader, causing a race condition. Without actually blocking the entire list everytime I access it, I can't think of a way to prevent this from happening. What am I missing??
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
很多:
do {
复制值;使用修改序列号等检查副本是否完整性。} while (
corrupt)
} while (
corrupt)
是不够的 - 您还需要使用 CAS 样式的循环来影响索引增量(尽管我确实假设您知道这一点,因为您说您已经对此进行了广泛的阅读)但是,让我们将其记为低于您的伪代码级别,并考虑您的明确问题:
t
之前,在consume()
上正确采样/复制了 readIndex,那么如果 CAS 指令已经通过制片人。无需进行通常的重新采样和重试 CAS,只需继续即可。Lots:
do {
copy the value out; check copy has integrity using modification sequence num etc.} while (
corrupt)
But, let's write that off as being below your pseudo-code level, and consider your explicit question:
consume()
- before the (possibly corrupt)t
was copied - then the CAS instruction will fail if it's already been incremented by the producer. Instead of the usual resample and retry CAS, just continue.这是我最近创建的原子变量循环缓冲区的代码。我已将其修改为“覆盖”数据而不是返回 false。
免责声明 - 尚未经过生产级测试。
Here is a code of circular buffer on atomic variables I have recently created. I have modified it to "override" data instead of returning false.
Disclaimer - it is not production grade tested yet.