Streaming system 动态调优

发布于 2023-07-19 08:56:41 字数 11543 浏览 33 评论 0

这里的调优可以泛指任何随着外部环境变化(比如 workload、系统资源、bottleneck 的出现)而改变的操作,比如 change parallelism、optimize execution plan、live migration 等等 [dspa20-12 dspa20-13 dspa20-14]。下面用几个例子来介绍下。

1. Reconfiguration

有时候我们需要 change parallelism。比如观测到 Dataflow Graph 中的某个 operator 出现了 bottleneck,就需要给它加几个节点,提高它的 parallelism(反之也可以降低 parallelism 来节省资源)。基本思路是:1).对当前系统保存 savepoints(user-triggered checkpoint, save application’s state) 2).关闭 application,然后在新的 parallel nodes 上重新启动。

对于 stateful operator,还需要对原来的 state 进行 redistribution 或者 replicate,重新分配到新的节点上。因为 state 本质上是一些 key-value pairs, 一致性哈希 就可以满足基本要求。

2. Automatic scaling control

这个其实是接着上面的话题来谈的。前面提到了如何 scaling,这里关注的是如何自动确定 scaling 的策略。包括:1).detect metrics 2).policy 3).action。[dspa20-13]中列出了几种传统的策略(Queuing theory/Control theory/Heuristic),但它们都不太好用。这里用几篇最近的 paper 举例说明一下最近的进展(只简要说明 idea,详细内容可以参考原文)。

1). [NSDI’18]

为了降低整个 dataflow graph 的运行时间,如果我们能发现 graph 中的 critical path(关键路径),就可以发现影响整个 graph 完成时间的因素,进而优化它们。但是和其他系统不同,streaming job 是无穷尽的,因此还需要 find critical path online。本文关注的就是这个问题。

首先我们可以借助万能的 windowing,do periodic snapshot and cut the stream into fixed size windows. Analyze each window and get performance summary for each window.

但对于 streaming job,我们无法得知完整的 graph,每个 window 中选出来的(一个)critical path 可能并不在全局的 critical path 上。因此我们可以对每个 window 都选出多个 critical path,并且 predict the critical path in the future。这里的思路是 the more critical paths an activity appears on, the more probable this activity is critical(it could be on a potential critical path)。之后作者定义了一个 critical participation metric 来衡量它。

在一些 graph processing system 做的 evaluation 显示,Driver(coordinator)更经常出现在 critical path 上。

2). [OSDI’18]

本文关注的问题是在一个 dataflow graph 上,如何自动发现已成为 bottleneck 的 operator、并且自动 scale。本文希望让自动调优到 target throughput 的过程所需 reconfiguration 步数更少,调优效果更准确。其中 Bottleneck 的定义是该 operator 处理太慢了,影响 downstream 的速率[Figure 2]。

针对如何自动 detect bottleneck 的问题,使用[NSDI’18]中实现的 profiling tool。Use dataflow graph to extract operator dependencies and system instrumentation to collect accurate, representative metrics。

通过上述工具,我们得到了每个 operator 在没有 bottleneck 的理想情况下的 capacity(指标是 throughput)。之后用 linear prediction model 来拟合出 parallelism 和 throughput 之间的关系。为了避免预测值(parallelism)出现上下摇摆震荡,我们预测出 upper bound 和 lower bound,而不是直接预测值[Figure 4]。这样可以保证预测值只向一个方向 scale up/down(不然 reconfiguration 的开销会很大)。每次 reconfiguration 一轮之后重新监测指标,然后 recompute model to minimize error,直到最后得到 target throughput。在 Flink 上的 Evaluation 显示它可以明显减少调优到 target rate 所需的 reconfiguration 次数,收敛更快,而且最终性能也更接近 target[Figure 6]。

个人感觉这篇 paper 非常有 SIGMOD17 OtterTune 的影子…可能因为问题比较相似吧

3. State migration

前面提到过对于 stateful operator,在 reconfiguration 时节点数量可能会有变动,就需要在不同节点之间对 state(key-value pairs)进行 migrate。在一个线上运行的 streaming system 中玩这种操作无异于空中加油。一种比较粗暴的策略是 pause-and-restart,意思是先用一个 buffer 接住来自 upstream 的数据,然后把当前 operator 直接停机并且 snapshot + migrate,完成后再开闸放水。这会导致 migrate 期间很多 operator 被阻塞。

[VLDB’19] 提出了一种 live state migration,意思是把 state migration 也看作一种 dataflow operation,然后让细粒度的 fine-grained state transfer 和 processing 交替运行。

为了实现这个目的,本文引入了 timely dataflow 的概念。timely dataflow 的本意是通过为 operator 引入时间戳(logical timestamp)的概念,使得 streaming system 可以计算带有环的 dataflow graph(很多需要增量更新/迭代计算的任务都需要)。timely dataflow 是由多个 worker 并行处理的,每个 worker 都有整个 dataflow 的一个副本(每个可以用来计算不同 timestamp 的 dataflow),它们之间可以通过 message 进行通信。定义 Frontier_表示之后只能接收 timestamp 大于等于 frontier 的 message_(类似 watermark 的含义),那么 frontier 也就是 timestamp 最早的一个标记(frontier 可以理解为待处理的 event 中最早的一个,或者说这些待处理 event 组成的 graph 中入度为 0 的点。在 [SOSP’13] 的 2.3 节有更详细的介绍)。[ch3.1]

在 state migration 的问题中,本质也是对不同 worker 上的 key-value 做更新操作。我们也可以对这个更新操作(operator)定义 timestamp,表示某一时间之后的 dataflow 才进行更新。通过这样设计,我们就可以把单一的、工作量巨大的 state migration task(需要停机很长一阵完成)拆分成若干个小的 update operator(可以在不同时间完成,这样中间就可以穿插别的工作,避免长时间停机)。[ch3.3]

[Fig3]对需要 live state migration 的 operator 进行了重新设计。L 是原来实现 operator 和储存 state 的部件,另外新加的部件有 F(接收 dataflow 和 migration 的 input,分别输出 data 和 migrate 出来的 state)和 S(接收 F 的输出,然后 apply 到 L 上)。其中 F 用来确定何时执行流过来的 migration 操作(通过将 timestamp 与 frontier 进行对比),如果暂时不能 migrate 就可以先 buffer 起来。这样来保证 only complete state is migrated。[ch3.4]

这篇好难啊…反复看了好多遍才明白了个大致意思…如有错误欢迎指正

4. Flow control and load shedding

其实这和调优关系不大了…不过还是放在这一节里吧。[dspa20-14]

前面提到过 streaming system 可以看作一个生产者-消费者模型。有时候生产者产生事件的速度会非常快,超出了消费者(streaming system)处理速度。全都先 buffer 起来肯定是不现实的。比较常用的应对策略有以下几种:

1). Load shedding

它会选择性的 drop 一些 record。属于牺牲一些准确度来保证 latency,类似于在低网络带宽下降低 video streaming 的画质来保证不卡顿。

Load shedding 可以被做成 drop operator 丢到 dataflow graph 的指定位置中,它需要根据得到的 stat information 来确定如何 drop(比如 drop window/tuple,要 drop 多少等等),本质是一个 optimization 问题。

2). Back-pressure

意思是把 upstream 到来的超出处理能力的 record 先缓存起来(需要一个 persistent queue,例如 kafka),保证 downstream 的 rate 不超过某个阈值。位于 downstream 的 back-pressure 的限流效果会一直 propagate 到 upstream,直到 data stream source。

3). Elasticity

前面提到的动态调优就属于这种,可以根据 workload 变动情况自动分配资源。


为了加深理解,最后来看 facebook 在 ICDE’20 上刚刚发表的一篇 paper:Turbine: Facebook’s service management platform for stream processing。工业界的很多 system paper 都是已经大规模上线运行多年后再整理发表的,含金量很高。

Turbine 是针对 streaming system 设计的集群管理系统。传统的集群管理系统比较适用于 resource requirement can be determined in advance 的场景,但 streaming workload 是多变的,又需要很强的实时性(low latency),传统系统很难满足这个要求。而在 stream processing system 中,any feature that involves manual effort to scale eventually becomes an operational bottleneck。因此 Turbine 实现了以下几个特性:1).Automatically adjusting resource allocation 2).fast task scheduling and failure recovery 3).ACIDF(F 表示 fault tolerance) application update mechanism。下面我们就分别看看如何实现这些特性。

Turbine 包括以下三个组件:job management(store job configurations and manage job updates, decide what to run),task management(convert job into tasks and schedule tasks across clusters, decide where to run),resource management(automatically adjust resource allocation in real time, decide how to run)。具体结构可以看这张图:

在 Turbine 中,一个 job 可以包含多个 task,这些 task 在各自部分的数据上(a disjoint subset of the input data)并行运行相同的 binary(job)。不同 task 之间尽量不存在依赖关系。

1. Job Management

Job 代表被 compile 之后的 streaming application。暂时跳过这一段 hhhh

2. Task management

这部分的目的是把 task 分发到各自的 worker 上运行,需要保证无 duplicate workers running same task、失败之后 migrate 并重启、load balance 等。

Job 被送到一个集中的 task service 组件。它会实时从 Job Store 获取 job,生成 task,并存入 Task Specs(include all configurations necessary to run a task, such as package version, arguments, number of threads)。每个 Turbine Container 都有一个本地的 task manager,它们定期拉取 task specs 的镜像(包含了所有的 task),从中选出本地容器内可以运行的 stream processing task。

Scheduling:为了实现负载均衡,待处理的数据会被拆成多个shard(分片),存放到不同 container 上。Local Task Manager 会负责不同 shard 和 task 之间的匹配。

Load balancing:在运行过程中,系统会实时维护每个 container 的 capacity、每个 shard 的 load(性能开销),并通过 scheduling algorithm 实现二者的最佳匹配。

Failure Handling:和大多数系统一样,Turbine 也是用 heartbeat 来检测 failed node。

3. Resource Management

这也是篇幅最长的部分了。Auto Scaler 的目的是:1).动态调节 container 上的资源,保证 jobs 的运行 2).同时尽量降低这一过程中 migrate 的代价(比如尽量少 restart task)。而 Auto Scaler 的开发也经过了好几个版本的不断改进。

A:第一代 reactive auto scaler:见 Algorithm2,大体意思就是根据运行速度、是否 OOM(out of memory)等来判断 job 所在的 worker 是不是该扩容/缩容了。但这里有一些问题:1).job 的资源占用一开始波动会很剧烈 2).我们无法提前知道每个 job 最低需要的资源量,万一 downscaling 错了风险会很大 3).有时候不知道问题的根源,盲目进行 scaling 也不大好(作者举了一个例子,比如一个 job 总是 connection failure 导致速度慢,此时还给他增加 parallelism 趋势会导致更多的 failure 和 connection traffic)。另外,作者观察到 the amount of resources needed for a given job is often predictable。那还说啥,all in ai 就是了!

B:第二代 proactive(积极的) auto scaler:加入了两个组件:1).Resource Estimator(estimate the usage of a given resource)。它对 stateful / stateless 两种类型的 job 所需的资源(CPU+memory+disk / 主要是 CPU)进行了分析。对于 CPU 资源(也是 stateless 最主要的资源消耗),可以动态估计出 maximum stable processing rate a single thread task can handle,然后作者认为 cpu 消耗是随着线程数增加而线性增长的【comment:真的吗……】。 2).Plan Generator(uses these estimates to construct a resource adjustment plan)。它根据第一步 estimate 出的结果来调节 resource,并保证不会发生前面提到的第一代 auto scaler 中出现的(1),(2),(3)三个问题【comment:但并没有详细说明是如何解决的,特别是 stateful 情况下的存储消耗…也可能是我没看懂 hhh】。

C:第三代 preactive(主动的) auto scaler:其实这个是接着第二代来讲的。前面说了我们要 all in ai,这一节就介绍了 Pattern Analyzer 这个组件(infer patterns based on data seen and to apply this knowledge for pruning out potentially destabilizing scaling decisions)。它会分析两种 data:1).Resource Adjustment Data。这一段主要为了解决(2)downscaling 错这个问题。它会记录下每次调整 resource 的动作供后面的策略参考。 2).Historical Workload Patterns。作者发现 workload 的 pattern 是非常有规律的,比如大家都喜欢白天刷 facebook。利用这些数据我们可以尽可能减少需要 resource adjustment 的次数。作者提出的策略是记录下最近 14 天的 workload(比如 input rate)数据集,每次做出调优策略后,先在数据集上模拟的 verify 一下,目的是 it verifies that this reduction will not cause another round of updates in the next x hours。但也有时候 historical data 也是不太准的,这个就是 future work 了。【comment:个人感觉这里可以进一步 ai 化。目前的策略还是比较 rule based 的】

D:untriaged problems:这里说的是像(3)那种 allocating more resources 会导致情况恶化的问题。作者给出的方法非常简单明了:fires operator alerts that require manual investigation :)

E:vertical VS horizontal:这里说了两种 scaling 的策略:vertical(applies resource allocation changes within the task level without changing the number of tasks)和 horizontal(changing the number of tasks to increase or decrease job parallelism,和前面 OSDI’18 那篇 paper 一样)。

F:capacity management:这一步目的是 makes sure each resource type has sufficient allocation cluster-wide。

前面提到的所有组件加起来大概就形成了这么一套系统:

4. Evaluation

先略过了。

5. Conclusion and future work

Going forward, we plan to investigate machine learning techniques for automatic root cause analysis and mitigation of incidents that previously required human intervention.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

世俗缘

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

xu362930323

文章 0 评论 0

缱倦旧时光

文章 0 评论 0

qq_eXruk9

文章 0 评论 0

遂心如意

文章 0 评论 0

guojiayue1

文章 0 评论 0

愿与i

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文