Cassandra lightweight transactions

Published 2025-01-17 10:44:28 · word count 1714 · 3 views · 0 comments

I have a Flink job that counts "application" events coming from a Kafka broker. I have a keyed windowed stream and, for each window, I count the number of events (the windows are 5 minutes long). Finally, I push the aggregated data into Cassandra with the Cassandra sink connector.

| application | window_start | window_end | count |
|---|---|---|---|
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 15 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |
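The windowed count described above can be illustrated without the Flink API. The following is a minimal plain-Java sketch, assuming Java 16 or later and 5-minute tumbling windows aligned to the epoch; `Event`, `WindowKey`, `windowStart`, and `countPerWindow` are hypothetical names introduced only for illustration and are not part of the actual job.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {
    // Hypothetical event type: an application name plus an event timestamp.
    record Event(String application, Instant timestamp) {}

    // Hypothetical key: one result row per application and window start.
    record WindowKey(String application, Instant windowStart) {}

    // Assumption: 5-minute tumbling windows aligned to the epoch.
    static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    // Round a timestamp down to the start of its window.
    static Instant windowStart(Instant timestamp) {
        long sizeMillis = WINDOW_SIZE.toMillis();
        long millis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    // Count events per (application, window) pair.
    static Map<WindowKey, Long> countPerWindow(List<Event> events) {
        Map<WindowKey, Long> counts = new HashMap<>();
        for (Event e : events) {
            WindowKey key = new WindowKey(e.application(), windowStart(e.timestamp()));
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("app1", Instant.parse("2022-03-21T15:00:10Z")),
            new Event("app1", Instant.parse("2022-03-21T15:04:59Z")),
            new Event("app2", Instant.parse("2022-03-21T15:01:00Z")),
            new Event("app1", Instant.parse("2022-03-21T15:05:00Z")));
        // Iteration order of a HashMap is unspecified, so rows may print in any order.
        countPerWindow(events).forEach((key, count) ->
            System.out.println(key.application() + " " + key.windowStart() + " " + count));
    }
}
```

Each entry of the returned map corresponds to one row of the table: the key carries the application and window start, and the value is the count.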

I also handle late events with side outputs. So, when a late event arrives, I get a record like this:

| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |

The expected output must be:

| application | window_start | window_end | count |
|---|---|---|---|
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 16 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |

But it is:

| application | window_start | window_end | count |
|---|---|---|---|
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |

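This is because Cassandra writes are upserts: an INSERT whose primary key already exists overwrites the stored row, so the late count of 1 replaces the original 15.
With a SQL database (e.g. PostgreSQL) I can handle this problem: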

"INSERT INTO table(window_start, window_end, application, count) VALUES(?, ?, ?, ?) ON CONFLICT (window_start, window_end, event_name, application) DO UPDATE SET count=EXCLUDED.count + table.count;"
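The difference between the two write semantics can be sketched in plain Java, with a `HashMap` standing in for the table. This is only an illustration under stated assumptions (Java 16 or later); it does not use Cassandra or PostgreSQL, and `RowKey`, `upsert`, and `insertOrAdd` are hypothetical names introduced here.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertVersusAdd {
    // Hypothetical row key, standing in for the table's primary key.
    record RowKey(String application, String windowStart, String windowEnd) {}

    // Overwrite semantics: a write with an existing key replaces the stored count.
    static void upsert(Map<RowKey, Long> table, RowKey key, long count) {
        table.put(key, count);
    }

    // Additive semantics: a write with an existing key adds to the stored count,
    // which is the effect of the ON CONFLICT ... DO UPDATE statement.
    static void insertOrAdd(Map<RowKey, Long> table, RowKey key, long count) {
        table.merge(key, count, Long::sum);
    }

    public static void main(String[] args) {
        RowKey app1 = new RowKey("app1", "2022-03-21 15:00:00", "2022-03-21 15:05:00");

        Map<RowKey, Long> overwriting = new HashMap<>();
        upsert(overwriting, app1, 15); // on-time window result
        upsert(overwriting, app1, 1);  // late event for the same window
        System.out.println(overwriting.get(app1)); // prints 1

        Map<RowKey, Long> adding = new HashMap<>();
        insertOrAdd(adding, app1, 15); // on-time window result
        insertOrAdd(adding, app1, 1);  // late event for the same window
        System.out.println(adding.get(app1)); // prints 16
    }
}
```

The first map reproduces the result I get today, and the second reproduces the expected one.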

Is it possible to write this kind of query with Cassandra, or is this a limitation?

Note that I'm using Apache Flink with Cassandra as the sink, and Java is my programming language.

Thanks!
