FLINK-如何解决过滤器功能中的背压?
使用Flink v1.13.2使用RockSDB,我正在用Grafana中的以下命令来测量任务按: sum by(task_name)(flink_task_taskmanager_job_job_job_task_back_backpress_backpressedtimemspersecondimemspersecond {job =“ ...
” ,processFilter_b,c,d)来自背压。但是以下过程函数运行良好(没有背压的过程和mongoDB sink-)。而且我不知道如何解决过滤器功能中的背压
可能的原因是什么?
,我也想知道,因为我在聚集功能中使用了rocksdb kafkaobject rocksdb?
这是我的过滤器功能(遭受了背压):
private void myStream(DataStream<KafkaObject> kafkaSource)
{
kafkaSource
.filter(log -> log.contains("x") || log.contains("y") )
.name("ProcessFilter_A")
.keyBy(KafkaObject::getInstanceA)
.window(TumblingProcessingTimeWindows.of(Time.hours(2), Time.minutes(30)))
.aggregate(new MyAggregateCounter(), new MyProcessFunction("setup A"))
.name("ProcessA")
.addSink(new MongoDbSink())
;
kafkaSource
.filter(log -> log.contains("x") || log.contains("y") )
.name("ProcessFilter_B")
.keyBy(KafkaObject::getInstanceA)
.window(TumblingProcessingTimeWindows.of(Time.hours(1), Time.minutes(20)))
.aggregate(new MyAggregateCounter(), new MyProcessFunction("setup B"))
.name("ProcessB")
.addSink(new MongoDbSink())
;
kafkaSource
.filter(log -> log.contains("x") || log.contains("y") )
.name("ProcessFilter_C")
.keyBy(KafkaObject::getInstanceA)
.window(TumblingProcessingTimeWindows.of(Time.minutes(30), Time.minutes(10)))
.aggregate(new MyAggregateCounter(), new MyProcessFunction("setup C"))
.name("ProcessC")
.addSink(new MongoDbSink())
;
kafkaSource
.filter(log -> log.contains("x") || log.contains("y") )
.name("ProcessFilter_D")
.keyBy(KafkaObject::getInstanceA)
.window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
.aggregate(new MyAggregateCounter(), new MyProcessFunction("setup D"))
.name("ProcessD")
.addSink(new MongoDbSink())
;
}
这是汇总函数:
public class MyAggregateCounter implements AggregateFunction<KafkaObject, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>
{
@Override
public Tuple3<Long, Long, Long> createAccumulator()
{
return Tuple3.of(0L, 0L, 0L);
}
@Override
public Tuple3<Long, Long, Long> add(KafkaObject value, Tuple3<Long, Long, Long> accumulator)
{
if (...)
{
accumulator.f0 += 1;
}
else if (...)
{
accumulator.f1 += 1;
}
else if (...)
{
accumulator.f2 += 1;
}
return accumulator;
}
@Override
public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator)
{
return accumulator;
}
@Override
public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> a, Tuple3<Long, Long, Long> b)
{
return Tuple3.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2);
}
}
更新: 大卫是对的,问题是mongodb 。即使我也无法将过滤器的背压与mongoDB联系起来(因为没有背压mongodb sink本身),在使用nosink实现删除了mongosink之后,一切都是okey。
在接收器函数的开放方法中,我正在创建n com.mongodb.mongoclient
实例,其中n是并行性。现在,我将n个实例更改为每个TaskManager (使用单词模式)的一个实例,
现在似乎也似乎也需要弗林克(Flink)也需要预定的mongosink(例如kafka)。而且Flink还需要其他一些指标来发现真正的原因。(因为我永远不会发现问题是MongoDB)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
MongoDB接收器是您在滤波器功能中观察到的背压的最可能原因。您可以通过用丢弃的水槽替换水槽来验证这一点,并检查是否消除了背压。
请参阅 https://stackoverflow.com/a/54386180/2000823
至于减少背压,您可以使用flink的和识别瓶颈。
如果这样做,您可能会发现MongoDB和/或Flink需要扩大规模。您可能还会发现,您的工作大部分时间都在执行序列化/挑选化 - 优化序列化是增加吞吐量的最佳方法之一。
The MongoDB sink is the most likely cause of the backpressure you are observing in the filter functions. You can verify this by replacing the sink with a discarding sink, and checking to see if that eliminates the backpressure.
See https://stackoverflow.com/a/54386180/2000823 for an example.
As for reducing the backpressure, you could use Flink's built-in backpressure monitoring and flame graphs to identify the bottleneck(s).
If you do that you may find that MongoDB and/or Flink needs to be scaled up. You may also find that your job is spending most of its time performing serialization/deserialization -- optimizing serialization is one of the best ways to increase throughput.