My first multithreaded application - strange behavior of Boost:Threads?
I wrote a multithreaded application for parsing log files. It's based on the mutex buffer example from http://drdobbs.com/cpp/184401518?pgno=5.
The idea is to have a buffer class with functions to put items into the buffer and take items out of it. Synchronization of the reading and writing threads is handled with condition variables. While the buffer is not full, new items are written into it, and while it is not empty, items are read from it. Otherwise the threads wait.
The example uses a fixed number of items to process, so I changed the reading thread to run while there is input from the file, and the processing threads to run while there is input or while there are items left in the buffer.
My problem is this: if I use 1 reading and 1 processing thread, everything is OK and stable. When I add another processing thread there's a big boost in performance, and it's still stable even after 10,000 test runs.
Now when I add another processing thread (1 reading, 3 processing), the program seems to hang (deadlock?) periodically (but not every time), waiting either for the buffer to fill or for it to empty.
How come 2 threads doing the same thing synchronize stably while 3 of them crash?
I'm new to C++ threading, so perhaps some of you more experienced coders know what can cause this behavior?
Here's my code:
Buffer Class:
    #include "StringBuffer.h"

    void StringBuffer::put(string str)
    {
        scoped_lock lock(mutex);
        if (full == BUF_SIZE)
        {
            {
                //boost::mutex::scoped_lock lock(io_mutex);
                //std::cout << "Buffer is full. Waiting..." << std::endl;
            }
            while (full == BUF_SIZE)
                cond.wait(lock);
        }
        str_buffer[p] = str;
        p = (p+1) % BUF_SIZE;
        ++full;
        cond.notify_one();
    }

    string StringBuffer::get()
    {
        scoped_lock lk(mutex);
        if (full == 0)
        {
            {
                //boost::mutex::scoped_lock lock(io_mutex);
                //std::cout << "Buffer is empty. Waiting..." << std::endl;
            }
            while (full == 0)
                cond.wait(lk);
        }
        string test = str_buffer[c];
        c = (c+1) % BUF_SIZE;
        --full;
        cond.notify_one();
        return test;
    }
And here's the main program:
    Parser p;
    StringBuffer buf;
    Report report;
    string transfer;
    ifstream input;
    vector <boost::regex> regs;
    int proc_count = 0;
    int push_count = 0;
    bool pusher_done = false;

    // Show filter configuration and init report by dimensioning counter vectors
    void setup_report() {
        for (int k = 0; k < p.filters(); k++) {
            std::cout << "SID(NUM):" << k << " Name(TXT):\"" << p.name_at(k) << "\"" << " Filter(REG):\"" << p.filter_at(k) << "\"" << endl;
            regs.push_back(boost::regex(p.filter_at(k)));
            report.hits_filters.push_back(0);
            report.names.push_back(p.name_at(k));
            report.filters.push_back(p.filter_at(k));
        }
    }

    // Read strings from sourcefiles and put them into buffer
    void pusher() {
        // as long as another string could be read, ...
        while (input) {
            // put it into buffer
            buf.put(transfer);
            // and get another string from source file
            getline(input, transfer);
            push_count++;
        }
        pusher_done = true;
    }

    // Get strings from buffer and check RegEx filters. Pass matches to report
    void processor()
    {
        while (!pusher_done || buf.get_rest()) {
            string n = buf.get();
            for (unsigned sid = 0; sid < regs.size(); sid++) {
                if (boost::regex_search(n, regs[sid])) report.report_hit(sid);
            }
            boost::mutex::scoped_lock lk(buf.count_mutex);
            {
                proc_count++;
            }
        }
    }

    int main(int argc, const char* argv[], char* envp[])
    {
        if (argc == 3)
        {
            // first add sourcefile from argv[1] filepath, ...
            p.addSource(argv[1]);
            std::cout << "Source File: *** Ok\n";
            // then read configuration from argv[2] filepath, ...
            p.readPipes(envp, argv[2]);
            std::cout << "Configuration: *** Ok\n\n";
            // and setup the Report Object.
            setup_report();
            // For all sourcefiles that have been parsed, ...
            for (int i = 0; i < p.sources(); i++) {
                input.close();
                input.clear();
                // open the sourcefile in a filestream.
                input.open(p.source_at(i).c_str());
                // check if file exist, otherwise throw error and exit
                if (!input)
                {
                    std::cout << "\nError! File not found: " << p.source_at(i);
                    exit(1);
                }
                // get start time
                std::cout << "\n- started: ";
                ptime start(second_clock::local_time());
                cout << start << endl;
                // read a first string into transfer to get the loops going
                getline(input, transfer);
                // create threads and pass a reference to functions
                boost::thread push1(&pusher);
                boost::thread proc1(&processor);
                boost::thread proc2(&processor);
                // start all the threads and wait for them to complete.
                push1.join();
                proc1.join();
                proc2.join();
                // calculate and output runtime and lines per second
                ptime end(second_clock::local_time());
                time_duration runtime = end - start;
                std::cout << "- finished: " << ptime(second_clock::local_time()) << endl;
                cout << "- processed lines: " << push_count << endl;
                cout << "- runtime: " << to_simple_string(runtime) << endl;
                float processed = push_count;
                float lines_per_second = processed/runtime.total_seconds();
                cout << "- lines per second: " << lines_per_second << endl;
                // write report to file
                report.create_filereport(); // after all threads finished write reported data to file
                cout << "\nReport saved as: ./report.log\n\nBye!" << endl;
            }
        }
        else std::cout << "Usage: \"./Speed-Extract [source][config]\"\n\n";
        return 0;
    }
Edit 1:
Thanks a lot for your help. By adding some counters and thread IDs to the output, I figured out what the problem is:
I noticed that several threads could remain waiting for the buffer to fill.
My processing threads run while there are source strings left that haven't been read yet OR while the buffer is not empty. This is not good.
Say I have 2 threads waiting for the buffer to fill. As soon as the reader reads a new line (maybe one of the last few lines), there are 6 other threads that try to get this line and lock the item, so the 2 waiting threads may not even get a chance to try to take it.
As soon as they see that the line has been taken by another thread, they keep waiting. The reading thread doesn't notify them when it reaches EOF, and then it stops. Both waiting threads wait forever.
My reading function additionally has to notify all threads that it has reached EOF, so that threads only remain waiting if the buffer is empty and the file is not at EOF.
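The fix described in Edit 1 could look roughly like the sketch below. It is not the original StringBuffer: the class name ClosableBuffer, the close() method and the run_pipeline helper are hypothetical, and std::mutex and std::condition_variable from C++11 are used in place of the Boost equivalents so the example compiles without Boost. The point it illustrates is that get() waits only while the buffer is empty and the producer has not signaled end of input.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded buffer with an explicit "closed" (EOF) state.
class ClosableBuffer {
public:
    explicit ClosableBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the buffer is full, then stores the item.
    void put(std::string s) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(s));
        cond_.notify_all();  // one shared condition: wake everyone, each re-checks
    }

    // Waits only while the buffer is empty AND not closed.
    // Returns false once the buffer is closed and fully drained.
    bool get(std::string& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) return false;  // closed, nothing left to process
        out = std::move(items_.front());
        items_.pop();
        cond_.notify_all();
        return true;
    }

    // Called by the reading thread at EOF; wakes every waiting consumer.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::string> items_;
    std::size_t capacity_;
    bool closed_ = false;
};

// Demo helper: one producer, `consumers` processing threads.
// Returns the number of lines that were consumed.
int run_pipeline(int consumers, int lines) {
    assert(consumers > 0);  // without consumers the producer would block forever
    ClosableBuffer buf(10);
    std::atomic<int> processed(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < consumers; ++i) {
        workers.emplace_back([&] {
            std::string line;
            while (buf.get(line)) ++processed;  // loop ends when get() returns false
        });
    }
    std::thread producer([&] {
        for (int i = 0; i < lines; ++i) buf.put("line " + std::to_string(i));
        buf.close();  // the "EOF" notification
    });
    producer.join();
    for (auto& w : workers) w.join();
    return processed.load();
}
```

Calling run_pipeline(3, 1000) from main should return 1000 regardless of how many processing threads are used, because the exit condition is decided inside get() under the mutex rather than by a separate, unsynchronized flag checked in the processor loop.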
Comments (2)
Like @Martin, I cannot see any obvious problem in your code. The only idea I have is that you could try to use separate condition variables for writing to the buffer and reading from it. As it is now, each time a thread finishes getting an item, other threads that were waiting in the get method are potentially signaled as well.
Consider the following. The buffer is full, so the writer waits for the cond signal. Now the readers empty the queue without the writer being signaled even once. This is possible because they use the same condition variable, and it becomes more likely the more readers there are. Every time a reader removes an item from the buffer, it calls notify_one. This can wake up the writer, but it can also wake up a reader. Suppose by chance all notifications end up waking up readers. The writer will never be released. In the end, all threads will wait on a signal and you have a deadlock.
If that is correct, then you have two possible fixes:
1. Use separate condition variables for readers and writers.
2. Use notify_all instead of notify_one to make sure the writer gets a chance every time an item is removed.
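The suggestion to use separate condition variables might be sketched as follows. This is an illustration under assumptions, not the poster's class: TwoCondBuffer, not_full_, not_empty_ and run_fixed_demo are hypothetical names, and the C++11 standard library is used instead of Boost so that the example is self-contained. With two conditions, a notify_one issued by a reader can only reach a waiting writer, and vice versa.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical ring buffer with one condition per direction.
class TwoCondBuffer {
public:
    explicit TwoCondBuffer(std::size_t capacity) : slots_(capacity) {
        assert(capacity > 0);
    }

    void put(const std::string& s) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ == slots_.size()) not_full_.wait(lock);
        slots_[tail_] = s;
        tail_ = (tail_ + 1) % slots_.size();
        ++count_;
        not_empty_.notify_one();  // only readers ever wait on not_empty_
    }

    std::string get() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ == 0) not_empty_.wait(lock);
        std::string s = slots_[head_];
        head_ = (head_ + 1) % slots_.size();
        --count_;
        not_full_.notify_one();  // only writers ever wait on not_full_
        return s;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::vector<std::string> slots_;
    std::size_t head_ = 0;
    std::size_t tail_ = 0;
    std::size_t count_ = 0;
};

// Demo helper with a fixed workload (as in the Dr. Dobb's example):
// each consumer takes exactly `per_consumer` items. Returns the total taken.
int run_fixed_demo(int consumers, int per_consumer) {
    assert(consumers > 0 && per_consumer >= 0);
    TwoCondBuffer buf(4);
    std::atomic<int> taken(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < consumers; ++i) {
        workers.emplace_back([&] {
            for (int k = 0; k < per_consumer; ++k) {
                buf.get();
                ++taken;
            }
        });
    }
    // The calling thread acts as the single writer.
    for (int i = 0; i < consumers * per_consumer; ++i) buf.put("line");
    for (auto& w : workers) w.join();
    return taken.load();
}
```

The demo deliberately uses a fixed number of items per consumer, so it only exercises the signaling between readers and the writer; it does not address how processing threads should stop at end of input.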
I can't actually see a problem.
But one thing to remember is that just because a thread is released from the condition variable by a signal does not mean it starts running immediately.
Once released, it must acquire the mutex before continuing, but every time the thread is scheduled to run, somebody else may already have locked the mutex (given how tight those loops are, that would not be a surprise), so it suspends again and waits for its next scheduling slot. This may be where your conflict is.
The trouble is that putting print statements into the code is not going to help, because print statements affect the timing (they are expensive) and you will therefore get different behavior. Something cheap, like keeping a count of each thread's actions, may be cheap enough that it does not affect the timing but still helps you pin down the problem. Note: print the results only after you finish.
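One way to keep such cheap counts is sketched below, assuming C++11 and hypothetical names (ThreadStats, run_counted, total_taken, print_stats). Each thread increments plain counters in its own slot, so no locking or I/O happens while the threads run, and the tallies are printed only after every thread has been joined. The shared atomic counter merely stands in for the real work queue.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

// Hypothetical per-thread tallies; each thread writes only to its own slot.
struct ThreadStats {
    long items_taken = 0;  // successful claims of a work item
    long retries = 0;      // claims lost to another thread
};

// Runs `threads` workers that drain `total_items` from a shared counter
// and returns the per-thread statistics after all workers have finished.
std::vector<ThreadStats> run_counted(int threads, long total_items) {
    assert(threads > 0 && total_items >= 0);
    std::atomic<long> remaining(total_items);
    std::vector<ThreadStats> stats(threads);  // sized up front, never resized
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&stats, &remaining, t] {
            ThreadStats& mine = stats[t];
            for (;;) {
                long before = remaining.load();
                if (before <= 0) break;  // nothing left to claim
                if (remaining.compare_exchange_weak(before, before - 1)) {
                    ++mine.items_taken;
                } else {
                    ++mine.retries;  // cheap: no lock, no printing
                }
            }
        });
    }
    for (auto& w : workers) w.join();  // join before reading the tallies
    return stats;
}

long total_taken(const std::vector<ThreadStats>& stats) {
    long sum = 0;
    for (const auto& s : stats) sum += s.items_taken;
    return sum;
}

// Print only after everything has finished, so timing is not disturbed.
void print_stats(const std::vector<ThreadStats>& stats) {
    for (std::size_t i = 0; i < stats.size(); ++i) {
        std::cout << "thread " << i << ": taken=" << stats[i].items_taken
                  << " retries=" << stats[i].retries << '\n';
    }
}
```

In the log parser, the same idea would mean counting events such as "entered get", "waited" and "woke up" per thread and dumping them at exit; how the counts split between threads varies from run to run, but their sum should match the number of lines read.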