春季云数据流自定义流动kafka绑定
实际上,我正在 Spring Cloud Data Flow 中做自己的流,开发自己的处理器和接收器。但我对内部卡夫卡绑定有点困惑。我将公开一个示例:
我制作此流:
自定义处理器的代码如下所示:
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {
private CoordDispatcher dispatcher= new CoordDispatcher () ;
@Bean
public Function<String, List<Message>> split() {
System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
return string -> {
....
代码和流工作正常,但我对这些有点恶心System.setProperty
(类似于将它们写入属性文件)。它们是绑定 kafka 主题所必需的。
我记得旧版本使用注释(如 @Input 和 @Output )或其他东西来直接绑定主题。
有想法在最新版本中做到这一点吗?
多谢。
版本:
- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3
编辑:我将添加一些有关旧版本的信息
代码是这样的
@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {
@Splitter(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public List<Message> split(String in) throws ParseException, IOException {
String uuid=".....";
Long date = new Date().getTime();
...........
- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2
在这个版本中,我不需要任何属性来绑定 kafka 主题。我认为这个绑定是为注释 @EnableBinding 和 @Splitter 完成的,但是这个注释在新的 Spring boot 版本中不存在。
Actually I'm doing my own streams in spring cloud data flow, developing my own processors and sinks. But I'm a bit stucked with the internal kafka bindings. I will expose an example:
I make this stream:
The code of the custom proccesor seems like this:
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {
private CoordDispatcher dispatcher= new CoordDispatcher () ;
@Bean
public Function<String, List<Message>> split() {
System.setProperty("spring.cloud.stream.function.bindings.split-in-0", "input");
System.setProperty("spring.cloud.stream.function.bindings.split-out-0", "output");
System.setProperty("spring.cloud.stream.bindings.input.destination", "processor-indorgps.time");
System.setProperty("spring.cloud.stream.bindings.output.destination", "processor-indorgps.processor-indoorgps");
return string -> {
....
The code and the stream works properly, but I'm a bit disgusting with these System.setProperty
(Similar to write them in properties file). They are necesary to bind the kafka topics.
I remember that the older version used annotations (like @Input and @Output ) or something to bind the topics directly.
Any idea to do it in the newest version ?
Thanks a lot.
Versions:
- dataflow=2.9.2
- skipper=2.8.2
- Spring boot version=2.6.3
Edit: I will add some info about the older version
The code was something like this
@EnableBinding(Processor.class)
@EnableConfigurationProperties({Config.class})
@Configuration
public class ProcessorIndoorGps {
@Splitter(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public List<Message> split(String in) throws ParseException, IOException {
String uuid=".....";
Long date = new Date().getTime();
...........
- Spring boot version=2.1.4.RELEASE
- dataflow=2.9.2
- skipper=2.8.2
In this version, I not need any property to bind the kafka topics. I think that this binding was done for the annotations @EnableBinding and @Splitter , but this annotations not exist in the new Spring boot version.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论