Spark: aggregate events from two different Kafka topics by timestamp

Assume a Kafka system with the following two topics:

  • created
  • deleted

They are used to advertise the creation and deletion of items.

The events in Kafka are JSON, with the same structure for both topics:

{
  "id": "a1cf621a-2a96-4b70-9dd6-3c54a2819eef",
  "timestamp": "2022-01-05T07:31:04.913000"
}
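
For reference, this payload maps to a simple Scala case class (a minimal sketch; the name ItemEvent is illustrative and not part of the question):

import java.time.LocalDateTime

// Mirrors the JSON payload above; the timestamp string has microsecond
// precision and no zone offset, so LocalDateTime is a natural fit.
final case class ItemEvent(id: String, timestamp: LocalDateTime)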

Now, how is it possible with Spark (Scala) to accumulate the deleted and created amounts such that we get the number of current items by timestamp?

Assume the following events in Kafka:

topic: created

{"id":"1","timestamp":"2022-01-01T00:00:00.000000"}
{"id":"2","timestamp":"2022-01-02T00:00:00.000000"}

topic: deleted

{"id":"2","timestamp":"2022-01-03T00:00:00.000000"}
{"id":"1","timestamp":"2022-01-04T00:00:00.000000"}

So this basically means:

  • 2022-01-01: 1 item got created, total count of items is 1
  • 2022-01-02: 1 item got created, total count of items is 2
  • 2022-01-03: 1 item got deleted, total count of items is 1
  • 2022-01-04: 1 item got deleted, total count of items is 0

The resulting output of the program should be the count of items per timestamp, for example:

----------------------------------------
| timestamp                    | count |
----------------------------------------
| 2022-01-01T00:00:00.000000   | 1     |
| 2022-01-02T00:00:00.000000   | 2     |
| 2022-01-03T00:00:00.000000   | 1     |
| 2022-01-04T00:00:00.000000   | 0     |
----------------------------------------

How can two topics be merged and the result ordered by timestamp?

Answer (白日梦):


You can read from both topics with .option("subscribe", "created,deleted").

Then you have one dataframe covering both topics. You can parse the JSON value for the timestamp, sort by it, and aggregate/reduce the dataframe to get the output, as sketched below.
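
A minimal batch sketch of that approach (assumptions not in the original answer: a bootstrap server at localhost:9092, the spark-sql-kafka-0-10 package on the classpath, and data small enough that a single unpartitioned window is acceptable):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("item-counts").getOrCreate()

// Batch read over both topics; Spark adds the source topic as a column.
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumption
  .option("subscribe", "created,deleted")
  .load()

// Schema of the JSON value shared by both topics.
val schema = new StructType()
  .add("id", StringType)
  .add("timestamp", StringType)

val events = raw
  .select(col("topic"), from_json(col("value").cast("string"), schema).as("e"))
  .select(
    to_timestamp(col("e.timestamp")).as("timestamp"),
    // +1 for a creation, -1 for a deletion
    when(col("topic") === "created", 1).otherwise(-1).as("delta")
  )

// The "reducer": a running total over all events ordered by timestamp.
val counts = events
  .withColumn("count", sum("delta").over(Window.orderBy("timestamp")))
  .drop("delta")
  .orderBy("timestamp")

counts.show(truncate = false)

On the sample data above this yields counts 1, 2, 1, 0 in timestamp order.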

Alternatively, Kafka records already carry a timestamp of their own, which Spark returns as a column. So you could change the producer design to use a single topic, say events: remove the timestamp from the value, move the id to the record key, and let create events carry a non-null value while delete events carry a null value (a tombstone), as sketched below.
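
A sketch of that single-topic variant (the topic name events is the answer's hypothetical; this reuses the spark session from the sketch above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// The record key holds the id; a non-null value means "created" and a
// null value (tombstone) means "deleted". Spark exposes the broker-side
// record timestamp as the `timestamp` column, so no JSON parsing is
// needed. Note this is the broker timestamp, not the one that was
// embedded in the JSON value.
val tombstoneCounts = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumption
  .option("subscribe", "events") // hypothetical single topic
  .load()
  .select(
    col("timestamp"),
    when(col("value").isNull, -1).otherwise(1).as("delta")
  )
  .withColumn("count", sum("delta").over(Window.orderBy("timestamp")))
  .drop("delta")
  .orderBy("timestamp")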

Either way, you still need a reducer function.


Or you can skip Spark entirely: with the proposed topic design, a table created in Kafka Streams / ksqlDB will already hold the data you want. Not necessarily with timestamp information, but at least aggregated counts by id or another value.
