Chrono 库多线程时间推导限制?
我正在尝试通过多线程设置中的时间推导来解决问题。我有 3 个线程,全部固定到不同的核心。前两个线程 (reader_threads.cc) 在 run()
函数内的无限 while 循环中运行。他们完成执行并将当前时间窗口发送到第三个线程。
当前时间窗口是根据 chrono time
/ Ti 的值计算的。
第三个线程以自己的速度运行,并且仅在标志已升起时检查请求,该请求也是通过发送给第三个线程的消息
。
如果一个纪元至少为 20000us,我就能够在同一纪元中获得所有三个线程的所需行为。在结果中,您可以找到更多信息。
读取器线程
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>
#include "control_thread.h"
#define INTERNAL_THREAD
#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#endif
using namespace std;
atomic<bool> thread_active[2];
atomic<bool> go;
pthread_barrier_t barrier;
template <typename T>
void send(Message volatile * m, unsigned int epoch, bool flag) {
for (int i = 0 ; i < sizeof(T); i++){
m->epoch = epoch;
m->flag = flag;
}
}
ControlThread * ct;
// Main run for threads
void run(unsigned int threadID){
// Put message into incoming buffer
Message volatile * m1 = &(ct->incoming_requests[threadID - 1]);
thread_active[threadID] = true;
std::atomic<bool> flag;
// this thread is done initializing stuff
thread_active[threadID] = true;
while (!go);
while(true){
using namespace std::chrono;
// Get current time with precision of microseconds
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_microseconds is type time_point<system_clock, microseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto duration = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{duration}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
auto epoch = duration / Ti;
pthread_barrier_wait(&barrier);
flag = true;
// send current time to the control thread
send<int>(m1, epoch, flag);
auto current_position = duration % Ti;
std::chrono::duration<double, micro> multi_thread_sleep = chrono::microseconds(Ti) - chrono::microseconds(current_position);
if(multi_thread_sleep > chrono::microseconds::zero()){
this_thread::sleep_for(multi_thread_sleep);
}
}
}
int threads_num = 3;
void server() {
// Don't start control thread until reader threds finish init
for (int i=1; i < threads_num; i++){
while (!thread_active[i]);
}
go = true;
while (go) {
for (int i = 0; i < threads_num; i++) {
ct->current_requests(i);
}
// Arbitrary sleep to ensure that locking is accurate
std::this_thread::sleep_for(50us);
}
}
class Thread {
public:
#if defined INTERNAL_THREAD
thread execution_handle;
#endif
unsigned int id;
Thread(unsigned int i) : id(i) {}
};
void init(){
ct = new ControlThread();
}
int main (int argc, char * argv[]){
Thread * r[4];
pthread_barrier_init(&barrier, NULL, 2);
init();
/* start threads
*================*/
for (unsigned int i = 0; i < threads_num; i++) {
r[i] = new Thread(i);
#if defined INTERNAL_THREAD
if(i==0){
r[0]->execution_handle = std::thread([] {server();});
}else if(i == 1){
r[i]->execution_handle = std::thread([i] {run(i);});
}else if(i == 2){
r[i]->execution_handle = std::thread([i] {run(i);});
}
/* pin to core i */
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
#endif
}
// wait for threads to end
for (unsigned int i = 0; i < threads_num + 1; i++) {
#if defined INTERNAL_THREAD
r[i]->execution_handle.join();
#endif
}
pthread_barrier_destroy(&barrier);
return 0;
}
控制线程
#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__
// Global vars
const auto Ti = std::chrono::microseconds(15000);
std::mutex m;
int count;
class Message{
public:
std::atomic<bool> flag;
unsigned long epoch;
};
class ControlThread {
public:
/* rw individual threads */
Message volatile incoming_requests[4];
void current_requests(unsigned long current_thread) {
using namespace std::chrono;
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_milliseconds is type time_point<system_clock, milliseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto time = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{time}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
long contol_thread_epoch = time / Ti;
// Only check request when flag is raised
if(incoming_requests[current_thread].flag){
m.lock();
incoming_requests[current_thread].flag = false;
m.unlock();
// If reader thread epoch and control thread matches
if(incoming_requests[current_thread].epoch == contol_thread_epoch){
// printf("Successful desired behaviour\n");
}else{
count++;
if(count > 0){
printf("Missed %d\n", count);
}
}
}
}
};
#endif
RUN
g++ -std=c++2a -pthread -lrt -lm -lcrypt reader_threads.cc -o run
sudo ./run
结果
以下错过的纪元是一次循环迭代(单个 Ti)等于 1000us 。此外,通过增加 Ti,跳过的 epoch 数量也会减少。最后,如果 Ti 设置为 20000 us ,则不会检测到跳过的纪元。有谁知道我在转换或线程之间的通信中是否犯了错误?如果 epoch 为 5000us,为什么线程不同步?
Missed 1
Missed 2
Missed 3
Missed 4
Missed 5
Missed 6
Missed 7
Missed 8
Missed 9
Missed 10
Missed 11
Missed 12
Missed 13
Missed 14
Missed 15
Missed 16
I am trying to solve the problem with a time derivation in a multithreaded setup. I have 3 threads, all pinned to different cores. The first two threads (reader_threads.cc) run in the infinite while loop inside the run()
function. They finish their execution and send the current time window they are into the third thread.
The current time window is calculated based on the value from chrono time
/ Ti
The third thread is running at its own pace, and it's checking only the request when the flag has been raised, which is also sent via Message
to the third thread.
I was able to get the desired behavior of all three threads in the same epoch if one epoch is at least 20000us. In the results, you can find more info.
Reader threads
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>
#include "control_thread.h"
#define INTERNAL_THREAD
#if defined INTERNAL_THREAD
#include <thread>
#include <pthread.h>
#else
#endif
using namespace std;
atomic<bool> thread_active[2];
atomic<bool> go;
pthread_barrier_t barrier;
template <typename T>
void send(Message volatile * m, unsigned int epoch, bool flag) {
for (int i = 0 ; i < sizeof(T); i++){
m->epoch = epoch;
m->flag = flag;
}
}
ControlThread * ct;
// Main run for threads
void run(unsigned int threadID){
// Put message into incoming buffer
Message volatile * m1 = &(ct->incoming_requests[threadID - 1]);
thread_active[threadID] = true;
std::atomic<bool> flag;
// this thread is done initializing stuff
thread_active[threadID] = true;
while (!go);
while(true){
using namespace std::chrono;
// Get current time with precision of microseconds
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_microseconds is type time_point<system_clock, microseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto duration = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{duration}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
auto epoch = duration / Ti;
pthread_barrier_wait(&barrier);
flag = true;
// send current time to the control thread
send<int>(m1, epoch, flag);
auto current_position = duration % Ti;
std::chrono::duration<double, micro> multi_thread_sleep = chrono::microseconds(Ti) - chrono::microseconds(current_position);
if(multi_thread_sleep > chrono::microseconds::zero()){
this_thread::sleep_for(multi_thread_sleep);
}
}
}
int threads_num = 3;
void server() {
// Don't start control thread until reader threds finish init
for (int i=1; i < threads_num; i++){
while (!thread_active[i]);
}
go = true;
while (go) {
for (int i = 0; i < threads_num; i++) {
ct->current_requests(i);
}
// Arbitrary sleep to ensure that locking is accurate
std::this_thread::sleep_for(50us);
}
}
class Thread {
public:
#if defined INTERNAL_THREAD
thread execution_handle;
#endif
unsigned int id;
Thread(unsigned int i) : id(i) {}
};
void init(){
ct = new ControlThread();
}
int main (int argc, char * argv[]){
Thread * r[4];
pthread_barrier_init(&barrier, NULL, 2);
init();
/* start threads
*================*/
for (unsigned int i = 0; i < threads_num; i++) {
r[i] = new Thread(i);
#if defined INTERNAL_THREAD
if(i==0){
r[0]->execution_handle = std::thread([] {server();});
}else if(i == 1){
r[i]->execution_handle = std::thread([i] {run(i);});
}else if(i == 2){
r[i]->execution_handle = std::thread([i] {run(i);});
}
/* pin to core i */
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
int rc = pthread_setaffinity_np(r[i]->execution_handle.native_handle(), sizeof(cpuset), &cpuset);
#endif
}
// wait for threads to end
for (unsigned int i = 0; i < threads_num + 1; i++) {
#if defined INTERNAL_THREAD
r[i]->execution_handle.join();
#endif
}
pthread_barrier_destroy(&barrier);
return 0;
}
Control Thread
#ifndef __CONTROL_THEAD_H__
#define __CONTROL_THEAD_H__
// Global vars
const auto Ti = std::chrono::microseconds(15000);
std::mutex m;
int count;
class Message{
public:
std::atomic<bool> flag;
unsigned long epoch;
};
class ControlThread {
public:
/* rw individual threads */
Message volatile incoming_requests[4];
void current_requests(unsigned long current_thread) {
using namespace std::chrono;
auto now = time_point_cast<microseconds>(steady_clock::now());
// sys_milliseconds is type time_point<system_clock, milliseconds>
using sys_microseconds = decltype(now);
// Convert time_point to signed integral type
auto time = now.time_since_epoch();
// Convert signed integral type to time_point
sys_microseconds dt{microseconds{time}};
// test
if (dt != now){
std::cout << "Failure." << std::endl;
}else{
// std::cout << "Success." << std::endl;
}
long contol_thread_epoch = time / Ti;
// Only check request when flag is raised
if(incoming_requests[current_thread].flag){
m.lock();
incoming_requests[current_thread].flag = false;
m.unlock();
// If reader thread epoch and control thread matches
if(incoming_requests[current_thread].epoch == contol_thread_epoch){
// printf("Successful desired behaviour\n");
}else{
count++;
if(count > 0){
printf("Missed %d\n", count);
}
}
}
}
};
#endif
RUN
g++ -std=c++2a -pthread -lrt -lm -lcrypt reader_threads.cc -o run
sudo ./run
Results
The following missed epochs are with one loop iteration (single Ti) equal to 1000us. Also, by increasing Ti, the less number of epochs have been skipped. Finally, if Ti is set to the 20000 us , no skipped epochs are detected. Does anyone have an idea whether I am making a mistake in casting or in communication between threads? Why the threads are not in sync if epoch is i.e. 5000us?
Missed 1
Missed 2
Missed 3
Missed 4
Missed 5
Missed 6
Missed 7
Missed 8
Missed 9
Missed 10
Missed 11
Missed 12
Missed 13
Missed 14
Missed 15
Missed 16
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论