Huge checkpoint size when using value state, leading to lag in event processing
I have an application in Flink which deduplicates multiple streams.
It keys by one string field and dedupes using value state.
Using ValueState in a RichFilterFunction:
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class DedupeWithState extends RichFilterFunction<Tuple2<String, Message>> {

    private ValueState<Boolean> seen;
    private final ValueStateDescriptor<Boolean> desc;

    public DedupeWithState(long cacheExpirationTimeMs) {
        // TTL of cacheExpirationTimeMs: refreshed on create/write,
        // expired entries are never returned.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(cacheExpirationTimeMs))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttlConfig);
    }

    @Override
    public void open(Configuration conf) {
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(Tuple2<String, Message> stringMessageTuple2) throws Exception {
        if (seen.value() == null) {
            // First occurrence of this key within the TTL window: keep the event.
            seen.update(true);
            return true;
        }
        return false;
    }
}
The application consumes 3 streams from Kafka, and each stream has its own dedupe function with a TTL of 4 hours.
DataStream<Tuple2<String, Message>> event1 = event1Input
        .keyBy(x -> x.f0)
        .filter(new DedupeWithState(14400000));

DataStream<Tuple2<String, Message>> event2 = event2Input
        .keyBy(x -> x.f0)
        .filter(new DedupeWithState(14400000));

DataStream<Tuple2<String, Message>> event3 = event3Input
        .keyBy(x -> x.f0)
        .filter(new DedupeWithState(14400000));
Screenshots attached.
Backend properties are:
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: <azure blob store>
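For reference, a minimal sketch of the same backend settings expressed programmatically, assuming Flink 1.13's EmbeddedRocksDBStateBackend; the checkpoint storage string is kept as the same placeholder as above:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB state backend with incremental checkpoints enabled,
        // mirroring state.backend and state.backend.incremental above.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Placeholder, as in the original config (state.checkpoints.dir).
        env.getCheckpointConfig().setCheckpointStorage("<azure blob store>");
        // 5-minute checkpoint interval, as described below.
        env.enableCheckpointing(5 * 60 * 1000);
    }
}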
Checkpoint configuration as shown in the Web UI (screenshot).
- We are using Flink 1.13.6.
- The QPS of each stream is event1 - 7k, event2 - 6k, event3 - 200
- Key size is ~110 bytes
- Checkpoint interval is 5 mins and incremental checkpoint is enabled.
As per above configs (given that incremental checkpoint is enabled) each stream should have following checkpoint size:
event1 -> ((7000 * 60 * 5) * 110 bytes) = ~220 MB
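As a sanity check on that estimate, a small sketch applying the same back-of-envelope math (new keys per 5-minute interval times a ~110-byte key, ignoring RocksDB, value and metadata overhead) to all three streams:

public class CheckpointSizeEstimate {
    public static void main(String[] args) {
        long keyBytes = 110;              // approximate key size
        long intervalSeconds = 5 * 60;    // checkpoint interval
        long[] qps = {7_000, 6_000, 200}; // event1, event2, event3
        long total = 0;
        for (long q : qps) {
            long bytes = q * intervalSeconds * keyBytes;
            total += bytes;
            System.out.printf("%,d QPS -> ~%d MB of new keys per interval%n", q, bytes / 1_000_000);
        }
        System.out.printf("total -> ~%d MB per interval%n", total / 1_000_000);
        // Prints ~231 MB, ~198 MB and ~6 MB (~435 MB total), which lines up with
        // the ~400 MB checkpoints observed right after the job starts.
    }
}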
The issue is that the checkpoint size is very large. It starts at around 400 MB (as expected) but grows to 2-3 GB per checkpoint (see the "Checkpoint history" screenshot). This results in huge backpressure in the dedupe function and overall lag in the system (see the "Checkpoint per operator" screenshot).
Comments (2)
Maybe the state is not being cleaned since it is done lazily (on read). From the initial release post (a bit old but may still stand):

I would try with a MapState per stream (without keying by) instead of a ValueState per key as you have now, so the same state is continuously accessed. Or you may also be able to set up a timer in DedupeWithState that accesses the state and forces the cleanup (you may need to use a ProcessFunction to be able to set up timers) or that simply clears it.
Try something like this -