Kafka operations on a windowed KTable



I would like to do some further operations on a windowed KTable. To give some background, I have a topic with data in the form {clientId, txTimestamp, txAmount}. From this topic I have created a stream, partitioned by clientId, with the underlying topic timestamp set to the txTimestamp event field. Starting from this stream, I want to aggregate the number of transactions per clientId in tumbling 1-hour windows. This is done with something similar to the following:
CREATE TABLE transactions_per_client WITH (kafka_topic='transactions_per_client_topic') AS SELECT clientId, COUNT(*) AS transactions_per_client, WINDOWSTART AS window_start, WINDOWEND AS window_end FROM transactions_stream WINDOW TUMBLING (SIZE 1 HOURS) GROUP BY clientId;

The aggregations work as expected and yield values similar to:

ClientId | Transactions_per_client | WindowStart | WindowEnd
1        | 12                      | 1           | 2
2        | 8                       | 1           | 2
1        | 24                      | 2           | 3
1        | 19                      | 3           | 4

What I want to do now is further process this table to add a column that represents, for each client, the difference between the transaction count in a window and the count in the immediately preceding window for that same client. For the previous table, that would look like this:

ClientId | Transactions_per_client | WindowStart | WindowEnd | Deviation
1        | 12                      | 1           | 2         | 0
2        | 8                       | 1           | 2         | 0
1        | 24                      | 2           | 3         | 12
1        | 19                      | 3           | 4         | -5

What would be the best way to achieve this (using either Kafka Streams or ksqlDB)? I tried to create this column with a user-defined aggregate function, but that cannot be applied to a KTable, only to a KStream.


Answer by 橘味果▽酱, 2025-01-16 18:17:55:


Just for future reference, the official answer at this time (April 2022) is that it cannot be done in Kafka Streams through the DSL, since "Windowed-TABLE are kind of a “dead end” in ksqlDB atm, and also for Kafka Streams, you cannot really use the DSL to further process the data" (answer on the Confluent forum: https://forum.confluent.io/t/aggregations-on-windowed-ktables/4340). The suggestion there is to use the Processor API instead, which is indeed pretty straightforward to implement. At a high level, the topology would look something like this (node names, the store name, and the sink topic are placeholders):

// windowSizeMs must match the 1-hour tumbling window of the source table.
long windowSizeMs = Duration.ofHours(1).toMillis();

// Source: the topic backing the windowed KTable.
// Keys are Windowed<Long> (clientId), values are Long (per-window count).
topology.addSource(
        "windowed-counts-source",
        new TimeWindowedDeserializer<>(new LongDeserializer(), windowSizeMs),
        new LongDeserializer(),
        "transactions_per_client_topic");

topology.addProcessor(
        "deviation-processor",
        () -> new Aggregator(storeName),
        "windowed-counts-source");

// Store keyed by the windowed client id, holding the last count seen per window.
StoreBuilder<KeyValueStore<Windowed<Long>, Long>> storeBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                WindowedSerdes.timeWindowedSerdeFrom(Long.class, windowSizeMs),
                Serdes.Long());
topology.addStateStore(storeBuilder, "deviation-processor");

topology.addSink(
        "deviation-sink",
        sinkTopic,
        new TimeWindowedSerializer<>(new LongSerializer()),
        transactionPerNumericKeySerializer, // serializer for the POJO with the deviation field
        "deviation-processor");
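Not part of the original answer, but for completeness: a minimal sketch of how the assembled topology would be started, assuming a placeholder application id and bootstrap server.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-deviation-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));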

The Aggregator in the previous section is an org.apache.kafka.streams.processor.api.Processor implementation that keeps track of the values it has seen and can retrieve the previously seen value for a given key.
Again, at a high level, its process() logic would be something similar to this:

// Key of the same client in the immediately preceding window.
// (TimeWindow comes from org.apache.kafka.streams.kstream.internals;
// windowSizeMs is the same 1-hour window size used in the topology.)
Windowed<Long> previousWindow = new Windowed<>(
        kafkaRecord.key().key(),
        new TimeWindow(
                kafkaRecord.key().window().start() - windowSizeMs,
                kafkaRecord.key().window().end() - windowSizeMs));

Long previousTransactionAggregate = kvStore.get(previousWindow);

long deviation;
if (previousTransactionAggregate != null) {
    deviation = kafkaRecord.value() - previousTransactionAggregate;
} else {
    deviation = 0L; // first window seen for this client
}

// Remember the current count so the next window can compute its deviation.
kvStore.put(kafkaRecord.key(), kafkaRecord.value());

Record<Windowed<Long>, TransactionPerNumericKey> newRecord =
        new Record<>(
                kafkaRecord.key(),
                new TransactionPerNumericKey(
                        kafkaRecord.key().key(), kafkaRecord.value(), deviation),
                kafkaRecord.timestamp());

context.forward(newRecord);
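The answer does not show where kvStore and context come from; a minimal skeleton of the Aggregator, assuming the Kafka Streams 3.x Processor API, might look like this (the store name must match the one registered with addStateStore()):

public class Aggregator
        implements Processor<Windowed<Long>, Long, Windowed<Long>, TransactionPerNumericKey> {

    private final String storeName;
    private ProcessorContext<Windowed<Long>, TransactionPerNumericKey> context;
    private KeyValueStore<Windowed<Long>, Long> kvStore;

    public Aggregator(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext<Windowed<Long>, TransactionPerNumericKey> context) {
        this.context = context;
        this.kvStore = context.getStateStore(storeName); // store registered via addStateStore()
    }

    @Override
    public void process(Record<Windowed<Long>, Long> kafkaRecord) {
        // the deviation logic from the snippet above goes here
    }
}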

TransactionPerNumericKey in the previous section is the structure for the enhanced windowed aggregation, i.e. the per-window transaction count together with the deviation value.
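The answer does not spell this structure out; a hypothetical POJO consistent with the constructor call above would be:

// Hypothetical shape; field names are inferred from the constructor call above.
public class TransactionPerNumericKey {
    private final long clientId;
    private final long transactionsPerClient;
    private final long deviation;

    public TransactionPerNumericKey(long clientId, long transactionsPerClient, long deviation) {
        this.clientId = clientId;
        this.transactionsPerClient = transactionsPerClient;
        this.deviation = deviation;
    }

    public long getClientId() { return clientId; }
    public long getTransactionsPerClient() { return transactionsPerClient; }
    public long getDeviation() { return deviation; }
}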
