Spark: aggregate events from two different Kafka topics by timestamp
Assume a Kafka system with the following two topics:
- created
- deleted
They are used to advertise the creation and deletion of items.
The structure of the events in Kafka is JSON and the same for both topics:
{
  "id": "a1cf621a-2a96-4b70-9dd6-3c54a2819eef",
  "timestamp": "2022-01-05T07:31:04.913000"
}
Now, how is it possible with Spark (Scala) to accumulate the deleted and created amounts such that we get the number of current items by timestamp?
Assume the following events in Kafka
topic: created
{"id":"1","timestamp":"2022-01-01T00:00:00.000000"}
{"id":"2","timestamp":"2022-01-02T00:00:00.000000"}
topic: deleted
{"id":"2","timestamp":"2022-01-03T00:00:00.000000"}
{"id":"1","timestamp":"2022-01-04T00:00:00.000000"}
So this basically means:
- 2022-01-01: 1 item got created, total count of items is 1
- 2022-01-02: 1 item got created, total count of items is 2
- 2022-01-03: 1 item got deleted, total count of items is 1
- 2022-01-04: 1 item got deleted, total count of items is 0
The resulting output of the program should be the count of items per timestamp, for example:
----------------------------------------
| timestamp | count |
----------------------------------------
| 2022-01-01T00:00:00.000000 | 1 |
| 2022-01-02T00:00:00.000000 | 2 |
| 2022-01-03T00:00:00.000000 | 1 |
| 2022-01-04T00:00:00.000000 | 0     |
----------------------------------------
How can two topics be merged and the result ordered by timestamp?
1 Answer
You can read from both topics at once with
.option("subscribe", "created,deleted")
That gives you a single dataframe covering both topics; parse the timestamp out of the JSON value, sort by it, and aggregate/reduce the dataframe to get the output.
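A minimal batch sketch of that approach in Scala, assuming the spark-sql-kafka connector and brokers at localhost:9092 (both placeholders to adjust): it reads both topics in one go, maps created events to +1 and deleted events to -1, and takes a running sum in timestamp order.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ItemCountByTimestamp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("item-count-by-timestamp")
      .getOrCreate()

    // Shared JSON schema of both topics.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("timestamp", StringType)
    ))

    val events = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "created,deleted")              // both topics in one read
      .option("startingOffsets", "earliest")
      .load()
      .select(col("topic"), from_json(col("value").cast("string"), schema).as("event"))
      .select(
        col("event.timestamp").as("timestamp"),
        // a created event adds an item, a deleted event removes one
        when(col("topic") === "created", lit(1)).otherwise(lit(-1)).as("delta")
      )

    // Running total over all events in timestamp order = current item count.
    val byTime = Window.orderBy("timestamp")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    events
      .withColumn("count", sum("delta").over(byTime))
      .select("timestamp", "count")
      .orderBy("timestamp")
      .show(truncate = false)
  }
}

Since the timestamps are ISO-8601 strings, sorting them lexically matches chronological order, and this reproduces the 1, 2, 1, 0 sequence from the example. Note the global (unpartitioned) window pulls all rows into a single partition, which is fine for a sketch but worth rethinking for large data.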
Alternatively, Kafka records already carry a timestamp of their own, which Spark returns as a column. So you could change your producer design to use a single topic, say
events
, remove the timestamp from the value, move the id into the record key, and let create events have a non-null value and delete events a null value. Either way, you still need a reducer function; a sketch of this variant follows.
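Under that single-topic design, the read side would change roughly as follows (a sketch reusing the session and imports from the previous example; events is the assumed topic name, and the JSON parsing disappears because deletes carry a null value):

val singleTopic = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("timestamp"),                    // Kafka's own record timestamp column
    col("key").cast("string").as("id"),  // the item id, moved into the record key
    // a null value is a delete (tombstone), a non-null value is a create
    when(col("value").isNull, lit(-1)).otherwise(lit(1)).as("delta")
  )
// the same running-sum reducer as above then produces the counts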
Or you could skip Spark altogether: with the proposed single-topic design, a table in Kafka Streams / ksqlDB already holds the data you want. Not necessarily with timestamp information, but at least aggregated counts by id or another value.
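For the Kafka Streams route, a rough Scala sketch using the kafka-streams-scala DSL (import paths as of Kafka 2.4+; application id, brokers, and topic names are placeholders). It builds a table over the events topic, where tombstones drop keys automatically, and counts the live rows under a single constant key:

import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object LiveItemCount extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "live-item-count")   // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder

  val builder = new StreamsBuilder()

  // Table view of the `events` topic: a null value (tombstone) removes the
  // key, a non-null value upserts it.
  val items = builder.table[String, String]("events")

  // Re-key everything under one constant key and count the live rows;
  // KTable aggregations subtract automatically when a key is deleted.
  val liveCount = items
    .groupBy((_, value) => ("all", value))
    .count()

  liveCount.toStream.to("item-count-output") // placeholder output topic
  new KafkaStreams(builder.build(), props).start()
}

As noted above, this maintains the current count as records arrive, not a timestamped history like the Spark version.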