春季云数据流自定义流动kafka绑定

发布于 2025-01-20 05:45:07 字数 1884 浏览 4 评论 0原文

实际上,我正在 Spring Cloud Data Flow 中做自己的流,开发自己的处理器和接收器。但我对内部卡夫卡绑定有点困惑。我将公开一个示例:

我制作此流:

在此处输入图像描述

自定义处理器的代码如下所示:

@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    private CoordDispatcher dispatcher= new CoordDispatcher () ;

    @Bean
    public Function<String, List<Message>> split() {
        System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
        System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
        System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
        System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
        return string -> {

....

代码和流工作正常,但我对这些有点恶心System.setProperty (类似于将它们写入属性文件)。它们是绑定 kafka 主题所必需的。

我记得旧版本使用注释(如 @Input 和 @Output )或其他东西来直接绑定主题。

有想法在最新版本中做到这一点吗?

多谢。

版本:

- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3

编辑:我将添加一些有关旧版本的信息

代码是这样的

@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    @Splitter(inputChannel = Processor.INPUT, 
              outputChannel = Processor.OUTPUT)
    public List<Message> split(String in) throws ParseException, IOException {

        String uuid=".....";
        Long date = new Date().getTime();
        ...........


- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2

在这个版本中,我不需要任何属性来绑定 kafka 主题。我认为这个绑定是为注释 @EnableBinding 和 @Splitter 完成的,但是这个注释在新的 Spring boot 版本中不存在。

Actually I'm doing my own streams in spring cloud data flow, developing my own processors and sinks. But I'm a bit stucked with the internal kafka bindings. I will expose an example:

I make this stream:

enter image description here

The code of the custom proccesor seems like this:

@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    private CoordDispatcher dispatcher= new CoordDispatcher () ;

    @Bean
    public Function<String, List<Message>> split() {
        System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
        System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
        System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
        System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
        return string -> {

....

The code and the stream works properly, but I'm a bit disgusting with these System.setProperty (Similar to write them in properties file). They are necesary to bind the kafka topics.

I remember that the older version used annotations (like @Input and @Output ) or something to bind the topics directly.

Any idea to do it in the newest version ?

Thanks a lot.

Versions:

- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3

Edit: I will add some info about the older version

The code was something like this

@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {

    @Splitter(inputChannel = Processor.INPUT, 
              outputChannel = Processor.OUTPUT)
    public List<Message> split(String in) throws ParseException, IOException {

        String uuid=".....";
        Long date = new Date().getTime();
        ...........


- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2

In this version, I not need any property to bind the kafka topics. I think that this binding was done for the annotations @EnableBinding and @Splitter , but this annotations not exist in the new Spring boot version.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文