Intuition for setting appropriate parallelism of operators in Flink
My question is about choosing a good parallelism for the operators of a Flink job in a fixed cluster setting. Suppose we have a Flink job DAG containing map- and reduce-type operators with pipelined edges between them (no blocking edges). An example DAG is as follows:
Scan -> Keyword Search -> Aggregation
Assume a fixed-size cluster of M machines with C cores each, and that the DAG is the only workflow to be run on the cluster. Flink allows the user to set the parallelism for individual operators. I usually set a parallelism of M*C for each operator. But is this the best choice from a performance perspective (e.g., execution time)? Can we leverage the properties of the operators to make a better choice? For example, if we know that the aggregation is more expensive, should we assign a parallelism of M*C only to the aggregation operator and reduce the parallelism of the other operators? This would hopefully reduce the chances of backpressure too.
I am not looking for a precise formula that will give me the "best" parallelism. I am just looking for some intuition/guidelines/ideas that can be used to make a decision. Surprisingly, I could not find much literature to read on this topic.
Note: I am aware of the reactive mode for dynamic scaling in recent versions of Flink. But my question is about a fixed cluster with only one workflow running, which means dynamic scaling is not relevant. I looked at this question, but did not get an answer.
1 Answer
I think about this a little differently. From my perspective, there are two key questions to consider:
(1) Do I want to keep the slots uniform? Or in other words, will each slot have an instance of every task, or do I want to adjust the parallelism of specific tasks?
(2) How many cores per slot?
My answer to (1) defaults to "keep things uniform". I haven't seen very many situations where tuning the parallelism of individual operators (or tasks) has proven to be worthwhile.
Changing the parallelism is usually counterproductive if it means breaking an operator chain. Doing it where there's a shuffle anyway can make sense in unusual circumstances, but in general I don't see the point. Since some of the slots will have instances of every operator, and the slots are all uniform, why is it going to be helpful to have some slots with fewer tasks assigned to them? (Here I'm assuming you aren't interested in going to the trouble of setting up slot sharing groups, which of course one could do.) Going down this path can make things more complex from an operational perspective, and for little gain. Better, in my opinion, to optimize elsewhere (e.g., serialization).
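(For completeness, slot sharing groups are how you would get non-uniform slots if you did go down that path. A minimal sketch with placeholder logic -- note that the cluster now has to provide slots for each group separately, which is part of the operational complexity mentioned above:)

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0, 1_000_000)
           .map(x -> x % 100)           // light stage, stays in the "default" group
           .keyBy(x -> x)
           .reduce((a, b) -> a + b)     // stand-in for an expensive aggregation
           .slotSharingGroup("agg")     // isolate it (and its downstream) in its own slots
           .print();

        env.execute("Slot sharing group sketch");
    }
}
```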
As for cores per slot, many jobs benefit from having 2 cores per slot, and for some complex jobs with lots of tasks you'll want to go even higher. So I think in terms of an overall parallelism of M*C for simple ETL jobs, and M*C/2 (or lower) for jobs doing something more intense.

To illustrate the extremes:
A simple ETL job might be something like

source -> map -> sink

where all of the connections are forwarding connections. Since there is only one task, and because Flink only uses one thread per task, in this case we are only using one thread per slot. So allocating anything more than one core per slot is a complete waste. And the task is probably I/O bound anyway.
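A minimal sketch of such a job, with placeholder logic: all connections are forwarding and the parallelism is uniform, so Flink chains everything into one task, i.e., one thread per slot.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // No keyBy/rebalance and uniform parallelism, so source, map,
        // and sink are chained into a single task (one thread per slot).
        env.fromSequence(0, 1_000_000)  // source
           .map(x -> x * 2)             // map, chained to the source
           // .disableChaining()        // would split the chain: an extra thread,
           //                           // at the cost of ser/de between the tasks
           .print();                    // sink, chained as well

        env.execute("Operator chaining sketch");
    }
}
```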
At the other extreme, I've seen jobs that involve ~30 joins, the evaluation of one or more ML models, plus windowed aggregations, etc. You certainly want more than one CPU core handling each parallel slice of a job like that (and more than two, for that matter).
Typically most of the CPU effort goes into serialization and deserialization, especially with RocksDB. I would try to figure out, for every event, how many RocksDB state accesses, keyBy's, and rebalances are involved -- and provide enough cores that all of that ser/de can happen concurrently (if you care about maximizing throughput). For the simplest of jobs, one core can keep up. By the time you get to something like a windowed join you may already be pushing the limits of what one core can keep up with -- depending on how fast your sources and sinks can go, and how careful you are not to waste resources.
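To make that concrete, a rough, hypothetical count for a pipeline like source -> keyBy -> windowed aggregation (RocksDB) -> sink: each event is serialized onto the network once at the keyBy and deserialized once on the receiving side; the window operator then does at least one RocksDB write (and later a read) per event, each of which ser/des the state; and the sink serializes the result once more. That is on the order of five ser/de passes per event -- several times what a simple forwarding pipeline pays, which is why one core per slot stops being enough.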
Example: imagine you are choosing between a parallelism of 50 with 2 cores per slot, or a parallelism of 100 with 1 core per slot. In both cases the same resources are available -- which will perform better?
I would expect fewer slots with more cores per slot to perform somewhat better, in general, provided there are enough tasks/threads per slot to keep both cores busy (if the whole pipeline fits into one task this might not be true, though deserializers can also run in their own thread). With fewer slots you'll have more keys and key groups per slot, which will help to avoid data skew, and with fewer tasks, checkpointing (if enabled) will be a bit better behaved. Inter-process communication is also a little more likely to be able to take an optimized (in-memory) path.
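For reference, cores per slot isn't a direct Flink setting; it falls out of how many slots you give each TaskManager relative to its cores. A flink-conf.yaml sketch for the 50-slots-with-2-cores option, assuming a hypothetical cluster of 25 TaskManagers with 4 cores each:

```yaml
# Hypothetical cluster: 25 TaskManagers x 4 cores = 100 cores total.
# 2 slots per TaskManager -> ~2 cores per slot, 50 slots overall.
taskmanager.numberOfTaskSlots: 2
# Default job parallelism matches the total slot count.
parallelism.default: 50
```

Note that slots partition a TaskManager's memory but do not enforce CPU isolation, so "2 cores per slot" is an average, not a hard limit.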