由于使用事件而产生的开销

发布于 2024-08-02 08:27:29 字数 2682 浏览 12 评论 0原文

我有一个自定义线程池类,它创建一些线程,每个线程等待自己的事件(信号)。当新作业添加到线程池时,它会唤醒第一个空闲线程,以便它执行该作业。

问题如下:我有大约 1000 个循环,每个循环大约有 10'000 次迭代。这些循环必须按顺序执行,但我有 4 个可用的 CPU。我尝试做的是将 10'000 次迭代循环拆分为 4 个 2'500 次迭代循环,即每个线程一个。但我必须等待 4 个小循环完成才能进入下一个“大”迭代。这意味着我无法捆绑这些工作。

我的问题是,使用线程池和 4 个线程比顺序执行作业要慢得多(由单独的线程执行一个循环比直接在主线程中顺序执行要慢得多)。

我使用的是 Windows,因此我使用 CreateEvent() 创建事件,然后使用 WaitForMultipleObjects(2, handlers, false, INFINITE) 等待其中一个事件,直到主线程调用SetEvent()

看来整个事件(以及使用关键部分的线程之间的同步)非常昂贵!

我的问题是:使用事件花费“大量”时间是否正常?如果是这样,我是否可以使用另一种机制,并且时间成本会更少?

这是一些代码来说明(从我的线程池类复制的一些相关部分):

// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
    // some housekeeping
    HANDLE signals[2];
    signals[0] = waitSignal;
    signals[1] = endSignal;

    do {
        // wait for one of the signals
        waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);

        // try to get the next job parameters;
        if (tp->getNextJob(threadId, data)) {
            // execute job
            void* output = jobFunc(data.params);

            // tell thread pool that we're done and collect output
            tp->collectOutput(data.ID, output);
        }

        tp->threadDone(threadId);
    }
    while (waitResult - WAIT_OBJECT_0 == 0);

    // if we reach this point, endSignal was sent, so we are done !

    return 0;
}

// create all threads
for (int i = 0; i < nbThreads; ++i) {
    threadData data;
    unsigned int threadId = 0;
    char eventName[20];

    sprintf_s(eventName, 20, "WaitSignal_%d", i);

    data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
        this, CREATE_SUSPENDED, &threadId);
    data.threadId = threadId;
    data.busy = false;
    data.waitSignal = CreateEvent(NULL, true, false, eventName);

    this->threads[threadId] = data;

    // start thread
    ResumeThread(data.handle);
}

// add job
void ThreadPool::addJob(int jobId, void* params) {
    // housekeeping
    EnterCriticalSection(&(this->mutex));

    // first, insert parameters in the list
    this->jobs.push_back(job);

    // then, find the first free thread and wake it
    for (it = this->threads.begin(); it != this->threads.end(); ++it) {
        thread = (threadData) it->second;

        if (!thread.busy) {
            this->threads[thread.threadId].busy = true;

            ++(this->nbActiveThreads);

            // wake thread such that it gets the next params and runs them
            SetEvent(thread.waitSignal);
            break;
        }
    }

    LeaveCriticalSection(&(this->mutex));
}

I have a custom thread pool class, that creates some threads that each wait on their own event (signal). When a new job is added to the thread pool, it wakes the first free thread so that it executes the job.

The problem is the following : I have around 1000 loops of each around 10'000 iterations do to. These loops must be executed sequentially, but I have 4 CPUs available. What I try to do is to split the 10'000 iteration loops into 4 2'500 iterations loops, ie one per thread. But I have to wait for the 4 small loops to finish before going to the next "big" iteration. This means that I can't bundle the jobs.

My problem is that using the thread pool and 4 threads is much slower than doing the jobs sequentially (having one loop executed by a separate thread is much slower than executing it directly in the main thread sequentially).

I'm on Windows, so I create events with CreateEvent() and then wait on one of them using WaitForMultipleObjects(2, handles, false, INFINITE) until the main thread calls SetEvent().

It appears that this whole event thing (along with the synchronization between the threads using critical sections) is pretty expensive !

My question is : is it normal that using events takes "a lot of" time ? If so, is there another mechanism that I could use and that would be less time-expensive ?

Here is some code to illustrate (some relevant parts copied from my thread pool class) :

// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
    // some housekeeping
    HANDLE signals[2];
    signals[0] = waitSignal;
    signals[1] = endSignal;

    do {
        // wait for one of the signals
        waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);

        // try to get the next job parameters;
        if (tp->getNextJob(threadId, data)) {
            // execute job
            void* output = jobFunc(data.params);

            // tell thread pool that we're done and collect output
            tp->collectOutput(data.ID, output);
        }

        tp->threadDone(threadId);
    }
    while (waitResult - WAIT_OBJECT_0 == 0);

    // if we reach this point, endSignal was sent, so we are done !

    return 0;
}

// create all threads
for (int i = 0; i < nbThreads; ++i) {
    threadData data;
    unsigned int threadId = 0;
    char eventName[20];

    sprintf_s(eventName, 20, "WaitSignal_%d", i);

    data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
        this, CREATE_SUSPENDED, &threadId);
    data.threadId = threadId;
    data.busy = false;
    data.waitSignal = CreateEvent(NULL, true, false, eventName);

    this->threads[threadId] = data;

    // start thread
    ResumeThread(data.handle);
}

// add job
void ThreadPool::addJob(int jobId, void* params) {
    // housekeeping
    EnterCriticalSection(&(this->mutex));

    // first, insert parameters in the list
    this->jobs.push_back(job);

    // then, find the first free thread and wake it
    for (it = this->threads.begin(); it != this->threads.end(); ++it) {
        thread = (threadData) it->second;

        if (!thread.busy) {
            this->threads[thread.threadId].busy = true;

            ++(this->nbActiveThreads);

            // wake thread such that it gets the next params and runs them
            SetEvent(thread.waitSignal);
            break;
        }
    }

    LeaveCriticalSection(&(this->mutex));
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(9

孤寂小茶 2024-08-09 08:27:29

在我看来,这是一种生产者消费者模式,它可以用两个信号量来实现,一个信号量保护队列溢出,另一个信号量保护空队列。

您可以在此处找到一些详细信息。

This looks to me as a producer consumer pattern, which can be implented with two semaphores, one guarding the queue overflow, the other the empty queue.

You can find some details here.

戴着白色围巾的女孩 2024-08-09 08:27:29

是的,WaitForMultipleObjects 非常昂贵。如果您的作业很小,同步开销将开始超过实际执行作业的成本,如您所见。

解决这个问题的一种方法是将多个作业捆绑为一个:如果您得到了一份“小”作业(无论您如何评估此类事情),请将其存储在某个地方,直到您有足够的小作业一起组成一个合理大小的作业。然后将它们全部发送到工作线程进行处理。

或者,您可以使用多读取器单写入器队列来存储作业,而不是使用信号发送。在此模型中,每个工作线程都尝试从队列中获取作业。当它找到一个时,它就会完成工作;如果没有,它会休眠一小段时间,然后醒来并重试。这会降低每个任务的开销,但即使没有工作要做,线程也会占用 CPU。这完全取决于问题的确切性质。

Yes, WaitForMultipleObjects is pretty expensive. If your jobs are small, the synchronization overhead will start to overwhelm the cost of actually doing the job, as you're seeing.

One way to fix this is bundle multiple jobs into one: if you get a "small" job (however you evaluate such things), store it someplace until you have enough small jobs together to make one reasonably-sized job. Then send all of them to a worker thread for processing.

Alternately, instead of using signaling you could use a multiple-reader single-writer queue to store your jobs. In this model, each worker thread tries to grab jobs off the queue. When it finds one, it does the job; if it doesn't, it sleeps for a short period, then wakes up and tries again. This will lower your per-task overhead, but your threads will take up CPU even when there's no work to be done. It all depends on the exact nature of the problem.

小巷里的女流氓 2024-08-09 08:27:29

请注意,在发出 endSignal 后您仍在请求下一份工作。

for( ;; ) {
    // wait for one of the signals
    waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
    if( waitResult - WAIT_OBJECT_0 != 0 )
        return;
    //....
}

Watch out, you are still asking for a next job after the endSignal is emitted.

for( ;; ) {
    // wait for one of the signals
    waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
    if( waitResult - WAIT_OBJECT_0 != 0 )
        return;
    //....
}
深陷 2024-08-09 08:27:29

既然您说并行执行比顺序执行慢得多,我假设您的内部 2500 次循环迭代的处理时间很小(在几微秒范围内)。那么除了检查算法以分割更大的处理块之外,您无能为力; OpenMP 不会有帮助,所有其他同步技术也不会有帮助,因为它们从根本上来说都依赖于事件(自旋循环不符合条件)。

另一方面,如果 2500 次循环迭代的处理时间大于 100 微秒(在当前 PC 上),则可能会遇到硬件限制。如果您的处理使用大量内存带宽,将其拆分为四个处理器不会给您带来更多带宽,实际上会因为冲突而给您带来更少的带宽。您还可能遇到缓存循环问题,其中前 1000 次迭代中的每一次都会刷新并重新加载 4 个核心的缓存。那么就没有一种解决方案,并且根据您的目标硬件,可能没有一种解决方案。

Since you say that it is much slower in parallel than sequential execution, I assume that your processing time for your internal 2500 loop iterations is tiny (in the few micro seconds range). Then there is not much you can do except review your algorithm to split larger chunks of precessing; OpenMP won't help and every other synchronization techniques won't help either because they fundamentally all rely on events (spin loops do not qualify).

On the other hand, if your processing time of the 2500 loop iterations is larger than 100 micro seconds (on current PCs), you might be running into limitations of the hardware. If your processing uses a lot of memory bandwidth, splitting it to four processors will not give you more bandwidth, it will actually give you less because of collisions. You could also be running into problems of cache cycling where each of your top 1000 iteration will flush and reload the cache of the 4 cores. Then there is no one solution, and depending on your target hardware, there may be none.

殤城〤 2024-08-09 08:27:29

如果您只是并行化循环并使用 vs 2008,我建议您查看 OpenMP。如果您使用的是 Visual Studio 2010 beta 1,我建议您查看 并行模式库,特别是 “并行”/“并行每个”
api
"任务组 类,因为这些可能会做你想要做的事情,只是用更少的代码。

关于你关于性能的问题,这实际上取决于你在迭代期间安排了多少工作以及什么。如果你经常使用它并且你的工作量很小,那么 WaitForMultipleObjects 的成本可能会非常昂贵,这就是为什么我建议使用已经构建的实现,并且你还需要确保你没有在调试模式下运行。任务本身不会阻塞锁、I/O 或内存分配,并且您不会遇到错误共享,

我建议在类似 < 的探查器下查看此问题 。 a href="http://msdn.microsoft.com/en-us/performance/default.aspx" rel="nofollow noreferrer">xperf Visual Studio 2010 beta 1 中的 f1 分析器(它有 2 个新功能)有助于查看争用的并发模式)或英特尔的 vtune。

您还可以共享您在任务中运行的代码,这样人们就可以更好地了解您在做什么,因为我总是在性能问题上得到的答案首先是“这取决于”,其次是“您有吗?”对其进行了简介。”

祝你好运

-瑞克

If you are just parallelizing loops and using vs 2008, I'd suggest looking at OpenMP. If you're using visual studio 2010 beta 1, I'd suggesting looking at the parallel pattern library, particularly the "parallel for" / "parallel for each"
apis
or the "task group class because these will likely do what you're attempting to do, only with less code.

Regarding your question about performance, here it really depends. You'll need to look at how much work you're scheduling during your iterations and what the costs are. WaitForMultipleObjects can be quite expensive if you hit it a lot and your work is small which is why I suggest using an implementation already built. You also need to ensure that you aren't running in debug mode, under a debugger and that the tasks themselves aren't blocking on a lock, I/O or memory allocation, and you aren't hitting false sharing. Each of these has the potential to destroy scalability.

I'd suggest looking at this under a profiler like xperf the f1 profiler in visual studio 2010 beta 1 (it has 2 new concurrency modes which help see contention) or Intel's vtune.

You could also share the code that you're running in the tasks, so folks could get a better idea of what you're doing, because the answer I always get with performance issues is first "it depends" and second, "have you profiled it."

Good Luck

-Rick

━╋う一瞬間旳綻放 2024-08-09 08:27:29

它不应该那么昂贵,但是如果您的工作几乎不需要任何时间,那么线程和同步对象的开销将变得很大。像这样的线程池对于处理时间较长的作业或使用大量 IO 而不是 CPU 资源的作业来说效果更好。如果处理作业时受 CPU 限制,请确保每个 CPU 仅具有 1 个线程。

可能还有其他问题,getNextJob如何获取它的数据来处理?如果有大量数据复制,那么您的开销又会显着增加。

我会通过让每个线程不断从队列中拉出作业直到队列为空来优化它。这样,您可以将一百个作业传递到线程池,并且同步对象将仅使用一次来启动线程。我还将作业存储在队列中,并将它们的指针、引用或迭代器传递给线程,而不是复制数据。

It shouldn't be that expensive, but if your job takes hardly any time at all, then the overhead of the threads and sync objects will become significant. Thread pools like this work much better for longer-processing jobs or for those that use a lot of IO instead of CPU resources. If you are CPU-bound when processing a job, ensure you only have 1 thread per CPU.

There may be other issues, how does getNextJob get its data to process? If there's a large amount of data copying, then you've increased your overhead significantly again.

I would optimise it by letting each thread keep pulling jobs off the queue until the queue is empty. that way, you can pass a hundred jobs to the thread pool and the sync objects will be used just the once to kick off the thread. I'd also store the jobs in a queue and pass a pointer, reference or iterator to them to the thread instead of copying the data.

唐婉 2024-08-09 08:27:29

线程之间的上下文切换也可能很昂贵。在某些情况下,开发一个可用于通过一个线程或多线程顺序处理作业的框架是很有趣的。这样您就可以两全其美。

顺便问一下,你的问题到底是什么?我将能够用更精确的问题来更准确地回答:)

编辑:

在某些情况下,事件部分可能会比​​您的处理消耗更多,但不应该那么昂贵,除非您的处理速度非常快。在这种情况下,线程之间的切换也很昂贵,因此我的回答第一部分是按顺序做事......

您应该寻找线程间同步瓶颈。您可以跟踪线程等待时间开始...

编辑:更多提示后...

如果我猜对了,您的问题是有效地使用所有计算机核心/处理器来并行化一些本质上连续的处理。

假设您有 4 个核心和 10000 个循环来计算,如您的示例(在评论中)所示。你说需要等4个线程结束才可以继续。然后您可以简化同步过程。你只需要给你的四个线程 thr nth, nth+1, nth+2, nth+3 循环,等待四个线程完成然后继续。您应该使用集合点或屏障(等待 n 个线程完成的同步机制)。 Boost 具有这样的机制。您可以查看 windows 实现以提高效率。您的线程池并不真正适合该任务。在关键部分中搜索可用线程会消耗您的 CPU 时间。不是活动部分。

The context switching between threads can be expensive too. It is interesting in some cases to develop a framework you can use to process your jobs sequentially with one thread or with multiple threads. This way you can have the best of the two worlds.

By the way, what is your question exactly ? I will be able to answer more precisely with a more precise question :)

EDIT:

The events part can consume more than your processing in some cases, but should not be that expensive, unless your processing is really fast to achieve. In this case, switching between thredas is expensive too, hence my answer first part on doing things sequencially ...

You should look for inter-threads synchronisation bottlenecks. You can trace threads waiting times to begin with ...

EDIT: After more hints ...

If I guess correctly, your problem is to efficiently use all your computer cores/processors to parralellize some processing essencialy sequential.

Take that your have 4 cores and 10000 loops to compute as in your example (in a comment). You said that you need to wait for the 4 threads to end before going on. Then you can simplify your synchronisation process. You just need to give your four threads thr nth, nth+1, nth+2, nth+3 loops, wait for the four threads to complete then going on. You should use a rendezvous or barrier (a synchronization mechanism that wait for n threads to complete). Boost has such a mechanism. You can look the windows implementation for efficiency. Your thread pool is not really suited to the task. The search for an available thread in a critical section is what is killing your CPU time. Not the event part.

请别遗忘我 2024-08-09 08:27:29

看来这整个事件的事情
(随着同步
线程之间使用关键
部分)相当昂贵!

“贵”是一个相对词。喷气式飞机很贵吗?是汽车吗?或者自行车...鞋子...?

在这种情况下,问题是:相对于 JobFunction 执行所需的时间,事件是否“昂贵”?发布一些绝对数字将有所帮助:“无线程”时该过程需要多长时间?是几个月,还是几飞秒?

当您增加线程池大小时,时间会发生什么变化?尝试池大小为 1,然后是 2,然后是 4,等等。

另外,由于您过去在线程池方面遇到过一些问题,我建议进行一些调试
计算您的线程函数实际被调用的次数...它符合您的预期吗?

凭空挑选一个数字(不了解目标系统的任何信息,并假设您没有在未显示的代码中做任何“巨大”的事情),我预计每个“工作”的“事件开销”以微秒为单位进行测量。也许一百左右。如果在 JobFunction 中执行算法所花费的时间并不比这个时间多得多,那么您的线程可能会浪费您的时间而不是节省时间。

It appears that this whole event thing
(along with the synchronization
between the threads using critical
sections) is pretty expensive !

"Expensive" is a relative term. Are jets expensive? Are cars? or bicycles... shoes...?

In this case, the question is: are events "expensive" relative to the time taken for JobFunction to execute? It would help to publish some absolute figures: How long does the process take when "unthreaded"? Is it months, or a few femtoseconds?

What happens to the time as you increase the threadpool size? Try a pool size of 1, then 2 then 4, etc.

Also, as you've had some issues with threadpools here in the past, I'd suggest some debug
to count the number of times that your threadfunction is actually invoked... does it match what you expect?

Picking a figure out of the air (without knowing anything about your target system, and assuming you're not doing anything 'huge' in code you haven't shown), I'd expect the "event overhead" of each "job" to be measured in microseconds. Maybe a hundred or so. If the time taken to perform the algorithm in JobFunction is not significantly MORE than this time, then your threads are likely to cost you time rather than save it.

想你只要分分秒秒 2024-08-09 08:27:29

如前所述,线程增加的开销量取决于完成您定义的“作业”所需的相对时间量。因此,找到工作块大小的平衡非常重要,该平衡可以最大限度地减少工作块的数量,但又不会让处理器闲置以等待最后一组计算完成。

您的编码方法通过主动寻找空闲线程来提供新工作,从而增加了开销工作量。操作系统已经在跟踪这一点,并且效率更高。此外,您的函数 ThreadPool::addJob() 可能会发现所有线程都在使用中,并且无法委派工作。但它没有提供与该问题相关的任何返回代码。如果您没有以某种方式检查这种情况并且没有注意到结果中的错误,则意味着始终有空闲的处理器。我建议重新组织代码,以便 addJob() 执行其名称 - 仅添加一个作业(无需查找甚至不关心谁执行该作业),而每个工作线程在完成现有工作后会主动获取新工作。

As mentioned previously, the amount of overhead added by threading depends on the relative amount of time taken to do the "jobs" that you defined. So it is important to find a balance in the size of the work chunks that minimizes the number of pieces but does not leave processors idle waiting for the last group of computations to complete.

Your coding approach has increased the amount of overhead work by actively looking for an idle thread to supply with new work. The operating system is already keeping track of that and doing it a lot more efficiently. Also, your function ThreadPool::addJob() may find that all of the threads are in use and be unable to delegate the work. But it does not provide any return code related to that issue. If you are not checking for this condition in some way and are not noticing errors in the results, it means that there are idle processors always. I would suggest reorganizing the code so that addJob() does what it is named -- adds a job ONLY (without finding or even caring who does the job) while each worker thread actively gets new work when it is done with its existing work.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文