Kafka sink to MongoDB: send only the last result per key, with a delay
I have an aggregation query that counts page views grouped by country, and the results are pushed to an output topic and sunk to MongoDB by a Kafka sink connector:
{
  "connector.class": "MongoDbAtlasSink",
  "name": "confluent-mongodb-sink",
  "input.data.format": "JSON",
  "connection.host": "ip",
  "topics": "viewPageCountByUsers",
  "max.num.retries": "3",
  "retries.defer.timeout": "5000",
  "max.batch.size": "0",
  "database": "test",
  "collection": "ViewPagesCountByUsers",
  "tasks.max": "1"
}
The problem is that these updates are very frequent and put a heavy load on MongoDB. How can I configure the connector to send only the last value per key as a batch, for example with a 5-second delay? It is pointless to update the database 5 times with:
{countryID:7, viewCount: 111}
{countryID:7, viewCount: 112}
{countryID:7, viewCount: 113}
{countryID:7, viewCount: 114}
{countryID:7, viewCount: 115}
If I could send only the last result per key with a 5-second delay, I could update just once:
// collect a 5-second batch, then flush:
{countryID:7, viewCount: 115}
{countryID:8, viewCount: 573}
How can I do this?
Sink connectors just take whatever is in the topic, generally without batching.
You'd need a stream processor such as Kafka Streams or ksqlDB to run a windowed aggregation, then output to a new topic, which the sink connector would read from.
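A minimal Kafka Streams sketch of that suggestion, assuming the raw events arrive on a pageViews input topic keyed by countryID (the input topic name, serdes, application id, and broker address below are placeholders, not from the original setup): a 5-second tumbling window counts views per key, and suppress(untilWindowCloses) holds back intermediate updates so only the final count per key per window reaches viewPageCountByUsers, which the sink connector then reads.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ViewCountSuppressor {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder application id and broker address -- adjust to your cluster.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "view-count-suppressor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Assumed input topic "pageViews", keyed by countryID as a string.
        builder.stream("pageViews", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            // 5-second tumbling windows with no grace period, so each
            // window closes (and can be emitted) as soon as it ends.
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
            .count()
            // Hold back intermediate counts and emit one final record
            // per key per window -- the "only the last value" step.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Drop the window wrapper so the output is keyed by countryID again.
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
            // The sink connector reads this topic instead of the raw update stream.
            .to("viewPageCountByUsers", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Note that suppression is driven by stream time, so a window's final result is emitted once a newer record moves stream time past the window end; ksqlDB offers a comparable EMIT FINAL mode for windowed aggregations.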