How to synchronize and combine the results of multiple threads in C++?
I have a data feed continuously feeding data packets in. There are 5 threads (A, B, C, D, E) processing the data packets. Note that the 5 threads run at totally different speeds, and they generate 5 different features (each thread generates 1 feature) for every incoming data packet.
The 5 threads are at different points in the stream: when A has finished analyzing the first 10 packets, B might only have finished packets 1 and 2, and C might not have finished a single packet at all.
My task is to match up the results from the 5 threads and start the final analysis once all 5 features for the first 10 data packets are available.
My question is:
- How do I combine the results from different threads, making sure the analysis thread is only triggered when a certain number of results are available?
- It seems that I need an aggregator thread checking the availability of the different buffers. I am thinking in terms of locks/conditions. How could I implement such a condition involving different buffers?
I'm a total newbie in multithreading. Any suggestion is welcome.
I am using GNU C++ with the Boost library.
Have yourself an "aggregator" thread: this thread would get its input from the worker threads (through non-blocking thread-safe queues, I suggest) and, once a "batch" is ready, push it to your "analyzer" thread.
Queues offer the advantage of not blocking any of the workers: the "aggregator" just has to poll the worker queues (guarded by a critical section). You can control the polling rate to your liking.
This solution gets around the problem of "synchronize all" situations.
You may want to read up on the Producer-consumer problem.
Use semaphores and extra boolean 'done' variables. Every time a thread finishes, it FIRST writes its answers, THEN its 'done' variable, then calls a 'check' function that checks all threads' 'done' variables and, if they are all true, triggers the analysis thread.
Depending on your performance trade-offs, you probably want only the slowest 'work' thread to ever call the 'check' function, so the fast ones won't keep locking their 'done' variables for reading. This, of course, depends on knowing which one is the slowest.
I don't know your reset policy: do you want to wait for 10 fresh inputs every time, or analyze the 10 most recent continuously?
Some pseudocode:
The reason for creating new queues is that your aggregation processing may result in significant amounts of locking, and invalidation if items are removed - if you put new queues on your worker threads, you need to worry less about locking (especially as the aggregator doesn't need to share its results BACK with the workers).
Barriers are the canonical "synchronize all" operation.
However, it sounds like you want to have a "count result" variable in a critical section that is incremented when a certain amount is done. Then, you want to do a "block until variable is equal to x". That can be accomplished with a spin-lock against the count result variable.
Have a container that stores results and a function like this (pseudocode):
Since the mutex only protects the add-to-container operation, which is fairly quick, it shouldn't cause excessive serialization.
With your current design you are limited by the slowest computation, and the other threads won't be used enough.
If you want to process a lot of packets, I would instead split the work like this:
Distribute the data packets to N identical threads, each of which computes all 5 results in sequence for the packets it receives.
Each thread puts its result packets into a thread-safe fifo.
Your main thread reads the results and, if needed, reorders them using the packet numbers.