Flink - How to resolve backpressure in the filter functions?

Posted 2025-01-22 05:55:44


With Flink v1.13.2 using RocksDB, I am measuring task backpressure with the following query in Grafana: sum by (task_name)(flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job="..."})

Some of the filter functions (ProcessFilter_A, ProcessFilter_B, C, D) suffer from backpressure, but the downstream process functions work fine (there is no backpressure on either the process operators or the MongoDB sinks). I have no idea how to resolve the backpressure in the filter functions.

What could be the possible reasons?

I also wonder: since I am using RocksDB, will the KafkaObject records handled by the aggregate function be stored in RocksDB?

Here are my filter functions (the ones suffering from backpressure):

private void myStream(DataStream<KafkaObject> kafkaSource)
    {
       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )
                .name("ProcessFilter_A")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(2), Time.minutes(30)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup A"))
                .name("ProcessA")
                .addSink(new MongoDbSink())
                ;

       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )            
                .name("ProcessFilter_B")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1), Time.minutes(20)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup B"))
                .name("ProcessB")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )           
                .name("ProcessFilter_C")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30), Time.minutes(10)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup C"))
                .name("ProcessC")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )              
                .name("ProcessFilter_D")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup D"))
                .name("ProcessD")
                .addSink(new MongoDbSink())
                ;
    }

Here is the aggregate function:

public class MyAggregateCounter implements AggregateFunction<KafkaObject, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>
{
    @Override
    public Tuple3<Long, Long, Long> createAccumulator()
    {
        return Tuple3.of(0L, 0L, 0L);
    }

    @Override
    public Tuple3<Long, Long, Long> add(KafkaObject value, Tuple3<Long, Long, Long> accumulator)
    {
        if (...)
        {
            accumulator.f0 += 1;
        }
        else if (...)
        {
            accumulator.f1 += 1;
        }
        else if (...)
        {
            accumulator.f2 += 1;
        }
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator)
    {
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> a, Tuple3<Long, Long, Long> b)
    {
        return Tuple3.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}

UPDATE:
David was right; the problem was MongoDB. Even though I could not relate the filter backpressure to MongoDB (because the MongoDB sink itself showed no backpressure), everything was okay after I replaced the MongoDB sink with a no-op sink implementation.

In the open() method of the sink function, I was creating N com.mongodb.MongoClient instances, where N is the parallelism. I have now changed this to one instance per TaskManager (using the singleton pattern).
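
Roughly, the change looks like this (a simplified sketch with placeholder connection settings and the write logic omitted):

import com.mongodb.MongoClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: share one MongoClient per TaskManager JVM instead of creating one per subtask in open().
public class MongoDbSink<T> extends RichSinkFunction<T>
{
    // One client per JVM (i.e. per TaskManager); all sink subtasks in that JVM reuse it.
    private static volatile MongoClient sharedClient;

    private static MongoClient clientInstance()
    {
        if (sharedClient == null)
        {
            synchronized (MongoDbSink.class)
            {
                if (sharedClient == null)
                {
                    // Placeholder host/port; the real settings come from configuration.
                    sharedClient = new MongoClient("mongo-host", 27017);
                }
            }
        }
        return sharedClient;
    }

    private transient MongoClient client;

    @Override
    public void open(Configuration parameters)
    {
        client = clientInstance(); // no longer one client per parallel subtask
    }

    @Override
    public void invoke(T value, Context context)
    {
        // write 'value' using 'client' (insert/update logic omitted)
    }
}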

Even though it seems okay now, I believe Flink needs a predefined MongoDB sink (like the one for Kafka). Flink also needs some additional indicators to help find the real cause, because on my own I would never have discovered that the problem was MongoDB.


Comments (1)

葬心 2025-01-29 05:55:44


The MongoDB sink is the most likely cause of the backpressure you are observing in the filter functions. You can verify this by replacing the sink with a discarding sink, and checking to see if that eliminates the backpressure.

See https://stackoverflow.com/a/54386180/2000823 for an example.
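
For example (a minimal sketch reusing the first pipeline from your question), you can swap the MongoDB sink for Flink's built-in DiscardingSink while testing:

import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// Diagnostic only: this sink drops every record, so if the backpressure disappears
// the MongoDB sink was the bottleneck.
kafkaSource
        .filter(log -> log.contains("x") || log.contains("y"))
        .name("ProcessFilter_A")
        .keyBy(KafkaObject::getInstanceA)
        .window(TumblingProcessingTimeWindows.of(Time.hours(2), Time.minutes(30)))
        .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup A"))
        .name("ProcessA")
        .addSink(new DiscardingSink<>())   // instead of new MongoDbSink()
        ;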

As for reducing the backpressure, you could use Flink's built-in backpressure monitoring and flame graphs to identify the bottleneck(s).

If you do that you may find that MongoDB and/or Flink needs to be scaled up. You may also find that your job is spending most of its time performing serialization/deserialization -- optimizing serialization is one of the best ways to increase throughput.
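
For example (a generic sketch, not specific to this job), one quick serialization check is to make sure KafkaObject is handled by Flink's POJO serializer rather than falling back to Kryo; disabling generic types makes such a fallback fail fast:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Throws during job construction if any type (e.g. KafkaObject) would be
// serialized with Kryo instead of a more efficient built-in serializer.
env.getConfig().disableGenericTypes();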
