返回介绍

使用详解

发布于 2024-08-18 11:12:34 字数 25292 浏览 0 评论 0 收藏 0

在介绍了Spring Cloud Steam的基础结构和核心概念之后,下面我们来详细地学习一下它所提供的一些核心注解的具体使用方法。

开启绑定功能

在Spring Cloud Stream中,我们需要通过@EnableBinding注解来为应用启动消息驱动的功能,该注解我们在快速入门中已经有了基本的介绍,下面来详细看看它的定义:

@Target({ElementType.TYPE,ElementType.ANNOTATION_TYPE})

@Retention(RetentionPolicy.RUNTIME)

@Documented

@Inherited

@Configuration

@Import({

ChannelBindingServiceConfiguration.class,

BindingBeansRegistrar.class,

BinderFactoryConfiguration.class,

SpelExpressionConverterConfiguration.class})

@EnableIntegration

public @interface EnableBinding {

Class<? >[]value()default {};

}

从该注解的定义中我们可以看到,它自身包含了@Configuration注解,所以用它注解的类也会成为Spring 的基本配置类。另外该注解还通过@Import 加载了 Spring Cloud Stream运行需要的几个基础配置类。

- ChannelBindingServiceConfiguration:该配置会加载消息通道绑定必要的一些实例,比如,用于处理消息通道绑定的ChannelBindingService 实例、消息类型转换器MessageConverterConfigurer、消息通道工厂BindableChannelFactory等重要实例,有兴趣的读者可以自行查看这些默认配置,以对其有更深入的理解。

- BindingBeansRegistrar:该类是ImportBeanDefinitionRegistrar接口的实现,主要是在Spring加载Bean的时候被调用,用来实现加载更多的Bean。由于BindingBeansRegistrar被@EnableBinding注解的@Import所引用,所以在其他配置加载完后,它的实现会被回调来创建其他的Bean,而这些Bean则从@EnableBinding注解的value属性定义的类中获取。就如我们入门实例中定义的@EnableBinding(Sink.class),它在加载用于消息驱动的基础Bean之后,会继续加载Sink中定义的具体消息通道绑定。

- BinderFactoryConfiguration:Binder工厂的配置,主要用来加载与消息中间件相关的配置信息,比如,它会从应用工程的META-INF/spring.binders中加载针对具体消息中间件相关的配置文件等。

- SpelExpressionConverterConfiguration:SpEL表达式转换器配置。

@EnableBinding注解只有一个唯一的属性:value。上面已经介绍过,由于该注解@Import了BindingBeansRegistrar实现,所以在加载了基础配置内容之后,它会回调来读取value中的类,以创建消息通道的绑定。另外,由于value是一个Class类型的数组,所以我们可以通过value属性一次性指定多个关于消息通道的配置。

绑定消息通道

在Spring Cloud Steam中,我们可以在接口中通过@Input和@Output注解来定义消息通道,而用于定义绑定消息通道的接口则可以被@EnableBinding注解的value参数来指定,从而在应用启动的时候实现对定义消息通道的绑定。

在快速入门的示例中,我们演示了使用Sink接口绑定的消息通道。Sink接口是Spring Cloud Steam提供的一个默认实现,除此之外还有Source和Processor,可从它们的源码中学习它们的定义方式:

public interface Sink {

String INPUT="input";

@Input(Sink.INPUT)

SubscribableChannel input();

}

public interface Source {

String OUTPUT="output";

@Output(Source.OUTPUT)

MessageChannel output();

}

public interface Processor extends Source,Sink {

}

从上面的源码中,我们可以看到,Sink和Source中分别通过@Input和@Output注解定义了输入通道和输出通道,而Processor通过继承Source和Sink的方式同时定义了一个输入通道和一个输出通道。

另外,@Input和@Output注解都还有一个value属性,该属性可以用来设置消息通道的名称,这里Sink和Source中指定的消息通道名称分别为input和output。如果我们直接使用这两个注解而没有指定具体的value值,将默认使用方法名作为消息通道的名称。

最后,需要注意一点,当我们定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法;而定义输入通道时,需要返回SubscribableChannel接口对象,该接口继承自MessageChannel接口,它定义了维护消息通道订阅者的方法。

注入绑定接口

在完成了消息通道绑定的定义之后,Spring Cloud Stream会为其创建具体的实例,而开发者只需要通过注入的方式来获取这些实例并直接使用即可。举个简单的例子,我们在快速入门示例中已经为Sink接口绑定的input消息通道实现了具体的消息消费者,下面可以通过注入的方式实现一个消息生成者,向input消息通道发送数据。

- 创建一个将Input消息通道作为输出通道的接口,具体如下:

public interface SinkSender {

@Output(Sink.INPUT)

MessageChannel output();

}

- 对快速入门中定义的SinkReceiver做一些修改:在@EnableBinding注解中增加对SinkSender接口的指定,使Spring Cloud Stream能创建出对应的实例。

@EnableBinding(value={Sink.class,SinkSender.class})

public class SinkReceiver {

private static Logger logger=LoggerFactory.getLogger(SinkReceiver.class);

@StreamListener(Sink.INPUT)

public void receive(Object payload){

logger.info("Received: "+payload);

}

}

- 创建一个单元测试类,通过@Autowired注解注入SinkSender的实例,并在测试用例中调用它的发送消息方法。

@RunWith(SpringJUnit4ClassRunner.class)

@SpringApplicationConfiguration(classes=HelloApplication.class)

@WebAppConfiguration

public class HelloApplicationTests {

@Autowired

private SinkSender sinkSender;

@Test

public void contextLoads(){

sinkSender.output().send(MessageBuilder.withPayload("From SinkSender").build());

}

}

- 运行该单元测试用例,如果可以在控制台中找到如下输出内容,表明我们的试验已经成功了,消息被正确地发送到了input通道中,并被相对应的消息消费者输出。

...

INFO 10656---[     main]com.didispace.HelloApplication     : Received:

From SinkSender

...

注入消息通道

由于Spring Cloud Stream会根据绑定接口中的@Input和@Output注解来创建消息通道实例,所以我们也可以通过直接注入的方式来使用消息通道对象。比如,我们可以通过下面的示例,注入上面例子中SinkSender接口中定义的名为input的消息输入通道。

@RunWith(SpringJUnit4ClassRunner.class)

@SpringApplicationConfiguration(classes=HelloApplication.class)

@WebAppConfiguration

public class HelloApplicationTests {

@Autowired

private MessageChannel input;

@Test

public void contextLoads(){

input.send(MessageBuilder.withPayload("From MessageChannel").build());

}

}

上面定义的内容,完成了与之前通过注入绑定接口SinkSender方式实现的测试用例相同的操作。因为在通过注入绑定接口实现时,sinkSender.output()方法实际获得的就是SinkSender接口中定义的MessageChannel实例,只是在这里我们直接通过注入的方式来实现了而已。这种用法虽然很直接,但是也容易犯错,很多时候我们在一个微服务应用中可能会创建多个不同名的MessageChannel实例,这样通过@Autowired注入时,要注意参数命名需要与通道同名才能被正确注入,或者也可以使用@Qualifier注解来特别指定具体实例的名称,该名称需要与定义 MessageChannel 的@Output 中的value参数一致,这样才能被正确注入。比如下面的例子,在一个接口中定义了两个输出通道,分别命名为Output-1和 Output-2,当要使用 Output-1的时候,可以通过@Qualifier("Output-1")来指定这个具体的实例来注入使用。

public interface MySource {

@Output("Output-1")

MessageChannel output1();

@Output("Output-2")

MessageChannel output2();

}

@Component

public class OutputSender {

@Autowired @Qualifier("Output-1")

private MessageChannel output;

...

}

消息生产与消费

由于 Spring Cloud Stream 是基于 Spring Integration 构建起来的,所以在使用 Spring Cloud Stream构建消息驱动服务的时候,完全可以使用Spring Integration的原生注解来实现各种业务需求。同时,为了简化面向消息的编程模型,Spring Cloud Stream 还提供了@StreamListener注解对输入通道的处理做了进一步优化。下面我们分别从这两方面来学习一下对消息的处理。

Spring Integration原生支持

通过之前的内容,我们已经能够通过注入绑定接口和消息通道的方式实现向名为input的消息通道发送信息。接下来,我们通过Spring Integration原生的@ServiceActivator和@InboundChannelAdapter注解来尝试实现相同的功能,具体实现如下:

@EnableBinding(value={Sink.class})

public class SinkReceiver {

private static Logger logger=LoggerFactory.getLogger(SinkReceiver.class);

@ServiceActivator(inputChannel=Sink.INPUT)

public void receive(Object payload){

logger.info("Received: "+payload);

}

}

@EnableBinding(value={SinkSender.SinkOutput.class})

public class SinkSender {

private static Logger logger=LoggerFactory.getLogger(SinkSender.class);

@Bean

@InboundChannelAdapter(value=SinkOutput.OUTPUT,poller=@Poller(fixedDelay="2000"))

public MessageSource<Date> timerMessageSource(){

return()-> new GenericMessage<>(new Date());

}

public interface SinkOutput {

String OUTPUT="input";

@Output(SinkOutput.OUTPUT)

MessageChannel output();

}

}

上面展示的两段代码分别属于两个不同的应用。

- SinkReceiver类属于消息消费者实现,与之前实现的类似,只是做了一些修改:使用原生的@ServiceActivator 注解替换了@StreamListener,实现对Sink.INPUT通道的监听处理,而该通道绑定了名为input的主题。

- SinkSender类属于消息生产者实现,它在内部定义了SinkOutput接口来将输出通道绑定到名为input的主题中。由于SinkSender和SinkReceiver共用一个主题,所以它们构成了一组生产者与消费者。另外,在SinkSender 中还创建了用于生产消息的 timerMessageSource 方法,该方法会将当前时间作为消息返回。而@InboundChannelAdapter注解定义了该方法是对SinkOutput.OUTPUT通道的输出绑定,同时使用poller参数将该方法设置为轮询执行,这里我们定义为2000毫秒,所以它会以2秒的频率向SinkOutput.OUTPUT通道输出当前时间。执行上面定义的程序,可以得到类似下面的输出:

INFO 248---[ask-scheduler-2]com.didispace.HelloApplication     : Received:

Sun Nov 13 11:22:39 CST 2016

INFO 248---[ask-scheduler-3]com.didispace.HelloApplication     : Received:

Sun Nov 13 11:22:41 CST 2016

INFO 248---[ask-scheduler-2]com.didispace.HelloApplication     : Received:

Sun Nov 13 11:22:43 CST 2016

INFO 248---[ask-scheduler-1]com.didispace.HelloApplication     : Received:

Sun Nov 13 11:22:45 CST 2016

INFO 248---[ask-scheduler-5]com.didispace.HelloApplication     : Received:

Sun Nov 13 11:22:47 CST 2016

另外,还可以通过@Transformer注解对指定通道的消息进行转换。比如,我们可以在上面的SinkSender类中增加下面的内容:

@Transformer(inputChannel=Sink.INPUT,outputChannel=Sink.INPUT)

public Object transform(Date message){

return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);

}

再次执行程序,可以得到下面的输出内容,消息内容被成功转换为yyyy-MM-dd HH:mm:ss格式了。

INFO 22092---[ask-scheduler-2]com.didispace.HelloApplication    :Received:

2016-11-13 12:12:21

INFO 22092---[ask-scheduler-3]com.didispace.HelloApplication    :Received:

2016-11-13 12:12:23

INFO 22092---[ask-scheduler-2]com.didispace.HelloApplication    :Received:

2016-11-13 12:12:25

INFO 22092---[ask-scheduler-1]com.didispace.HelloApplication    :Received:

2016-11-13 12:12:27

INFO 22092---[ask-scheduler-5]com.didispace.HelloApplication    :Received:

2016-11-13 12:12:29

更多关于Spring Integration的使用细节请查阅官方文档。

@StreamListener详解

通过入门示例,对于@StreamListener注解,大家应该都已经有了一些基本的认识,通过该注解修饰的方法,Spring Cloud Steam会将其注册为输入消息通道的监听器。当输入消息通道中有消息到达的时候,会立即触发该注解修饰方法的处理逻辑对消息进行消费。

消息转换

@SteamListener和@ServiceActivator注解都实现了对输入消息通道的监听,但是@SteamListener相比@ServiceActivator更为强大,因为它还内置了一系列的消息转换功能,这使得基于@SteamListener注解实现的消息处理模型更为简单。

大部分情况下,我们通过消息来对接服务或系统时,消息生产者都会以结构化的字符串形式来发送,比如JSON或XML。当消息到达的时候,输入通道的监听器需要对该字符串做一定的转化,将JSON或XML转换成具体的对象,然后再做后续的处理。

假设,我们需要传输一个User对象,该对象有name和age两个字段,这时,如果使用@ServiceActivator注解的话,可以通过下面的代码实现:

@EnableBinding(value={Sink.class})

public class SinkReceiver {

private static Logger logger=LoggerFactory.getLogger(SinkReceiver.class);

@ServiceActivator(inputChannel=Sink.INPUT)

public void receive(User user){

logger.info("Received: "+user);

}

@Transformer(inputChannel=Sink.INPUT,outputChannel=Sink.INPUT)

public User transform(String message)throws Exception {

ObjectMapper objectMapper=new ObjectMapper();

User user=objectMapper.readValue(message,User.class);

return user;

}

}

@EnableBinding(value={SinkSender.SinkOutput.class})

public class SinkSender {

private static Logger logger=LoggerFactory.getLogger(SinkSender.class);

@Bean

@InboundChannelAdapter(value=Sink.INPUT,poller=@Poller(fixedDelay="2000"))

public MessageSource<String> timerMessageSource(){

return()-> new GenericMessage<>("{\"name\":\"didi\",\"age\":30}");

}

public interface SinkOutput {

String OUTPUT="input";

@Output(SinkOutput.OUTPUT)

MessageChannel output();

}

}

由于@ServiceActivator 本身不具备对消息的转换能力,所以当代表 User 对象的JSON 字符串到达后,它自身无法将其转换成 User 对象。所以,这里需要通过@Transformer 注解帮助将字符串类型的消息转换成 User 对象,并将转换结果传递给@ServiceActivator的处理方法做后续的消费。

如果我们使用@SteamListener注解的话,就可以把上面的实现简化为下面的代码:

@EnableBinding(value={Sink.class})

public class SinkReceiver {

private static Logger logger=LoggerFactory.getLogger(SinkReceiver.class);

@StreamListener(Sink.INPUT)

public void receive(User user){

logger.info("Received: "+user);

}

}

从上面的实现中可以看到,我们去掉了@Transformer 注解的方法,同时用@StreamListener替代了@ServiceActivator注解,而SinkSender类不需要做任何修改,只需在配置文件中增加spring.cloud.stream.bindings.input.contenttype=application/json属性设置,这样我们可以得到与之前一样的结果。

@StreamListener注解能够通过配置属性实现JSON字符串到对象的转换,这是因为在Spring Cloud Steam中实现了一套可扩展的消息转换机制。在消息消费逻辑执行之前,消息转换机制会根据消息头信息中声明的消息类型(即上面对 input 通道配置的content-type参数),找到对应的消息转换器并实现对消息的自动转换。

消息反馈

很多时候在处理完输入消息之后,需要反馈一个消息给对方,这时候可以通过@SendTo注解来指定返回内容的输出通道。

比如,假设我们有这样的两个应用,App1和App2。

- App1中实现了对input输入通道的监听,并且在接收到消息之后,对消息做了一些简单加工,然后通过@SendTo 把处理方法返回的内容以消息的方式发送到output通道中,具体如下:

@EnableBinding(value={Processor.class})

public class App1 {

private static Logger logger=LoggerFactory.getLogger(App1.class);

@StreamListener(Processor.INPUT)

@SendTo(Processor.OUTPUT)

public Object receiveFromInput(Object payload){

logger.info("Received: "+payload);

return "From Input Channel Return-"+payload;

}

}

- App2是App1应用中input通道的生产者以及output通道的消费者。为了继续使用 Processor 绑定接口的定义,我们可以在配置文件中将该应用的 input 和output绑定反向地做一些配置。因为对于App2来说,它的input绑定通道实际上是对output主题的消费者,而output绑定通道实际上是对input主题的生产者,所以我们可以做如下具体配置。指定通道的具体主题来实现与 App1应用的消息交互:

spring.cloud.stream.bindings.input.destination=output

spring.cloud.stream.bindings.output.destination=input

- App2中程序的逻辑实现如下所示。通过timerMessageSource方法,向output绑定的输出通道中发送内容,也就是向名为input的主题中传递消息,这样App1中的input输入通道就可以收到这个消息。同时,这里还创建了对input输入消息通道的绑定。通过上面的配置,它会监听来自 output 主题的消息,通过receiveFromOutput方法,会将消息内容输出。

@EnableBinding(value={Processor.class})

public class App2 {

private static Logger logger=LoggerFactory.getLogger(App2.class);

@Bean

@InboundChannelAdapter(value=Processor.OUTPUT,poller=@Poller(fixedDelay=

"2000"))

public MessageSource<Date> timerMessageSource(){

return()-> new GenericMessage<>(new Date());

}

@StreamListener(Processor.INPUT)

public void receiveFromOutput(Object payload){

logger.info("Received: "+payload);

}

}

将App1和App2同时运行起来,我们分别可以从它们的控制台中看到如下内容。

- App1应用的控制台输出如下所示,它输出了来自App2应用中定义的向input主题中轮询发送的时间。

INFO 28180---[mWYODqFPjxMmg-1]com.didispace.HelloApplication   : Received:

Wed Nov 16 08:19:16 CST 2016

INFO 28180---[mWYODqFPjxMmg-1]com.didispace.HelloApplication   : Received:

Wed Nov 16 08:19:18 CST 2016

INFO 28180---[mWYODqFPjxMmg-1]com.didispace.HelloApplication   : Received:

Wed Nov 16 08:19:20 CST 2016

- App2应用的控制台输出如下所示,由于App1应用在打印了input主题中的消息之后做了一些简单的字符串拼接,然后将拼接后的字符串输出到output主题,而下面的输出内容正是App2应用从output主题中获取的消息内容。

INFO 31916---[xmrMN07yhd8Ag-1]com.didispace.HelloApplication   : Received:

From Input Channel Return-Wed Nov 16 08:19:16 CST 2016

INFO 31916---[xmrMN07yhd8Ag-1]com.didispace.HelloApplication   : Received:

From Input Channel Return-Wed Nov 16 08:19:18 CST 2016

INFO 31916---[xmrMN07yhd8Ag-1]com.didispace.HelloApplication   : Received:

From Input Channel Return-Wed Nov 16 08:19:20 CST 2016

在Spring Cloud Stream中除了可以使用@SendTo注解将方法返回结果输出到消息通道中,还可以使用原生注解@ServiceActivator 的 outputChannel 属性配置输出通道把返回结果发送给消息中间件。根据上面的例子,我们可以将App1应用修改成下面这样,它们实现的效果是一样的。

@EnableScheduling

@EnableBinding(value={Processor.class})

public class App1 {

private static Logger logger=LoggerFactory.getLogger(App1.class);

@ServiceActivator(inputChannel=Processor.INPUT,outputChannel=Processor.OUTPUT)

public Object receiveFromInput(Object payload){

logger.info("Received: "+payload);

return "From Input Channel Return-"+payload;

}

}

响应式编程

在Spring Cloud Stream中还支持使用基于RxJava的响应式编程来处理消息的输入和输出。与RxJava的整合使用同样很容易,下面我们详细看看如何使用RxJava实现上面消息反馈中试验的场景:App1应用接收来自App2应用发送到input主题的消息,并返回一条消息到output主题供App2应用消息输出。

- 在pom.xml中引入spring-cloud-stream-rxjava依赖,具体如下:

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-stream-rxjava</artifactId>

<version>1.0.2.RELEASE</version>

</dependency>

- 改造App1的实现,具体如下:

@EnableRxJavaProcessor

public class App1 {

private static Logger logger=LoggerFactory.getLogger(App1.class);

@Bean

public RxJavaProcessor<String,String> processor(){

return inputStream-> inputStream.map(data-> {

logger.info("Received: "+data);

return data;

}).map(data-> String.valueOf("From Input Channel Return-"+data));

}

}

通过上面的改造,我们再次运行App1和App2就可以得到与上一节消息反馈中的试验相同的结果。下面对前面的实现内容做一些说明。

- 在 App1的类名上,我们使用@EnableRxJavaProcessor 替代了原来的@EnableBinding(value={Processor.class}),该注解用来标识当前类中应该提供一个RxJavaProcessor实现的Bean。另外从其源码中还可以看到它自身就实现了对Processor定义通道的绑定:@EnableBinding({Processor.class})。

@Target({ElementType.TYPE})

@Retention(RetentionPolicy.RUNTIME)

@Documented

@Inherited

@EnableBinding({Processor.class})

@Import({RxJavaProcessorConfiguration.class})

public @interface EnableRxJavaProcessor {

}

- 在App1中还定义了一个RxJavaProcessor的实现Bean。在RxJavaProcessor接口中定义了一个用来处理输入通道和返回内容给输出通道的 process 方法,由于输入输出都采用了Observable,所以该方法只会在应用启动的时候调用一次,用来设置数据流。当有消息到达输入通道时,会采用RxJava实现的观察者模式来消费和输出内容。

public interface RxJavaProcessor<I,O> {

Observable<O> process(Observable<I> input);

}

- 除了实现上面的场景之外,通过利用RxJava的支持,我们还能轻易地实现消息的缓存聚合。比如,我们希望App1在接收到5条消息之后才将处理结果返回给输出通道,那么只需通过下面的改进即可轻松实现这样的场景:

@EnableRxJavaProcessor

public class App1 {

@Bean

public RxJavaProcessor<String,String> processor(){

private static Logger logger=LoggerFactory.getLogger(App1.class);

return inputStream-> inputStream.map(data-> {

logger.info("Received: "+data);

return data;

}).buffer(5).map(data-> String.valueOf("From Input Channel Return-"+data));

}

}

再次执行App1和App2应用,此时我们在App1应用中会得到一样的输出,但是在App2应用中会得到下面的输出:

INFO 8796---[-maOhv-th7Raw-1]com.didispace.HelloApplication     : Received:

From Input Channel Return-[1479390916408,1479390918409,1479390920412,1479390922414,

1479390924415]

INFO 8796---[-maOhv-th7Raw-1]com.didispace.HelloApplication     : Received:

From Input Channel Return-[1479390926416,1479390928418,1479390930419,1479390932421,

1479390934423]

消费组与消息分区

在“核心概念”一节中,我们对消费组和消息分区已经进行了基本的介绍,在这里来详细介绍一下这两个概念的使用方法。

消费组

通常每个服务都不会以单节点的方式运行在生产环境中,当同一个服务启动多个实例的时候,这些实例会绑定到同一个消息通道的目标主题上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理。但是在有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候就需要为这些消费者设置消费组来实现这样的功能。实现的方式非常简单,只需在服务消费者端设置spring.cloud.stream.bindings.input.group属性即可,比如可以像下面这样实现。

- 可以先实现一个消费者应用SinkReceiver,实现greetings主题上的输入通道绑定,它的实现如下:

@EnableBinding(value={Sink.class})

public class SinkReceiver {

private static Logger logger=LoggerFactory.getLogger(SinkReceiver.class);

@StreamListener(Sink.INPUT)

public void receive(User user){

logger.info("Received: "+user);

}

}

- 为了将 SinkReceiver 的输入通道目标设置为greetings 主题,以及将该服务的实例设置为同一个消费组,可做如下设置:

spring.cloud.stream.bindings.input.group=Service-A

spring.cloud.stream.bindings.input.destination=greetings

通过spring.cloud.stream.bindings.input.group属性指定了该应用实例都属于 Service-A 消费组,而 spring.cloud.stream.bindings.input.destination属性则指定了输入通道对应的主题名。

- 完成了消息消费者应用之后,我们再来实现一个消息生产者应用SinkSender,具体如下:

@EnableBinding(value={Source.class})

public class SinkSender {

private static Logger logger=LoggerFactory.getLogger(SinkSender.class);

@Bean

@InboundChannelAdapter(value=Source.OUTPUT,poller=@Poller(fixedDelay="2000"))

public MessageSource<String> timerMessageSource(){

return()-> new GenericMessage<>("{\"name\":\"didi\",\"age\":30}");

}

}

- 为消息生产者 SinkSender 做一些设置,让它的输出通道绑定目标也指向greetings主题,具体如下:

spring.cloud.stream.bindings.output.destination=greetings

到这里,对于消费分组的示例就完成了。分别运行上面实现的生产者与消费者,其中消费者我们启动多个实例。通过控制台,可以发现,每个生产者发出的消息会被启动的消费者以轮询的方式进行接收和输出。

消息分区

通过消费组的设置,虽然我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理,但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费。这时候我们就需要对消息进行分区处理。

在Spring Cloud Stream中实现消息分区非常简单,我们对消费组示例做一些配置修改就能实现,具体如下所示。

- 在消费者应用SinkReceiver中,对配置文件做一些修改,具体如下:

spring.cloud.stream.bindings.input.group=Service-A

spring.cloud.stream.bindings.input.destination=greetings

spring.cloud.stream.bindings.input.consumer.partitioned=true

spring.cloud.stream.instanceCount=2

spring.cloud.stream.instanceIndex=0

从上面的配置中,我们可以看到增加了下面这三个参数。

1.spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能。

2.spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量。

3.spring.cloud.stream.instanceIndex:该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount参数-1。试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。

- 在生产者应用SinkSender中,对配置文件也做一些修改,具体如下所示。

spring.cloud.stream.bindings.output.destination=greetings

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload

spring.cloud.stream.bindings.output.producer.partitionCount=2

从上面的配置中,我们可以看到增加了下面这两个参数。

1.spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则配置SpEL来生成合适的分区键。

2.spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了消息分区的数量。

到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时启动多个消费者。但需要注意的是,要为消费者指定不同的实例索引号,这样当同一个消息被发送给消费组时,可以发现只有一个消费实例在接收和处理这些相同的消息。

消息类型

Spring Cloud Stream为了让开发者能够在消息中声明它的内容类型,在输出消息中定义了一个默认的头信息:contentType。对于那些不直接支持头信息的消息中间件,Spring Cloud Stream 提供了自己的实现机制,它会在消息发出前自动将消息包装进它自定义的消息封装格式中,并加入头信息。而对于那些自身就支持头信息的消息中间件,Spring Cloud Stream构建的服务可以接收并处理来自非Spring Cloud Stream构建但包含符合规范头信息的应用程序发出的消息。

Spring Cloud Stream允许使用spring.cloud.stream.bindngs.<channelName>.content-type属性以声明式的配置方式为绑定的输入和输出通道设置消息内容的类型。此外,原生的消息类型转换器依然可以轻松地用于我们的应用程序。目前,Spring Cloud Stream中自带支持了以下几种常用的消息类型转换。

- JSON与POJO的互相转换。

- JSON与org.springframework.tuple.Tuple的互相转换。

- Object 与 byte[]的互相转换。为了实现远程传输序列化的原始字节,应用程序需要发送 byte 类型的数据,或是通过实现 Java 的序列化接口来转换为字节(Object对象必须可序列化)。

- String与byte[]的互相转换。

- Object向纯文本的转换:Object需要实现toString()方法。

上面所指的 JSON 类型可以表现为一个 byte 类型的数组,也可以是一个包含有效JSON内容的字符串。另外,Object对象可以由JSON、byte数组或者字符串转换而来,但是在转换为JSON的时候总是以字符串的形式返回。

MIME类型

在Spring Cloud Stream中定义的content-type属性采用了Media Type,即Internet Media Type(互联网媒体类型),也被称为MIME类型,常见的有application/json、text/plain; charset=UTF-8,相信接触过HTTP的工程师们对这些类型都不会感到陌生。MIME类型对于标示如何转换为String或byte[]非常有用。并且,我们还可以使用 MIME 类型格式来表示 Java 类型,只需要使用带有类型参数的一般类型:application/x-java-object。比如,我们可以使用 application/x-javaobject; type=java.util.Map来表示传输的是一个java.util.Map对象,或是使用application/x-java-object; type=com.didispace.User 来表示传输的是一个com.didispace.User对象;除此之外,更重要的是,它还提供了自定义的MIME类型,比如通过application/x-spring-tuple来指定Spring的Tuple类型。

在Spring Cloud Stream中默认提供了一些可以开箱即用的类型转换器,具体如下表所示。

消息类型的转换行为只会在需要进行转换时才被执行,比如,当服务模块产生了一个头信息为application/json的XML字符串消息,Spring Cloud Stream是不会将该XML字符串转换为JSON 的,这是因为该模块的输出内容已经是一个字符串类型了,所以它并不会将其做进一步的转换。

另外需要注意的是,Spring Cloud Stream虽然同时支持输入通道和输出通道的消息类型转换,但还是推荐开发者尽量在输出通道中做消息转换。因为对于输入通道的消费者来说,当目标是一个POJO的时候,使用@StreamListener注解是能够支持自动对其进行转换的。

Spring Cloud Stream除了提供上面这些开箱即用的转换器之外,还支持开发者自定义的消息转换器。这使得我们可以使用任意格式(包括二进制)的数据进行发送和接收,并且将这些数据与特定的contentType相关联。在应用启用的时候,Spring Cloud Stream会将所有org.springframework.messaging.converter.MessageConverter接口实现的自定义转换器以及默认实现的那些转换器都加载到消息转换工厂中,以提供给消息处理时使用。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文