Kafka sink to MongoDB: send only the latest result per key, with a delay

Published 2025-01-20 06:58:45

I have an aggregation query that counts pageViews grouped by country and pushes the results to an output topic, which is sunk to MongoDB via a Kafka connector:

{
    "connector.class": "MongoDbAtlasSink",
    "name": "confluent-mongodb-sink",
    "input.data.format" : "JSON",
    "connection.host": "ip",
    "topics": "viewPageCountByUsers",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",
    "max.batch.size": "0",
    "database": "test",
    "collection": "ViewPagesCountByUsers",
    "tasks.max": "1"
}

The problem is that this data arrives very frequently and puts a heavy load on MongoDB. How can I configure the Kafka connection so that it sends only the last value per key as a batch, for example with a 5-second delay? It is pointless to update the database 5 times for a sequence like:

{countryID:7, viewCount: 111}
{countryID:7, viewCount: 112}
{countryID:7, viewCount: 113}
{countryID:7, viewCount: 114}
{countryID:7, viewCount: 115}

If it were possible to send only the last result per key with a 5-second delay, I could update the database just once:

// collect batch 5 sec and flush: 
{countryID:7, viewCount: 115}
{countryID:8, viewCount: 573}

How can this be done?
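Conceptually, what is being asked for is last-write-wins buffering: keep only the newest value per key, and flush the whole buffer on a timer. A minimal Java sketch of that idea, independent of Kafka (the class and method names are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the desired behavior: buffer records for a window, keeping only
// the last value per key, then flush the whole buffer as one batch.
public class LastValueBuffer {
    private final Map<Integer, Integer> latestByKey = new LinkedHashMap<>();

    // Each incoming record overwrites the previous value for its key.
    public void add(int countryId, int viewCount) {
        latestByKey.put(countryId, viewCount);
    }

    // Called every 5 seconds by a timer: emit one record per key and reset.
    public Map<Integer, Integer> flush() {
        Map<Integer, Integer> batch = new LinkedHashMap<>(latestByKey);
        latestByKey.clear();
        return batch;
    }

    public static void main(String[] args) {
        LastValueBuffer buf = new LastValueBuffer();
        buf.add(7, 111);
        buf.add(7, 112);
        buf.add(7, 115);
        buf.add(8, 573);
        System.out.println(buf.flush()); // prints {7=115, 8=573}
    }
}
```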



Comments (1)

一梦浮鱼 2025-01-27 06:58:45

Sink connectors just take whatever is in the topic, generally without batching.

You'd need to use a stream processor such as Kafka Streams or ksqlDB to run a windowed aggregation, then output to a new topic, which you'd read from the sink connector.
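A sketch of such a windowed aggregation with Kafka Streams, under a few assumptions: the message key is the countryID as an integer, the value is the JSON string, and the output topic name `viewPageCountByUsers-5s` is made up. The sink connector's `topics` would then point at that new topic.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class BatchedViewCounts {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("viewPageCountByUsers",
                       Consumed.with(Serdes.Integer(), Serdes.String()))
               .groupByKey()
               // group records into 5-second tumbling windows per key
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
               // keep only the latest value seen within the window
               .reduce((older, newer) -> newer)
               // emit exactly one record per key when the window closes
               .suppress(Suppressed.untilWindowCloses(
                       Suppressed.BufferConfig.unbounded()))
               // unwrap the windowed key and write to the topic the sink reads
               .toStream((windowedKey, value) -> windowedKey.key())
               .to("viewPageCountByUsers-5s",
                   Produced.with(Serdes.Integer(), Serdes.String()));
        return builder.build();
    }
}
```

The `suppress(untilWindowCloses(...))` step is what holds back the intermediate updates (111, 112, ...) so that only the final count per window reaches the output topic.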

