卡桑德拉轻量级交易
我有一个来自Kafka经纪人的“申请”事件的Flink Job Count。我有一个键入的窗口流,对于每个窗口,我计算事件的数量(我有5分钟的窗口)。最后,我将聚合的数据推入带有Cassandra水槽连接器的Cassandra。
应用程序 | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 15 |
app2 | 2022-03-21 15:00:00 | 2022-03-21 15 :05:00 | 23 |
我还处理带有侧输出的晚期事件。因此,当迟到的事件到来时,我会有这种情况:
| App1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
预期输出必须是:
应用程序 | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 16 |
App2 | 2022-03-21 | 15:00:00:00 2022-03-21 15:05:00 | 23 |
但它是:
应用程序 | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
App2 | 2022-03-21 15:00 :00 | 2022-03-21 15:05:00 | 23 |
这是因为Cassandra消耗了Upsert。 使用SQL数据库(es。postgresql),我可以处理此问题:
“ 插入表(window_start,window_end,application,count)value(?,,?,,??,?)(window_start,window_end_end) ,event_name,应用程序)做更新集count = exclud.count + table.count; “
是否可以使用cassandra进行此类查询,还是这是一个限制?
请记住,我正在使用Apache Flink和Cassandra作为水槽,Java是我的编程语言。
谢谢!
I have a Flink job counts "application" events coming from a Kafka broker. I have a keyed window stream and, for each window, I count the number of events (I have 5-minutes windows). Finally I push the aggregated data into Cassandra with Cassandra Sink connector.
application | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 15 |
app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |
I also handle late events with side-outputs. So, when a late event arrives I have this situation:
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
The expected output must be:
application | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 16 |
app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |
But it is:
application | window_start | window_end | count |
---|---|---|---|
app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |
This because Cassandra uses UPSERT.
With a SQL database (es. PostgreSQL) I can handle this problem:
"INSERT INTO table(window_start, window_end, application, count) VALUES(?, ?, ?, ?) ON CONFLICT (window_start, window_end, event_name, application) DO UPDATE SET count=EXCLUDED.count + table.count;"
Is it possible to make this kind of query with Cassandra, or this is a limitation?
Remember I'm using Apache Flink and Cassandra as Sink and Java is my programming language.
Thanks!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论