Aggregation & grouping with Kafka Streams

Published 2025-02-11 08:59:15


I am working with Kafka Streams and state stores. I have the logic below to aggregate a group of messages based on their key.

 final Materialized<String, Sample, SessionStore<Bytes, byte[]>> abcStore =
         Materialized.<String, Sample, SessionStore<Bytes, byte[]>>as("topicforchangelog")
                 .withCachingDisabled();

 KStream<String, Sample> abcStream = builder.stream("inprocess-topic");

 // 36000-second (10-hour) inactivity gap; Duration.ofSeconds takes a long, not a String
 SessionWindowedKStream<String, Sample> windowedKStream =
         abcStream.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(36000)));

 windowedKStream
         .aggregate(
                 Aggregater::initialize,
                 Aggregater::merge,   // aggregator
                 Aggregater::merge,   // session merger
                 abcStore)
         .toStream()
         .map((key, value) -> KeyValue.pair(key.key(), value))
         .filter((k, v) -> v != null)
         .selectKey((key, value) -> value.getId())
         .peek(abcClass::logOut)
         .to("outputtopic");

Based on the logic in the Aggregater class, I am able to merge the following messages as I expect.

 Message 1 : Key : A , value : {"id" : "abc" , "names" : []} 

 Message 2 : Key : A , value : {"id" : null , "names" : [{"name" : "x"}]}

 Message 3 : Key : A , value : {"id" : null , "names" : [{"name" : "xx"}]}

And the final message I get is the one below.

 {"id" : "abc" , "names" : [{"name" : "x"}, {"name" : "xx"}]}

The first three messages are pushed to the topic with a gap of 5 minutes between them. But after some period of time (one day or more), if I send a message having key "A" -

 Message 4 : Key : A , value : {"id" : null , "names" : [{"name" : "xxx"}]}

My expectation is to form a message like

{"id" : "abc" , "names" : [{"name" : "x"}, {"name" : "xx"},{"name" : "xxx"}]}

But it is not happening. I am receiving output like {"id" : null , "names" : [{"name" : "xxx"}]}

Aggregation is not happening across all the records present in "inprocess-topic" with "A" as key.
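To check my understanding of the inactivity gap, the grouping can be modelled in plain Java (no Kafka dependency; the timestamps and the simple gap rule are illustrative assumptions, not the real SessionStore implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of session-window grouping with a 36000-second (10-hour)
// inactivity gap, mirroring SessionWindows.with(Duration.ofSeconds(36000)).
public class SessionGapSketch {

    static final long GAP_MS = 36_000_000L; // 10-hour inactivity gap

    // A new session starts whenever the gap since the previous record
    // for the key exceeds the inactivity gap.
    static List<List<Long>> sessionize(long[] timestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long prev = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (!current.isEmpty() && ts - prev > GAP_MS) {
                sessions.add(current);          // close the previous session
                current = new ArrayList<>();
            }
            current.add(ts);
            prev = ts;
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        long oneDay = 24 * 60 * 60 * 1000L;
        // Messages 1-3 arrive 5 minutes apart; message 4 arrives a day later.
        long[] ts = {0, fiveMin, 2 * fiveMin, 2 * fiveMin + oneDay};
        List<List<Long>> sessions = sessionize(ts);
        System.out.println(sessions.size());        // 2
        System.out.println(sessions.get(0).size()); // 3
        System.out.println(sessions.get(1).size()); // 1
    }
}
```

Under this model, messages 1-3 share one session, while message 4 falls outside the 10-hour gap and opens a fresh session, whose aggregate starts again from the initializer.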

Is this because

  1. the session window has expired?
  2. (or) the previous messages were already read by the consumer application and the offset committed?

Can we achieve this scenario with the SessionWindow concept?


Comments (1)

浅唱々樱花落 2025-02-18 08:59:15


Just a wild guess. Because you are using a windowed aggregation, you may need to consider the following.

Detailed behaviour of KGroupedTable (in contrast to KGroupedStream):

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key (see below).
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.

Refer to row no. 6 in the table.

[image: table of KGroupedTable aggregation behaviour]
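The adder/subtractor contract in the bullets above can be modelled in plain Java (no Kafka dependency; the integer-sum aggregate and the single upstream key per group are simplifying assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the KGroupedTable.aggregate() update rule: on an update the
// subtractor removes the old value and the adder applies the new one;
// a tombstone (null value) invokes only the subtractor.
public class TableAggregateSketch {

    final Map<String, Integer> upstream = new HashMap<>(); // latest value per key
    final Map<String, Integer> agg = new HashMap<>();      // aggregate result

    void process(String key, Integer newValue) {
        Integer oldValue = upstream.get(key);
        int current = agg.getOrDefault(key, 0);             // initializer: 0
        if (oldValue != null) current -= oldValue;          // subtractor (old value)
        if (newValue != null) {
            current += newValue;                            // adder (new value)
            upstream.put(key, newValue);
        } else {
            upstream.remove(key);                           // tombstone: subtractor only
        }
        // (In Kafka the key is removed from the result KTable only when
        // the subtractor itself returns null; an int sum cannot show that.)
        agg.put(key, current);
    }

    public static void main(String[] args) {
        TableAggregateSketch t = new TableAggregateSketch();
        t.process("A", 3);    // insert:  adder only        -> agg = 3
        t.process("A", 5);    // update:  subtract 3, add 5 -> agg = 5
        t.process("A", null); // delete:  subtractor only   -> agg = 0
        System.out.println(t.agg.get("A")); // 0
    }
}
```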
