Huge checkpoint size when using ValueState, causing event-processing lag

Posted 2025-01-29 20:35:02

I have an application in Flink that deduplicates multiple streams.
It keys by one string field and dedupes using ValueState.

I use ValueState in a RichFilterFunction:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class DedupeWithState extends RichFilterFunction<Tuple2<String, Message>> {
    private ValueState<Boolean> seen;
    private final ValueStateDescriptor<Boolean> desc;

    public DedupeWithState(long cacheExpirationTimeMs) {
        // TTL so a key's "seen" flag expires after cacheExpirationTimeMs
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(cacheExpirationTimeMs))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttlConfig);
    }

    @Override
    public void open(Configuration conf) {
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(Tuple2<String, Message> stringMessageTuple2) throws Exception {
        // keep the record only the first time the key is seen
        if (seen.value() == null) {
            seen.update(true);
            return true;
        }
        return false;
    }
}

The application consumes 3 streams from Kafka, and each stream has its own dedupe function with a TTL of 4 hours.

DataStream<Tuple2<String, Message>> event1 = event1Input
                .keyBy(x->x.f0)
                .filter(new DedupeWithState(14400000));
                
DataStream<Tuple2<String, Message>> event2 = event2Input
                .keyBy(x->x.f0)
                .filter(new DedupeWithState(14400000));

DataStream<Tuple2<String, Message>> event3 = event3Input
                .keyBy(x->x.f0)
                .filter(new DedupeWithState(14400000));

Screenshots attached.

Backend properties are:

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: <azure blob store>

Checkpoint configuration as shown on the Web UI

  • We are using Flink 1.13.6.
  • The QPS of each stream is event1 - 7k, event2 - 6k, event3 - 200
  • Key size is ~110 bytes
  • Checkpoint interval is 5 mins and incremental checkpoint is enabled.
    As per the above configs (given that incremental checkpointing is enabled), each stream should have roughly the following checkpoint size (the same estimate for all three streams is worked out below):
    event1 -> ((7000 * 60 * 5) * 110 bytes) = ~220 MB
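
For reference, the same estimate applied to all three streams (assuming, as above, that every event in a 5-minute interval carries a previously unseen ~110-byte key):

event1 -> (7000 * 60 * 5) * 110 bytes = ~220 MB
event2 -> (6000 * 60 * 5) * 110 bytes = ~190 MB
event3 -> (200 * 60 * 5) * 110 bytes  = ~6 MB
total  -> ~420 MB per checkpoint

which roughly matches the ~400 MB observed for the first checkpoints.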

The issue is that the checkpoint size is huge. It starts around 400 MB (as expected) but grows to 2-3 GB per checkpoint (see the "Checkpoint history" screenshot). This results in heavy backpressure on the dedupe functions and overall lag in the system (see the "Checkpoints per operator" screenshot).

2 Answers

戴着白色围巾的女孩 2025-02-05 20:35:02

Maybe the state is not being cleaned up, since cleanup is done lazily (on read). From the initial release post (a bit old, but it may still apply):

When a state object is accessed in a read operation, Flink will check its timestamp and clear the state if it is expired (depending on the configured state visibility, the expired state is returned or not). Due to this lazy removal, expired state that is never accessed again will forever occupy storage space unless it is garbage collected.

I would try a MapState per stream (without keying by) instead of a ValueState per key as you have now, so that the same state is continuously accessed. Alternatively, you could set up a timer in DedupeWithState that accesses the state and forces the cleanup (you may need to use a ProcessFunction to be able to set up timers), or that simply clears it.
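
A minimal sketch of how that could look (the class name DedupeWithMapState, the 128-bucket keying, and the 5-minute sweep interval are just assumptions for illustration, not something from the question): the seen keys are held in a MapState per hash bucket and swept out explicitly by a recurring processing-time timer, instead of relying on lazy TTL cleanup.

import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch only: dedupe via a MapState of seen keys per hash bucket,
// cleaned up eagerly by a recurring processing-time timer.
public class DedupeWithMapState
        extends KeyedProcessFunction<Integer, Tuple2<String, Message>, Tuple2<String, Message>> {

    private static final long TTL_MS = 4 * 60 * 60 * 1000L;        // 4 hours, as in the question
    private static final long SWEEP_INTERVAL_MS = 5 * 60 * 1000L;  // assumed cleanup interval

    private transient MapState<String, Long> seen;  // dedup key -> time first seen

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(Tuple2<String, Message> value, Context ctx,
                               Collector<Tuple2<String, Message>> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        boolean bucketWasEmpty = seen.isEmpty();
        if (!seen.contains(value.f0)) {
            seen.put(value.f0, now);
            out.collect(value);
        }
        if (bucketWasEmpty) {
            // start the periodic sweep for this bucket
            ctx.timerService().registerProcessingTimeTimer(now + SWEEP_INTERVAL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Message>> out) throws Exception {
        // actively drop entries older than the TTL so they do not linger in RocksDB
        Iterator<Map.Entry<String, Long>> it = seen.iterator();
        while (it.hasNext()) {
            if (timestamp - it.next().getValue() > TTL_MS) {
                it.remove();
            }
        }
        if (!seen.isEmpty()) {
            ctx.timerService().registerProcessingTimeTimer(timestamp + SWEEP_INTERVAL_MS);
        }
    }
}

Each stream would then be keyed by a coarse bucket rather than the dedup key itself, e.g.:

DataStream<Tuple2<String, Message>> event1 = event1Input
        .keyBy(x -> Math.abs(x.f0.hashCode()) % 128)
        .process(new DedupeWithMapState());

With this approach expired entries are removed explicitly by the sweep timer, so the state stays bounded by the 4-hour window even for keys that are never read again; the trade-off is that each bucket's map can grow large, so the bucket count has to be balanced against parallelism.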

亢潮 2025-02-05 20:35:02

Try something like this -

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author sucheth.shivakumar
 */
public class Check extends KeyedProcessFunction<String, Tuple2<String, Message>, Tuple2<String, Message>> {
    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        // no TTL is configured here; the state is cleared explicitly by the timer below
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(Tuple2<String, Message> value, Context ctx,
                               Collector<Tuple2<String, Message>> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            // emits the record the first time the key is seen
            out.collect(value);
            // schedule the cleanup 4 hours from now
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now + 14400000);
        }
    }

    @Override
    // this fires after 4 hrs have passed and clears the state for the key
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Message>> out)
            throws Exception {
        // triggers after the "ttl" has passed
        if (seen.value() != null) {
            seen.clear();
        }
    }
}
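
If this replaces the filter from the question, the wiring changes from filter() to process() (a sketch; the stream and variable names are simply reused from the question):

DataStream<Tuple2<String, Message>> event1 = event1Input
                .keyBy(x -> x.f0)
                .process(new Check());

The same applies to event2 and event3. Note that each first-seen key registers its own processing-time timer, and those timers are themselves part of the checkpointed state until they fire.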