Using a sink in Apache Flink for read purposes?

Posted on 2025-02-01 08:54:22


I am new to Apache Flink (and Stack Overflow), and I wanted to know the best practice for dealing with the following scenario:

I am currently consuming real-time messages using a KafkaSource from someone else's application. Some of these messages will need to undergo a transformation if the keys in these messages exist in a local database that I have created and have access to. These transformed messages then need to be sent to a KafkaSink one by one.

In order to check if a message needs to be transformed, I need to see if the key for that specific message exists in my local database (I have to query my local database for each message to check for its key).

What is an efficient way to do this?

I have 2 ideas:

  1. Open a connection to the local database and perform a query to see if the record exists in my local database for that message. Repeat this for each message in the stream.

  2. Extend Flink's RichSinkFunction, open a connection through it, and use the invoke method to perform the query. Use this RichSink for each message in the stream.

Performance Concern: I only want to open a connection to the local database once. I think Method #1 would open and close a connection per message while Method #2 would open and close a connection only once.
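The cost difference between the two methods can be sketched in plain Python (names like `FakeConnection` and `key_exists` are illustrative stand-ins for a real database client, not a Flink API): Method #1 pays the connection cost once per message, while Method #2's open-once lifecycle (what Flink's `RichFunction.open()` provides) pays it once per task.

```python
class FakeConnection:
    """Stand-in for a real database client; counts how often it is opened."""
    opens = 0

    def __init__(self):
        FakeConnection.opens += 1

    def key_exists(self, key):
        return key in {"a", "b"}  # pretend database lookup

    def close(self):
        pass


def method1_per_message(messages):
    # Method #1: open and close a connection for every single message.
    out = []
    for key, value in messages:
        conn = FakeConnection()
        if conn.key_exists(key):
            value = value.upper()  # the "transformation"
        conn.close()
        out.append((key, value))
    return out


class Method2RichFunction:
    # Method #2: mimic Flink's RichFunction lifecycle -- open() runs once
    # per task instance, invoke() runs once per message.
    def open(self):
        self.conn = FakeConnection()

    def invoke(self, key, value):
        if self.conn.key_exists(key):
            value = value.upper()
        return key, value

    def close(self):
        self.conn.close()


msgs = [("a", "x"), ("c", "y"), ("b", "z")]

FakeConnection.opens = 0
method1_per_message(msgs)
print(FakeConnection.opens)  # 3 -- one connection per message

FakeConnection.opens = 0
fn = Method2RichFunction()
fn.open()
results = [fn.invoke(k, v) for k, v in msgs]
fn.close()
print(FakeConnection.opens)  # 1 -- one connection for the whole stream
print(results)
```

Note that in a real Flink job each parallel task instance calls `open()` once, so with parallelism N you would hold N connections, not one.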

More generally, is it appropriate to create a RichSink to just run some queries in your local database for read purposes? I am not going to be using this RichSink to actually write any data to the local database.

Thanks!

Comments (2)

胡渣熟男 2025-02-08 08:54:22


The preferred approach to access external systems from Flink is to use an AsyncFunction: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/

That is, if your database can handle the load and is fast enough to keep up with the stream throughput. If not, you'll want to implement some kind of CDC stream from your database and store its contents locally as Flink state. Then, have a ConnectedStream so both streams can share state in a CoMap or CoFlatMap operator.
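The AsyncFunction approach issues lookups for many in-flight messages concurrently instead of blocking the stream on one query at a time. Flink's actual `AsyncDataStream` API is Java/Scala; this is a self-contained Python `asyncio` sketch of the same pattern, with `db_key_exists` standing in for a hypothetical async database client:

```python
import asyncio


async def db_key_exists(key):
    # Stand-in for an async database client call (hypothetical).
    await asyncio.sleep(0.01)  # simulated network latency
    return key in {"a", "b"}


async def async_transform(key, value):
    # Analogue of AsyncFunction.asyncInvoke(): await the lookup without
    # blocking the other in-flight messages.
    if await db_key_exists(key):
        return key, value.upper()
    return key, value


async def main():
    msgs = [("a", "x"), ("c", "y"), ("b", "z")]
    # Flink bounds the number of in-flight async requests ("capacity");
    # gathering the coroutines plays that role in this sketch.
    return await asyncio.gather(*(async_transform(k, v) for k, v in msgs))


results = asyncio.run(main())
print(results)  # [('a', 'X'), ('c', 'y'), ('b', 'Z')]
```

With three 10 ms lookups overlapping, the batch completes in roughly one lookup's latency rather than three, which is the point of async I/O here.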

把梦留给海 2025-02-08 08:54:22


ConnectedStream and AsyncFunction are the preferred ways of approaching this kind of problem.

In case you don't have access to all Flink abstractions (for example, if you have some existing framework on top of Flink) but you can instantiate a FlatMapFunction, you can resort to RichFlatMapFunction - you'd maintain just a few connections to the database this way if you use the open method to establish them.
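The ConnectedStream idea mentioned in both answers - feed a CDC-style stream of database changes into one input and the messages into the other, sharing state between them - can be sketched as a CoFlatMap. This is a self-contained Python sketch of the pattern (in real Flink this would be a Java `CoFlatMapFunction` on connected, keyed streams, with `known_keys` held in Flink state):

```python
class KeySetCoFlatMap:
    """Sketch of a CoFlatMapFunction: input 1 is a CDC-style stream of
    database key changes, input 2 is the message stream; both sides
    share the same state, so no per-message database round-trip."""

    def __init__(self):
        self.known_keys = set()  # analogue of Flink keyed/operator state

    def flat_map1(self, cdc_event):
        # CDC side: keep the local key set in sync with the database.
        op, key = cdc_event
        if op == "insert":
            self.known_keys.add(key)
        elif op == "delete":
            self.known_keys.discard(key)
        return []

    def flat_map2(self, message):
        # Message side: transform only when the key is known locally.
        key, value = message
        if key in self.known_keys:
            return [(key, value.upper())]
        return [(key, value)]


fn = KeySetCoFlatMap()
fn.flat_map1(("insert", "a"))
fn.flat_map1(("insert", "b"))
fn.flat_map1(("delete", "b"))

out = []
for msg in [("a", "x"), ("b", "y")]:
    out.extend(fn.flat_map2(msg))
print(out)  # [('a', 'X'), ('b', 'y')]
```

The trade-off versus AsyncFunction: lookups become local and fast, but you take on the cost of keeping the replicated key set consistent with the database.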
