A simple boss-worker model using pthreads

I'm an amateur programmer who's experimenting with pthreads, to see to what extent a multi-threaded program can speed up a rather long computation I'm working on. The computation runs through a std::list<std::string> object, popping the first element off the list and farming it out to a thread that computes something with it. The program keeps track of the active threads and ensures there's always a certain number of active threads running. Once the list is empty, the program sorts the resulting data, dumps a data file, and terminates.

The multi-threaded version of the program currently does not work. It gets 20, 40, or 200 or so elements down the list (depending on which list I give it) and then segfaults. The segfault seems to happen on particular elements of the list, so the crashes don't appear to be random in any way.

BUT the strange thing is, if I compile with debug symbols and run the program through gdb, the program does not segfault. It runs perfectly. Slowly, of course, but it runs and does everything the way I expect it to.

After playing around with everyone's suggestions for a while, I used (among other things) valgrind's tools to try and sort out what's happening. I've noticed that the simplified code below (which makes no calls outside the std library or the pthread library) causes trouble for helgrind, and this is likely the source of my problems. So here is just the simplified code, along with helgrind's complaints.

#include <cstdlib>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <list>
#include <iostream>
#include <signal.h>
#include <sys/select.h>

struct thread_detail {
 pthread_t *threadID; 
 unsigned long num;
};

pthread_mutex_t coutLock;

void *ThreadToSpawn(void *threadarg)
{
   struct thread_detail *my_data;
   my_data = (struct thread_detail *) threadarg;
   int taskid = my_data->num;

   struct timeval timeout;
   for (unsigned long i=0; i < 10; i++)
    { 
     timeout.tv_sec = 0;  timeout.tv_usec = 500000; // half-second 
     select( 0, NULL, NULL, NULL, & timeout );
     pthread_mutex_lock(&coutLock);
     std::cout << taskid << " "; std::cout.flush();
     pthread_mutex_unlock(&coutLock);
    }
   pthread_exit(NULL);
}


int main (int argc, char *argv[])
{
  unsigned long comp_DONE=0; 
  unsigned long comp_START=0;
  unsigned long ms_LAG=10000; // microsecond lag between polling of threads

  // set-up the mutexes
  pthread_mutex_init( &coutLock, NULL );

  if (argc != 3) { std::cout << "Program requires two arguments: (1) number of threads to use,"
                               " and (2) tasks to accomplish. \n"; exit(1); }
  unsigned long NUM_THREADS(atoi( argv[1] ));
  unsigned long comp_TODO(atoi(argv[2]));
  std::cout << "Program will have " << NUM_THREADS << " threads. \n";
  std::list < thread_detail > thread_table;

   while (comp_DONE != comp_TODO) // main loop to set-up and track threads
    {
     // poll stack of computations to see if any have finished, 
     // extract data and remove completed ones from stack
     std::list < thread_detail >::iterator i(thread_table.begin());
     while (i!=thread_table.end())
      {
       if (pthread_kill(*i->threadID,0)!=0) // thread is dead
        { // if there was relevant info in *i we'd extract it here
         if (pthread_join(*i->threadID, NULL)!=0) { std::cout << "Thread join error!\n"; exit(1); }
         pthread_mutex_lock(&coutLock);
         std::cout << i->num << " done. "; std::cout.flush();
         pthread_mutex_unlock(&coutLock);
         delete i->threadID;
         thread_table.erase(i++);  
         comp_DONE++;
        }
       else (i++);
      }
     // if list not full, toss another on the pile
     while ( (thread_table.size() < NUM_THREADS) && (comp_TODO > comp_START) )
      {
        pthread_t *tId( new pthread_t );
        thread_detail Y; Y.threadID=tId; Y.num=comp_START;
        thread_table.push_back(Y);
        int rc( pthread_create( tId, NULL, ThreadToSpawn, (void *)(&(thread_table.back() )) ) );
        if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); }
        pthread_mutex_lock(&coutLock);
        std::cout << comp_START << " start. "; std::cout.flush();
        pthread_mutex_unlock(&coutLock);
        comp_START++;
      }

     // wait a specified amount of time
     struct timeval timeout;
     timeout.tv_sec = 0;  timeout.tv_usec = ms_LAG; 
     select( 0, NULL, NULL, NULL, & timeout );
    } // the big while loop

   pthread_exit(NULL);
}

Helgrind output


==2849== Helgrind, a thread error detector
==2849== Copyright (C) 2007-2009, and GNU GPL'd, by OpenWorks LLP et al.
==2849== Using Valgrind-3.6.0.SVN-Debian and LibVEX; rerun with -h for copyright info
==2849== Command: ./thread2 2 6
==2849== 
Program will have 2 threads. 
==2849== Thread #2 was created
==2849==    at 0x64276BE: clone (clone.S:77)
==2849==    by 0x555E172: pthread_create@@GLIBC_2.2.5 (createthread.c:75)
==2849==    by 0x4C2D42C: pthread_create_WRK (hg_intercepts.c:230)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
==2849== Thread #1 is the program's root thread
==2849== 
==2849== Possible data race during write of size 8 at 0x7feffffe0 by thread #2
==2849==    at 0x4C2D54C: mythread_wrapper (hg_intercepts.c:200)
==2849==  This conflicts with a previous read of size 8 by thread #1
==2849==    at 0x4C2D440: pthread_create_WRK (hg_intercepts.c:235)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
 [0 start.]  [1 start.] 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1  [0 done.]  [1 done.]  [2 start.]  [3 start.] 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3  [2 done.]  [3 done.]  [4 start.]  [5 start.] 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5  [4 done.]  [5 done.] ==2849== 
==2849== For counts of detected and suppressed errors, rerun with: -v
==2849== Use --history-level=approx or =none to gain increased speed, at
==2849== the cost of reduced accuracy of conflicting-access information
==2849== ERROR SUMMARY: 6 errors from 1 contexts (suppressed: 675 from 37)

Presumably I'm using pthreads in an incorrect way, but it's not so clear to me what I'm doing wrong. Moreover, I'm not sure what to make of the helgrind output. Earlier, helgrind was complaining because I had not called pthread_join on threads that, for other reasons, the code knew were dead. Adding the pthread_join calls took care of those complaints.
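
For what it's worth, one way to avoid inferring thread death from pthread_kill(tid, 0) at all is to have each worker announce its own completion under a mutex, and only then join it. A minimal sketch of that idea, where the worker_slot struct and its done flag are hypothetical names of mine rather than anything in the program above:

#include <pthread.h>
#include <list>

// Hypothetical variant of thread_detail with an explicit completion flag.
// The boss sets done=false and initialises doneLock before calling pthread_create.
struct worker_slot {
 pthread_t       threadID;
 unsigned long   num;
 bool            done;      // written by the worker, read by the boss
 pthread_mutex_t doneLock;  // protects 'done'
};

void *worker(void *arg)
{
   worker_slot *slot = static_cast<worker_slot *>(arg);
   // ... the real computation for slot->num goes here ...
   pthread_mutex_lock(&slot->doneLock);
   slot->done = true;                     // publish completion last
   pthread_mutex_unlock(&slot->doneLock);
   return NULL;
}

// Boss-side poll: join only the threads that have announced completion.
void reap_finished(std::list<worker_slot *> &table, unsigned long &comp_DONE)
{
   std::list<worker_slot *>::iterator i = table.begin();
   while (i != table.end()) {
     pthread_mutex_lock(&(*i)->doneLock);
     bool finished = (*i)->done;
     pthread_mutex_unlock(&(*i)->doneLock);
     if (finished) {
       pthread_join((*i)->threadID, NULL);  // reclaims the thread exactly once
       delete *i;
       i = table.erase(i);
       ++comp_DONE;
     } else {
       ++i;
     }
   }
}

Compared with probing the thread ID, this keeps the boss's view of a worker's lifetime entirely inside data the program itself owns.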

Reading various pthread tutorials online, I've discovered that it's probably pointless to have so much thread creation and destruction going on, as in the above code. It's probably more efficient to have N threads running simultaneously and to use mutexes and shared memory to pass data between the "BOSS" thread and the "WORKER" threads, only killing the WORKER threads once, at the end of the program. So that's something I'll eventually have to adjust, but is there anything obviously wrong with the above code?
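
For the record, the persistent boss/worker arrangement described above looks roughly like this with nothing but pthreads: a fixed set of workers blocks on a condition variable, the boss pushes work onto a mutex-protected list, and a shutdown flag tells the workers to exit once, at the end. This is only a sketch of the idea (the names task_queue, queueLock, queueCond and no_more_tasks are mine), not drop-in code for the real program.

#include <pthread.h>
#include <list>
#include <string>

// Shared state, all of it guarded by queueLock.
static std::list<std::string> task_queue;
static bool                   no_more_tasks = false;
static pthread_mutex_t        queueLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t         queueCond = PTHREAD_COND_INITIALIZER;

static void *pool_worker(void *)
{
   for (;;) {
     pthread_mutex_lock(&queueLock);
     while (task_queue.empty() && !no_more_tasks)
       pthread_cond_wait(&queueCond, &queueLock);  // releases the lock while waiting
     if (task_queue.empty()) {                     // queue drained and shutdown requested
       pthread_mutex_unlock(&queueLock);
       return NULL;
     }
     std::string item = task_queue.front();
     task_queue.pop_front();
     pthread_mutex_unlock(&queueLock);

     // ... compute something with 'item' here, outside the lock ...
   }
}

int main()
{
   const unsigned NUM_THREADS = 4;
   pthread_t workers[NUM_THREADS];
   for (unsigned i = 0; i < NUM_THREADS; ++i)
     pthread_create(&workers[i], NULL, pool_worker, NULL);

   // Boss: hand out the work items.
   for (int n = 0; n < 100; ++n) {
     pthread_mutex_lock(&queueLock);
     task_queue.push_back("some task");
     pthread_cond_signal(&queueCond);
     pthread_mutex_unlock(&queueLock);
   }

   // Tell the workers to drain the queue and stop; join them once, at the end.
   pthread_mutex_lock(&queueLock);
   no_more_tasks = true;
   pthread_cond_broadcast(&queueCond);
   pthread_mutex_unlock(&queueLock);

   for (unsigned i = 0; i < NUM_THREADS; ++i)
     pthread_join(workers[i], NULL);
   return 0;
}

Because the workers never die and get re-created, the per-item overhead is just one lock/unlock pair and a condition-variable wakeup.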

Edit: I'm noticing some keywords more and more often. The terminology for the thing I'm trying to create is apparently a thread pool. Moreover, there are various proposals for standard implementations of this; for example, in the boost library there are boost::threadpool, boost::task, and boost::thread. Some of these appear to be only proposals. I've come across threads here where people mention you can combine ASIO and boost::thread to accomplish what I'm looking for. Similarly, there's a message queue class.
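
From what I can tell, the ASIO-plus-boost::thread combination people describe usually amounts to running an io_service on several threads and post()-ing tasks to it, so the io_service plays the role of the message queue and the thread group is the pool. A rough, untested sketch of that shape, written against the older boost::asio::io_service interface:

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

void do_task(int n)
{
   // ... the per-element computation would go here ...
}

// Plain helper so we don't have to bind an overloaded member function.
void run_service(boost::asio::io_service *io) { io->run(); }

int main()
{
   boost::asio::io_service io;

   // The 'work' object keeps io.run() from returning while tasks are still being posted.
   boost::asio::io_service::work *keep_alive = new boost::asio::io_service::work(io);

   boost::thread_group pool;
   for (int i = 0; i < 4; ++i)
     pool.create_thread(boost::bind(&run_service, &io));

   for (int n = 0; n < 100; ++n)
     io.post(boost::bind(&do_task, n));  // queue a task for whichever thread is free

   delete keep_alive;  // let run() return once the queue drains
   pool.join_all();
   return 0;
}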

Hmm, so it seems like I'm scratching at the surface of a topic many people are thinking about nowadays, but it still seems kind of germinal, like OOP was in 1989 or something.

Comments (3)

傲娇萝莉攻 2024-09-22 03:46:31

Try enabling core dumps (ulimit -c unlimited), then run your program without gdb. When it crashes, it should leave behind a core file, which you can then open up with gdb and start investigating (gdb <executable-file> <core-file>).

倾`听者〃 2024-09-22 03:46:31

Regarding top, how many threads are you using? I don't see DATA in my top output, but I have seen the Virtual column balloon when using threads. My understanding (and perhaps I should ask to be sure) is that each thread has its own memory space that it may potentially use. That memory isn't actually being used; it's just available if needed, which is why that number can get quite high without really causing problems. In and of itself the memory probably isn't catastrophic. You should see whether the DATA utilization scales linearly with the number of threads you're using.

Regarding gdb: as you noted, gdb won't fix your code, but it may move around where your errors occur if you're corrupting memory. If the corruption lands in an area that you never go back to, or that you've already released and never try to reuse, the symptoms of your problem will go away. Go away, that is, until you need to demo or use your code in some critical situation.

Also, you'll want to take a look at helgrind, part of valgrind. This kind of thing is its bread and butter if you have a lock problem:

Helgrind is a Valgrind tool for detecting synchronisation errors in C, C++ and Fortran programs that use the POSIX pthreads threading primitives.

Just do:

valgrind --tool=helgrind {your program}
星星的軌跡 2024-09-22 03:46:31

Are you sure it's complete code? I don't see where you're creating threads or where BuildKCData is being called from.

You ought to have a memory barrier after pthread_kill(), even though I doubt it makes a difference in this case.

EDIT: You're confusing in-order execution and cache-consistency.

Cache consistency: x86 (currently) guarantees that aligned 4-byte accesses are atomic, so a[0]=123 in thread A and a[1]=456 in thread B will work; thread C will eventually see "123,456". There are various cache-consistency protocols around, but I believe it's approximately an MRSW lock.

Out-of-order execution: x86 doesn't guarantee ordering of reads (and possibly writes; there was a debate about whether sfence was needed in the Linux kernel). This lets the CPU prefetch data more effectively, but it means that "a[0]=123, read a[1]" in thread A and "a[1]=456, read a[0]" in thread B could both read 0, because a thread's read can happen before its earlier write becomes visible to the other thread. There are two general ways to fix this:

  • Only access shared data when you're holding the lock. In particular, do not read shared data outside of the lock. Whether this means a lock for each entry or a lock for the entire array is up to you, and what you think lock contention is likely to be like (tip: it's not usually very big).
  • Stick a memory barrier between things that need to be in order. This is difficult to get right (pthread doesn't even have memory barriers; pthread_barrier is more like a sync point).

While memory barriers are a recent trend, locking is far easier to get right (I am holding the lock, therefore nobody else is allowed to change the data under my feet). Memory barriers are all the rage in some circles, but there's a lot more to get right (I hope this read is atomic, I hope other threads write atomically, I hope other threads use a barrier, and oh yeah, I need to use a barrier too).

And if locking's too slow, reducing contention will be much more effective than replacing locks with barriers and hoping you got it right.
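
To make the first option above concrete, here is a minimal sketch of "only touch the shared data while holding the lock" for the a[0]/a[1] example; it's my own illustration rather than code from the question:

#include <pthread.h>

static int             a[2];
static pthread_mutex_t aLock = PTHREAD_MUTEX_INITIALIZER;

void thread_A_write(void)       // a[0]=123, but only under the lock
{
   pthread_mutex_lock(&aLock);
   a[0] = 123;
   pthread_mutex_unlock(&aLock);
}

void thread_B_write(void)       // a[1]=456, likewise
{
   pthread_mutex_lock(&aLock);
   a[1] = 456;
   pthread_mutex_unlock(&aLock);
}

void thread_C_read(int out[2])  // reads both values under the same lock,
{                               // so it always sees a consistent pair
   pthread_mutex_lock(&aLock);
   out[0] = a[0];
   out[1] = a[1];
   pthread_mutex_unlock(&aLock);
}

The mutex's acquire/release semantics supply the ordering that explicit barriers would otherwise have to, which is the sense in which locking is easier to get right.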
