High-performance streaming data processing question

Posted on 2024-11-15 16:23:38

I have a socket connection that receives streaming data, about 150 million messages per day. Each message has to be processed after it is received. Because the volume is so large, I have multithreaded the message-processing code. Right now I have 8 threads with 8 message queues. The socket thread puts incoming messages into these queues in turn, and each worker thread only processes the messages in its own queue.

My problem right now is that my queues are overflowing.

Should I have more queues?
All threads write to a shared object through a synchronized method. Would more queues interfere with each other and make things worse?

Should I have a bigger buffer?
That seems safe to do, but I really want to process the messages faster.

Should I change my design?
Are there any good recommendations or guides to follow?

Any comments are welcome.

Comments (2)

夜无邪 2024-11-22 16:23:38

Why are you using separate queues? The usual way to distribute work is to have one shared queue that all the workers read from. In Java you can easily do this with a shared BlockingQueue. That way the jobs are distributed more evenly, because a worker won't pull jobs off the queue while it is busy. With your strategy, a slow worker's queue can end up building a backlog. To keep the queue from overflowing, you can put a maximum size on it, and the producer will then pause when the backlog gets too big.
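A minimal sketch of the shared bounded queue idea, assuming a single producer, String messages, and a "poison pill" sentinel for shutdown. The class name `SharedQueueDemo`, the sentinel value, and the counter that stands in for real processing are all hypothetical and not taken from the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class SharedQueueDemo {
    // Hypothetical sentinel that tells a worker to stop.
    private static final String POISON = "__STOP__";

    /** Pushes messageCount messages through one bounded queue shared by workerCount workers. */
    public static long run(int messageCount, int workerCount, int capacity)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicLong processed = new AtomicLong();

        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String msg = queue.take();      // blocks while the queue is empty
                        if (POISON.equals(msg)) {
                            break;                      // shutdown signal for this worker
                        }
                        processed.incrementAndGet();    // stand-in for real message processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        // Producer side: put() blocks when the queue is full, which throttles the producer.
        for (int i = 0; i < messageCount; i++) {
            queue.put("msg-" + i);
        }
        // One sentinel per worker so every worker terminates.
        for (int i = 0; i < workerCount; i++) {
            queue.put(POISON);
        }
        for (Thread w : workers) {
            w.join();
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed: " + run(100_000, 8, 1024));
    }
}
```

Note that a blocking `put()` only moves the backlog upstream: if the socket reader stalls, data piles up in the OS socket buffer instead, so this helps with memory growth but does not by itself make processing faster.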

You mention that you want the whole thing to go faster. While the above recommendations may or may not help, the only way to truly solve the problem is to run the system under a profiler and see where the bottleneck is (many times, it's not what you think it is). Otherwise, you can spend a lot of time optimizing code without it helping in the end. There are plenty of good free profilers for Java (NetBeans, jvisualvm, Eclipse) and C++ (Valgrind). A great non-free one for Java is YourKit Java Profiler.

·深蓝 2024-11-22 16:23:38

Does every processing thread write to the shared object after it has processed just one message? That could create a bottleneck. Try accumulating some temporary results in each thread before writing to the shared object.
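A rough sketch of that batching idea, assuming the shared object holds per-key counts and that results can be merged in bulk. `SharedTotals`, `BatchedWriterDemo`, the "even"/"odd" keys, and the batch size are hypothetical stand-ins for whatever the real shared object stores.

```java
import java.util.HashMap;
import java.util.Map;

public class BatchedWriterDemo {
    /** Hypothetical shared result object guarded by synchronized methods. */
    public static class SharedTotals {
        private final Map<String, Long> totals = new HashMap<>();
        private long bulkWrites = 0;

        /** Merges a whole batch of partial counts under a single lock acquisition. */
        public synchronized void addAll(Map<String, Long> partial) {
            bulkWrites++;
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }

        public synchronized long get(String key) {
            return totals.getOrDefault(key, 0L);
        }

        /** Number of addAll calls, i.e. how often workers contended for the lock to write. */
        public synchronized long bulkWrites() {
            return bulkWrites;
        }
    }

    public static SharedTotals run(int threads, int messagesPerThread, int batchSize)
            throws InterruptedException {
        SharedTotals shared = new SharedTotals();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                Map<String, Long> local = new HashMap<>(); // thread-private, needs no locking
                int pending = 0;
                for (int i = 0; i < messagesPerThread; i++) {
                    String key = (i % 2 == 0) ? "even" : "odd"; // stand-in for real processing
                    local.merge(key, 1L, Long::sum);
                    if (++pending == batchSize) {
                        shared.addAll(local);               // one synchronized call per batch
                        local.clear();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    shared.addAll(local);                   // flush the final partial batch
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return shared;
    }

    public static void main(String[] args) throws InterruptedException {
        SharedTotals s = run(8, 100_000, 1_000);
        System.out.println("even=" + s.get("even") + " odd=" + s.get("odd")
                + " bulkWrites=" + s.bulkWrites());
    }
}
```

The trade-off is that the shared object lags behind by up to one batch per thread, so this only fits if readers of the shared object can tolerate slightly stale results.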
