Aggregation & grouping with Kafka Streams
I am working with Kafka Streams and state stores. I have the logic below to aggregate a group of messages based on their key.
final Materialized<String, Sample, SessionStore<Bytes, byte[]>> abcStore =
    Materialized.<String, Sample, SessionStore<Bytes, byte[]>>as("topicforchangelog")
        .withCachingDisabled();
KStream<String, Sample> abcStream = builder.stream("inprocess-topic");
SessionWindowedKStream<String, Sample> windowedKStream =
    abcStream.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(36000)));
windowedKStream
    .aggregate(
        Aggregater::initialize,
        Aggregater::merge,
        Aggregater::merge,
        abcStore)
    .toStream()
    .map((key, value) -> KeyValue.pair(key.key(), value))
    .filter((k, v) -> v != null)
    .selectKey((key, value) -> value.getId())
    .peek(abcClass::logOut)
    .to("outputtopic");
Based on the logic in the Aggregater class, I am able to merge the following messages as I expect.
Message 1 : Key : A , value : {"id" : "abc" , "names" : []}
Message 2 : Key : A , value : {"id" : null , "names" : [{"name" : "x"}]}
Message 3 : Key : A , value : {"id" : null , "names" : [{"name" : "xx"}]}
And the final message I am getting is below.
{"id" : "abc" , "names" : [{"name" : "x"}, {"name" : "xx"}]}
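(Aside: the Aggregater class is not shown in the question. A minimal sketch of merge logic consistent with the output above — keep the first non-null id, concatenate the names lists — could look like the following. Representing the values as plain maps is an assumption for illustration; the real class presumably works on the Sample type, and Kafka's aggregator also receives the record key, which this sketch omits.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregaterSketch {
    // Empty aggregate: no id yet, empty names list.
    static Map<String, Object> initialize() {
        Map<String, Object> agg = new HashMap<>();
        agg.put("id", null);
        agg.put("names", new ArrayList<Map<String, String>>());
        return agg;
    }

    // Merge one message (or another aggregate) into the running aggregate:
    // keep the first non-null id, append all names.
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> agg, Map<String, Object> next) {
        if (agg.get("id") == null) {
            agg.put("id", next.get("id"));
        }
        ((List<Map<String, String>>) agg.get("names"))
                .addAll((List<Map<String, String>>) next.get("names"));
        return agg;
    }

    public static void main(String[] args) {
        Map<String, Object> agg = initialize();
        Map<String, Object> m1 = new HashMap<>();
        m1.put("id", "abc");
        m1.put("names", new ArrayList<Map<String, String>>());
        merge(agg, m1);
        System.out.println(agg.get("id")); // prints abc
    }
}
```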
The first three messages are pushed to the topic about 5 minutes apart. But after some period of time (one day or more), if I send a message having key "A" -
Message 4 : Key : A , value : {"id" : null , "names" : [{"name" : "xxx"}]}
My expectation is to form a message like
{"id" : "abc" , "names" : [{"name" : "x"}, {"name" : "xx"},{"name" : "xxx"}]}
But it is not happening. I am receiving output like {"id" : null , "names" : [{"name" : "xxx"}]}
Aggregation is not happening across all the records present in the "inprocess-topic" with "A" as key.
Is this because
- the session window has expired?
- (or) the previous messages were already read by the consumer application and the offset was committed?
Can we achieve this scenario with the SessionWindow concept?
Comments (1)
Just a wild guess. Because you are using a windowed aggregation, you may need to consider the following.
Detailed behaviour of KGroupedStream/KGroupedTable aggregations.
Refer to row no. 6 in the table.
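Concretely, SessionWindows.with(Duration.ofSeconds(36000)) defines an inactivity gap of 10 hours: a record arriving more than that gap after the previous record for the same key starts a new session with a fresh (empty) aggregate, which would explain why message 4 came out un-merged. A small stand-alone sketch of that gap rule, with no Kafka dependency (sessionize is a hypothetical helper for illustration only):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class SessionGapDemo {
    // Groups arrival timestamps into sessions: a timestamp arriving more
    // than `gap` after the previous one closes the current session and
    // starts a new one, mirroring session-window inactivity-gap semantics.
    static List<List<Long>> sessionize(List<Long> timestampsMs, Duration gap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long last = null;
        for (Long ts : timestampsMs) {
            if (last != null && ts - last > gap.toMillis()) {
                sessions.add(current); // inactivity gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
            last = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(36000); // 10-hour gap, as in the question
        // Messages 1-3 arrive 5 minutes apart; message 4 arrives one day later.
        List<Long> arrivals = List.of(0L, 300_000L, 600_000L, 86_400_000L);
        List<List<Long>> sessions = sessionize(arrivals, gap);
        System.out.println(sessions.size()); // prints 2: message 4 lands in a new session
    }
}
```

If records for a key must always be merged no matter how far apart they arrive, one option is a plain unwindowed aggregation (groupByKey().aggregate(...) without windowedBy), which keeps one aggregate per key in the state store instead of one per session.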