kafka流:如何避免在完成之前抑制调度器注点器

发布于 2025-01-29 04:49:25 字数 1141 浏览 3 评论 0原文

我有一个Kafka流应用程序,需要将一些数据处理到stateStore每个 x 分钟。

我创建了一个接收数据的变压器并将其存储到stateStore中。变压器是通过transformerSupplier自动实例化的。

    TransformerSupplier<String, Message, KeyValue<String, Message>> getTransformerSupplier(){
            if(transformerSupplier == null)
                transformerSupplier = () -> new MyTransformer();
            return transformerSupplier;
    }

然后,当实例化时,变压器将标点器安排到上下文中。

this.context.schedule(Duration.ofSeconds(schedulerSeconds), PunctuationType.WALL_CLOCK_TIME, new MyScheduler);

调度程序覆盖了包含将数据处理到stateStore所需的代码的标点功能。我添加了一些日志线,以验证计划程序在调度程序结束之前完成。

@Override
public void punctuate(long timestamp) {
    logger.info("Scheduler started at {}", Instant.now());
    processMyDataIntoStateStore();
    logger.info("Scheduler ended at {}", Instant.now());      
}

我看到的问题是,在搜索上述日志行时,我应该期望“启动”和“结束”的数字相同,但是我实际上看到一半的执行程序未达到“调度程序结束”。

看来点仪操作在上下文完成之前过早停止了,为什么会发生这种情况?有没有办法避免在完成前停止?

无论如何,另一个调度程序是针对下一个执行实例化的,但是我希望如果没有先完成以前的情况,我希望不会停止操作。

感谢您的帮助

I have a Kafka stream application that needs to process some data into a StateStore every x minutes.

I have created a transformer receiving data and storing them into the StateStore. The Transformer is instantiated automatically by the kafka stream framework via a TransformerSupplier.

    TransformerSupplier<String, Message, KeyValue<String, Message>> getTransformerSupplier(){
            if(transformerSupplier == null)
                transformerSupplier = () -> new MyTransformer();
            return transformerSupplier;
    }

The transformer then Schedule a Punctuator into the context when instantiated.

this.context.schedule(Duration.ofSeconds(schedulerSeconds), PunctuationType.WALL_CLOCK_TIME, new MyScheduler);

The scheduler overrides the punctuate function containing the code needed to handle the data into the StateStore. I have added some log lines to verify that the punctuate operation is completed before the Scheduler ends.

@Override
public void punctuate(long timestamp) {
    logger.info("Scheduler started at {}", Instant.now());
    processMyDataIntoStateStore();
    logger.info("Scheduler ended at {}", Instant.now());      
}

The problem I am seeing is that when searching for the above log lines I should expect the same number for 'started' and 'ended', but I actually see that half of the executions don't reach the 'Scheduler ended'.

It seems that the punctuator operation is stopped prematurely before completed by the context, why is this happening? is there a way to avoid stopping before completion?

Another scheduler is anyway instantiated for the next execution but I would prefer that the operation is not stopped without first complete the previous.

thanks for any help

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

-小熊_ 2025-02-05 04:49:25

将其移动到单独的线程可能会完成作业,因为标点是在变压器的同一线程上执行的。

probably moving it on to a separate thread would do the job as the punctuator is executed on the same thread of the transformer.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文