Using a sink in Apache Flink for read purposes?
I am new to Apache Flink (and Stack Overflow), and I wanted to know the best practice for dealing with the following scenario:
I am currently consuming real-time messages using a KafkaSource from someone else's application. Some of these messages will need to undergo a transformation if their keys exist in a local database that I have created and have access to. Each transformed message then needs to be sent to a KafkaSink.
In order to check if a message needs to be transformed, I need to see if the key for that specific message exists in my local database (I have to query my local database for each message to check for its key).
What is an efficient way to do this?
I have 2 ideas:
Open a connection to the local database and perform a query to see if the record exists in my local database for that message. Repeat this for each message in the stream.
Extend Flink's RichSinkFunction, open the connection in it, and use the invoke method to perform the query. Use this RichSink to do this for each message in the stream.
Performance Concern: I only want to open a connection to the local database once. I think Method #1 would open and close a connection per message while Method #2 would open and close a connection only once.
More generally, is it appropriate to create a RichSink just to run some read-only queries against your local database? I am not going to be using this RichSink to actually write any data to the local database.
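For reference, idea #2 might be sketched roughly like this. This is only an illustration of the pattern, assuming the Flink streaming API and a JDBC driver on the classpath; the JDBC URL, `keys` table, and column name are hypothetical placeholders:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class KeyLookupSink extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient PreparedStatement lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Opened once per parallel sink instance, not once per message.
        conn = DriverManager.getConnection("jdbc:postgresql://localhost/mydb"); // hypothetical URL
        lookup = conn.prepareStatement("SELECT 1 FROM keys WHERE k = ?");       // hypothetical table
    }

    @Override
    public void invoke(String messageKey, Context context) throws Exception {
        lookup.setString(1, messageKey);
        try (ResultSet rs = lookup.executeQuery()) {
            if (rs.next()) {
                // Key exists: transform the message and forward it to Kafka here.
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (lookup != null) lookup.close();
        if (conn != null) conn.close();
    }
}
```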
Thanks!
2 Answers
The preferred approach to access external systems from Flink is to use an AsyncFunction: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/

That is, if your database can handle the load and be fast enough to keep up with the stream throughput. If not, you'll want to implement some kind of CDC stream from your database and store its contents locally as Flink state. Then, have a ConnectedStream so they both can share state in a CoMap or CoFlatMap operator.
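A minimal sketch of the AsyncFunction approach, assuming the Flink streaming API on the classpath. The `keyExists` and `transform` helpers are hypothetical stand-ins for the real database lookup (which would use an async DB client) and the real transformation:

```java
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncKeyLookup extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String message, ResultFuture<String> resultFuture) {
        // Fire a non-blocking lookup; complete the result future when it returns.
        CompletableFuture
            .supplyAsync(() -> keyExists(message) ? transform(message) : message)
            .thenAccept(out -> resultFuture.complete(Collections.singleton(out)));
    }

    private boolean keyExists(String message) { return false; }               // hypothetical DB lookup
    private String transform(String message) { return message.toUpperCase(); } // hypothetical transform
}

// Wiring (assumes `stream` is the DataStream<String> from the KafkaSource):
// DataStream<String> result =
//     AsyncDataStream.unorderedWait(stream, new AsyncKeyLookup(), 1, TimeUnit.SECONDS, 100);
// result.sinkTo(kafkaSink);
```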
ConnectedStream and AsyncFunction are the preferred ways of approaching this kind of problem.

In case you don't have access to all Flink abstractions (for example, if you have some existing framework on top of Flink) but you can instantiate a FlatMapFunction, you can resort to a RichFlatMapFunction: if you use its open method to set up the connection, you'd maintain just a few connections to the database this way.
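A sketch of that RichFlatMapFunction variant, assuming the Flink API and a JDBC driver on the classpath; the connection URL, `keys` table, and the `extractKey`/`transform` helpers are hypothetical:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class LookupFlatMap extends RichFlatMapFunction<String, String> {
    private transient Connection conn;
    private transient PreparedStatement lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel subtask, opened once for the task's lifetime.
        conn = DriverManager.getConnection("jdbc:postgresql://localhost/mydb"); // hypothetical URL
        lookup = conn.prepareStatement("SELECT 1 FROM keys WHERE k = ?");       // hypothetical table
    }

    @Override
    public void flatMap(String message, Collector<String> out) throws Exception {
        lookup.setString(1, extractKey(message));
        try (ResultSet rs = lookup.executeQuery()) {
            // Transform only when the key exists; pass through otherwise.
            out.collect(rs.next() ? transform(message) : message);
        }
    }

    @Override
    public void close() throws Exception {
        if (lookup != null) lookup.close();
        if (conn != null) conn.close();
    }

    private String extractKey(String message) { return message; }              // hypothetical
    private String transform(String message) { return message.toUpperCase(); } // hypothetical
}
```

Note that the synchronous JDBC call in flatMap blocks the subtask per message, which is why the AsyncFunction route is generally preferred when it's available.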