boost::threads 程序会导致大量竞争条件

发布于 2024-12-13 06:27:20 字数 3077 浏览 1 评论 0原文

我有一个程序,我使用 boost::threads 进行多线程处理。不幸的是 drd (valgrind --tool=drd ./my_program) 报告了很多大约 10000 的问题。

我不确定我是否误解了 boost 线程的某些内容。我花了几个小时试图找出我的错误,但没有取得进一步的进展,因此任何帮助将不胜感激。

我尝试管道化某些过滤器,并希望能够通过使用 run 调用最后一个过滤器来运行它们。然后,这个过滤器应该首先调用他所依赖的所有前驱过滤器,最后调用他的 processQueue() 方法。 我现在希望能够在其赢得的线程中调用前体过滤器,这样如果图形作为并行路径,我就可以获得加速。因此我添加了线程组,以便每个前驱过滤器都在自己的线程中执行。但不幸的是,我遇到了很多竞争条件,但我不确定它们是从哪里产生的。 我希望现在我想要实现的目标更加清晰。

更新

我已将代码更新为更简单的代码,但问题仍然出现。我认为问题与线程生成有关。

更新2

我认为主要原因是valgrind的误报率非常高。我对此提出了一个新问题。 请参阅此处

更新 3

大部分内容当我使用 valgrind 3.6.1 而不是 3.7.0 或 3.8.0 时,可以避免错误。

这是 drd 的一份报告:

==29905== Conflicting load by thread 1 at 0xb0081000 size 8
==29905==    at 0x25A6C2: pthread_join (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2BEC0: boost::thread::join() (in /usr/local/lib/libboost_thread.dylib)
==29905==    by 0x100006641: Filter::run() (in ./playgroudThreads)
==29905==    by 0x100001013: main (in ./playgroudThreads)
==29905== Allocation context: unknown.
==29905== Other segment start (thread 2)
==29905==    at 0x2A7B68: thread_start (in /usr/lib/system/libsystem_c.dylib)
==29905== Other segment end (thread 2)
==29905==    at 0x3E667A: mach_msg_trap (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x3DED38: semaphore_create (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x2A50F7: new_sem_from_pool (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A6199: _pthread_exit (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A48C9: _pthread_start (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A7B74: thread_start (in /usr/lib/system/libsystem_c.dylib)

这是我的示例代码:

#include <iostream>
#include <vector>
#include <sys/time.h>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

class Filter
{
    public:

        Filter(int n) :
                n_(n), precursor_(0)
        {
        }

        ~Filter()
        {
        }

        void connect(Filter& f)
        {
            precursor_ = &f;
        }

        void run()
        {

            if (!isCalculationDone_) {
                if (precursor_) {
                    boost::thread thread(&Filter::run, precursor_);

                    thread.join();
                }
                this->processQueue(2);
                isCalculationDone_ = true;

            }

        }

        void processQueue(unsigned N)
        {
            //do some calculations

        }

    public:
        int n_;
        Filter* precursor_;

        bool isCalculationDone_;

};

int main(int argc, char* argv[])
{

    Filter* f1 = new Filter(1);
    Filter* f2 = new Filter(2);

    f2->connect(*f1);

    f2->run();

    std::cerr << "main: done" << std::endl;
    delete f2;
    delete f1;
    return 0;

}
;

I have a program where I use boost::threads for multi threading. Unfortunately drd (valgrind --tool=drd ./my_program) reports lot of problems about 10000.

I am not sure if I misunderstood something of boost thread. I try to find out my error for hours but did not get much further, therefore any help would be appreciated.

I try to pipeline certain filters and want to be able to run them by calling the last filter with run. This filter should then first call all his precursor filter which he depend on and in the end call his processQueue() methode.
I want now to be able to call precursor filters in their won thread, so that I get a speed up if the graph as parallel paths. Therefore I added the threadgroup, so that each precursor filter is executed in his own thread. But unfortunately I get a lot of race conditions where I am not sure where they result from.
I hope now it is more clear what I want to achieve.

Update

I have updated the code to a even simpler code where the problem still occurs. I think the problem is somewhere related to the thread generation.

Update 2

I think the main reason for these is a very high false positive rate of valgrind. I have opened a new question about this. See here

Update 3

most of the errors could be avoided when I use valgrind 3.6.1 instead of 3.7.0 or 3.8.0.

Here one report of drd:

==29905== Conflicting load by thread 1 at 0xb0081000 size 8
==29905==    at 0x25A6C2: pthread_join (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2BEC0: boost::thread::join() (in /usr/local/lib/libboost_thread.dylib)
==29905==    by 0x100006641: Filter::run() (in ./playgroudThreads)
==29905==    by 0x100001013: main (in ./playgroudThreads)
==29905== Allocation context: unknown.
==29905== Other segment start (thread 2)
==29905==    at 0x2A7B68: thread_start (in /usr/lib/system/libsystem_c.dylib)
==29905== Other segment end (thread 2)
==29905==    at 0x3E667A: mach_msg_trap (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x3DED38: semaphore_create (in /usr/lib/system/libsystem_kernel.dylib)
==29905==    by 0x2A50F7: new_sem_from_pool (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A6199: _pthread_exit (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A48C9: _pthread_start (in /usr/lib/system/libsystem_c.dylib)
==29905==    by 0x2A7B74: thread_start (in /usr/lib/system/libsystem_c.dylib)

And here my example code:

#include <iostream>
#include <vector>
#include <sys/time.h>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

class Filter
{
    public:

        Filter(int n) :
                n_(n), precursor_(0)
        {
        }

        ~Filter()
        {
        }

        void connect(Filter& f)
        {
            precursor_ = &f;
        }

        void run()
        {

            if (!isCalculationDone_) {
                if (precursor_) {
                    boost::thread thread(&Filter::run, precursor_);

                    thread.join();
                }
                this->processQueue(2);
                isCalculationDone_ = true;

            }

        }

        void processQueue(unsigned N)
        {
            //do some calculations

        }

    public:
        int n_;
        Filter* precursor_;

        bool isCalculationDone_;

};

int main(int argc, char* argv[])
{

    Filter* f1 = new Filter(1);
    Filter* f2 = new Filter(2);

    f2->connect(*f1);

    f2->run();

    std::cerr << "main: done" << std::endl;
    delete f2;
    delete f1;
    return 0;

}
;

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

待天淡蓝洁白时 2024-12-20 06:27:20

您正在创建 8 个过滤器。每个 Filter 对象都有自己的 filterMutex_ - 这些彼此无关。

您正在创建超过 8 个线程。这是故意的吗?

每次调用 run 都会为每个前驱对象启动一个新线程,并在该线程上为该前驱 Filter 对象调用 Filter::run 。所以:

f8->run creates 2 threads for its precursors, calling f6->run and f7->run
 f6->run creates 2 threads: f4->run and f5->run
  f4->run creates 1 thread: f2->run
   f2->run creates 1 thread: f1->run
    f1->run creates no additional threads
  f5->run creates 1 thread: f3->run
   f3->run creates 1 thread: f1->run (different thread from the other f1->run)
    f1->run creates no additional threads
 f7->run creates 1 thread: f3->run
  f3->run creates 1 thread: f1->run
   f1->run creates no additional threads

因此,使用 8 个 Filter 对象,您创建 10 个线程(除了主线程之外),调用 f3->run 两次,然后 f1->run 三次。

对同一对象的多次调用 run 将被序列化。不同的过滤器没有序列化。

不确定这是否会导致您的问题,但正是这种事情让我对设计以及它应该做什么感到好奇。

You are creating 8 Filters. Each Filter object has its own filterMutex_ - these have nothing to do with each other.

You are creating more than 8 threads. Is that deliberate?

Each call to run will launch a new thread for each precursor, calling Filter::run on that thread for that precursor Filter object. So:

f8->run creates 2 threads for its precursors, calling f6->run and f7->run
 f6->run creates 2 threads: f4->run and f5->run
  f4->run creates 1 thread: f2->run
   f2->run creates 1 thread: f1->run
    f1->run creates no additional threads
  f5->run creates 1 thread: f3->run
   f3->run creates 1 thread: f1->run (different thread from the other f1->run)
    f1->run creates no additional threads
 f7->run creates 1 thread: f3->run
  f3->run creates 1 thread: f1->run
   f1->run creates no additional threads

So, with your 8 Filter objects, you create 10 threads (in addition to your main thread), call f3->run twice, and f1->run three times.

The multiple calls to run on the same object will be serialized. Different filters are not serialized.

Not sure if any of this is causing your problem, but it's the kind of thing that makes me wonder about the design, and what it's supposed to be doing.

白芷 2024-12-20 06:27:20

您并不孤单:请参阅 thread 这里,这表明问题是误报“可能是由于新创建的线程重用已终止线程的内存用于线程本地存储而引起的”。

You're not alone: see the thread here, which suggests the problem is a false positive "probably caused by reuse of the memory for thread-local storage from a terminated thread by a newly created thread".

傾旎 2024-12-20 06:27:20

好吧,我不确定你的程序实际上应该做什么,但一般来说,如果你线程独立的操作(例如数学公式)不需要来自你想要线程的任何其他进程的任何输入,那么线程通常很有用,因为在任何其他情况下线程必须等到其他进程可以提供这些数据,因此您可能会浪费大量 CPU 时间。但由于这种情况是不可避免的,线程的艺术是以一种尽可能短且尽可能少出现这种情况的方式来实现您的问题。

在实现线程时,还存在两个线程需要一种资源(例如变量)的问题,并且一个线程可能会在另一个线程正在读取它时更改它,因此可能会提供不一致的数据(而且您的程序可能运行完全不同的数据)如果一个线程比另一个线程快,反之亦然),这实际上称为竞争条件,为了防止这种情况,有 互斥体以防止读取和同时编写某些函数来让某个线程等待另一个线程。

我的猜测是,这两种情况之一发生在您的程序中,因此 vallgrind 会告诉您这些问题,因此在您的位置上,我将仔细检查您的整个代码,并实际上重新考虑任何新线程之间存在或可能存在的任何依赖关系。考虑到主要部分:

f2->connect(f1);
f3->connect(f1);
f4->connect(f2);
f5->connect(f3);
f6->connect(f4);
f6->connect(f5);
f7->connect(f3);
f8->connect(f6);
f8->connect(f7);

boost::unique_lock<boost::shared_mutex> lock(filterMutex_);

想这可能是第一个场景。

链接可能有助于解释您的 valgrind 输出。尤其是“8.2.9. 调试 OpenMP 程序”部分可能会让您感兴趣,因为实际上给出了非常相似的输出作为示例。

这里一个教程,似乎实际上经历了所有这些场景(甚至还有更多) )并很好地解释了如何使用 boost-threading。

Well I am not sure what your program is actually supposed to do, but generally threading is just useful if you thread independent operations like a mathematical formula which wont need any input from any other process you want to thread, because in any other situation the thread has to wait till that other process can give those data and therefore you are risking to waste a lot of CPU time. But as such situations are unavoidable the art of threading is to implement your problem in a way in which such situations are as short and as rare as possible.

While implementing threading there is also the problem of two threads in need of one resource (like a variable) and one might perhaps change it while another thread is reading it and therefore might be provided with an inconsistent data (also your program might run completely different if one thread is faster than in other and the other way around), this is actually called a race-condition and to prevent this there are mutexes to prevent reading and writing at the same time and certain functions to let a certain thread wait for another.

My guess is that one of those two scenarios happens in your program and therefore vallgrind tells you those problems, therefore in your position I would go through your whole code and actually reconsider any dependency there is or might be between any new thread. And considering the main part:

f2->connect(f1);
f3->connect(f1);
f4->connect(f2);
f5->connect(f3);
f6->connect(f4);
f6->connect(f5);
f7->connect(f3);
f8->connect(f6);
f8->connect(f7);

and

boost::unique_lock<boost::shared_mutex> lock(filterMutex_);

I guess it might be the first scenario.

This link might help interpreting your vallgrind output. Especially the "8.2.9. Debugging OpenMP Programs" part might be interresting for you as actually quite similar output is given as an example.

Here an tutorial which seems to actually go through all those scenarios (and even a few more) and explains quite well how to use boost-threading.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文