C++ 中原子的乐观锁定策略和订购
在阅读了 c++0x 的原子并结合非锁定队列之后,我决定尝试一下它们。
这个想法是编写一个具有乐观锁定的单个生产者、多个消费者队列。消息不需要被消费。跳过是完全可以的,只要消费者阅读时会阅读最新版本或知道所读的内容很糟糕。
在下面的代码中,我想到的策略失败了。由于数据是无序写入的,因此数据会被损坏。任何关于为什么会出现这种情况以及如何解决它的指示将不胜感激。
Linux 上的编译: g++ -std=c++0x -o code code.cpp -lpthread
谢谢, 丹尼斯
//
// This features 2 threads in which the first writes to a structure
// and the second tries to read from that with an optimistic
// locking strategy. The data is equal to the versioning so we can
// see if the data is corrupt or not.
//
// @since: 2011-10-28
// @author: Dennis Fleurbaaij <[email protected]>
//
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdatomic.h>
#include <sched.h>
#include <assert.h>
#include <iostream>
#include <xmmintrin.h>
struct structure_t
{
std::atomic<unsigned int> id;
unsigned int data_a;
unsigned int data_b;
char _pad[ 64 - 12 ];
};
#define NUM_STRUCTURES 2
struct structure_t structures[NUM_STRUCTURES];
std::atomic<size_t> current_version_index;
volatile bool start = false;
volatile bool run = true;
size_t const iter_count = 10000000;
/**
* Write thread
*/
void* writer(void*)
{
while(!start)
sched_yield();
size_t i;
for( i=0 ; i<iter_count ; ++i )
{
size_t index = current_version_index.load(std::memory_order_relaxed);
size_t next_index = ( current_version_index + 1 ) & NUM_STRUCTURES-1;
structures[next_index].data_a = i;
structures[next_index].data_b = i;
structures[next_index].id.store(i, std::memory_order_release);
current_version_index.store(next_index);
//std::cout << "Queued - id: " << i << ", index: " << next_index << std::endl;
//sleep(1);
}
run=false;
}
/**
* Read thread
*/
void* reader(void*)
{
while(!start)
sched_yield();
unsigned int prev_id=0;
size_t i;
while(run)
{
size_t index = current_version_index.load(std::memory_order_relaxed);
unsigned int id = structures[index].id.load(std::memory_order_acquire);
if( id > prev_id )
{
unsigned int data_a = structures[index].data_a;
unsigned int data_b = structures[index].data_b;
// Re-read the data and check optimistic lock. This should be read after
// the lines above and should not be optimized away.
//
// This is what fails after a while:
// Error in data. Index: 0, id: 24097, id2: 24097, data_a: 24099, data_b: 24099
unsigned int id2 = structures[index].id.load(std::memory_order_acquire);
if( id2 > id )
{
continue;
}
if( id != id2 ||
id != data_a ||
id != data_b )
{
std::cerr << "Error in data. Index: " << index << ", id: " << id
<< ", id2: " << id2 << ", data_a: " << data_a << ", data_b: " << data_b << std::endl;
exit(EXIT_FAILURE);
}
//std::cout << "Read. Index: " << index << ", id: " << id
// << ", data_a: " << data_a << ", data_b: " << data_b << std::endl;
prev_id = id;
}
_mm_pause();
}
}
/**
* Main
*/
int main (int argc, char *argv[])
{
assert( sizeof(structure_t) == 64 );
pthread_t write_thread, read_thread;
pthread_create(&write_thread, NULL, writer, (void*)NULL);
pthread_create(&read_thread, NULL, reader, (void*)NULL);
sleep(1);
start = 1;
void *status;
pthread_join(read_thread, &status);
pthread_join(write_thread, &status);
}
After reading up on c++0x's atomics and in combination with non locking queues I decided to have a go at playing with them.
The idea was to write a single producer, multiple consumer queue with optimistic locking. The messages do not need to be consumed. Skipping is perfectly fine as long as that if a consumer reads it reads the last version or knows that it's read was bad.
In the code below the strategy I had in mind fails. The data gets corrupted because data is written out-of-order. Any pointers on why this is, and how to fix it would be greatly appreciated.
Compilation on Linux: g++ -std=c++0x -o code code.cpp -lpthread
Thanks,
Dennis
//
// This features 2 threads in which the first writes to a structure
// and the second tries to read from that with an optimistic
// locking strategy. The data is equal to the versioning so we can
// see if the data is corrupt or not.
//
// @since: 2011-10-28
// @author: Dennis Fleurbaaij <[email protected]>
//
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdatomic.h>
#include <sched.h>
#include <assert.h>
#include <iostream>
#include <xmmintrin.h>
struct structure_t
{
std::atomic<unsigned int> id;
unsigned int data_a;
unsigned int data_b;
char _pad[ 64 - 12 ];
};
#define NUM_STRUCTURES 2
struct structure_t structures[NUM_STRUCTURES];
std::atomic<size_t> current_version_index;
volatile bool start = false;
volatile bool run = true;
size_t const iter_count = 10000000;
/**
* Write thread
*/
void* writer(void*)
{
while(!start)
sched_yield();
size_t i;
for( i=0 ; i<iter_count ; ++i )
{
size_t index = current_version_index.load(std::memory_order_relaxed);
size_t next_index = ( current_version_index + 1 ) & NUM_STRUCTURES-1;
structures[next_index].data_a = i;
structures[next_index].data_b = i;
structures[next_index].id.store(i, std::memory_order_release);
current_version_index.store(next_index);
//std::cout << "Queued - id: " << i << ", index: " << next_index << std::endl;
//sleep(1);
}
run=false;
}
/**
* Read thread
*/
void* reader(void*)
{
while(!start)
sched_yield();
unsigned int prev_id=0;
size_t i;
while(run)
{
size_t index = current_version_index.load(std::memory_order_relaxed);
unsigned int id = structures[index].id.load(std::memory_order_acquire);
if( id > prev_id )
{
unsigned int data_a = structures[index].data_a;
unsigned int data_b = structures[index].data_b;
// Re-read the data and check optimistic lock. This should be read after
// the lines above and should not be optimized away.
//
// This is what fails after a while:
// Error in data. Index: 0, id: 24097, id2: 24097, data_a: 24099, data_b: 24099
unsigned int id2 = structures[index].id.load(std::memory_order_acquire);
if( id2 > id )
{
continue;
}
if( id != id2 ||
id != data_a ||
id != data_b )
{
std::cerr << "Error in data. Index: " << index << ", id: " << id
<< ", id2: " << id2 << ", data_a: " << data_a << ", data_b: " << data_b << std::endl;
exit(EXIT_FAILURE);
}
//std::cout << "Read. Index: " << index << ", id: " << id
// << ", data_a: " << data_a << ", data_b: " << data_b << std::endl;
prev_id = id;
}
_mm_pause();
}
}
/**
* Main
*/
int main (int argc, char *argv[])
{
assert( sizeof(structure_t) == 64 );
pthread_t write_thread, read_thread;
pthread_create(&write_thread, NULL, writer, (void*)NULL);
pthread_create(&read_thread, NULL, reader, (void*)NULL);
sleep(1);
start = 1;
void *status;
pthread_join(read_thread, &status);
pthread_join(write_thread, &status);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
也许,这是一个错误:(
读者可能会从前面的步骤中获取 current_version_index ,因此竞争条件实际上是可能的)
Perhaps, this is a bug:
(current_version_index may be taken by reader from previous steps, so race condition is actually possible)