- 推荐序一
- 推荐序二
- 推荐序三
- 推荐语
- 前言
- 第1章 基础知识
- 第2章 微服务构建:Spring Boot
- 第3章 服务治理:Spring Cloud Eureka
- 第4章 客户端负载均衡:Spring Cloud Ribbon
- 第5章 服务容错保护:Spring Cloud Hystrix
- 第6章 声明式服务调用:Spring Cloud Feign
- 第7章 API网关服务:Spring Cloud Zuul
- 第8章 分布式配置中心:Spring Cloud Config
- 第9章 消息总线:Spring Cloud Bus
- 第10章 消息驱动的微服务:Spring Cloud Stream
- 附录 A Starter POMs
- 后记
快速入门
下面我们通过构建一个简单的示例来对Spring Cloud Stream有一个初步认识。该示例的主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用将通过使用消息中间件 RabbitMQ 来接收消息并将消息打印到日志中。所以,在进行下面的步骤之前请先确认已经在本地安装了RabbitMQ,具体安装步骤可根据在上一章中的介绍来操作。
- 创建一个基础的Spring Boot工程,命名为stream-hello。
- 编辑pom.xml中的依赖关系,引入Spring Cloud Stream对RabbitMQ的支持,具体如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.7.RELEASE</version>
<relativePath/> <!--lookup parent from repository-->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Brixton.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
- 创建用于接收来自RabbitMQ消息的消费者SinkReceiver,具体如下:
@EnableBinding(Sink.class)
public class SinkReceiver {
private static Logger logger=LoggerFactory.getLogger(HelloApplication.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload){
logger.info("Received: "+payload);
}
}
- 创建应用主类,这里同其他Spring Boot一样,没有什么特别之处,具体如下:
@SpringBootApplication
public class HelloApplication {
public static void main(String[]args){
SpringApplication.run(HelloApplication.class,args);
}
}
到这里,快速入门示例的编码任务已经完成了。下面我们分别启动 RabbitMQ 以及该Spring Boot应用,然后做下面的测试,看看它们是如何运作的。
- 先来看一下Spring Boot应用的启动日志。
...
INFO 16272---[main]o.s.c.s.b.r.RabbitMessageChannelBinder :declaring queue for
inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A,bound to: input
INFO 16272---[main]o.s.a.r.c.CachingConnectionFactory : Created new
connection: SimpleConnection@3c78e551[delegate=amqp://guest@127.0.0.1:5672/]
INFO 16272---[main]o.s.integration.channel.DirectChannel : Channel
'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s).
INFO 16272---[main]o.s.i.a.i.AmqpInboundChannelAdapter : started
inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A
...
从上面的日志内容中,可以获得以下信息:
- 使用 guest 用户创建了一个指向127.0.0.1:5672位置的 RabbitMQ 连接,在RabbitMQ的控制台中我们也可以发现它。
- 声明了一个名为input.anonymous.Y8VsFILmSC27eS5StsXp6A的队列,并通过RabbitMessageChannelBinder将自己绑定为它的消费者。这些信息我们也能在RabbitMQ的控制台中发现它们。
下面我们可以在 RabbitMQ 的控制台中进入 input.anonymous.Y8VsFILmSC27eS5StsXp6A队列的管理页面,通过Publish message功能来发送一条消息到该队列中。
此时,我们可以在当前启动的Spring Boot应用程序的控制台中看到下面的内容:
INFO 16272---[C27eS5StsXp6A-1]com.didispace.HelloApplication :Received:
[B@7cba610e
可以发现在应用控制台中输出的内容就是 SinkReceiver 中的 receive 方法定义的,而输出的具体内容则来自消息队列中获取的对象。这里由于我们没有对消息进行序列化,所以输出的只是该对象的引用,在后面的章节中我们会详细介绍接收消息后的处理。
在顺利完成上面快速入门的示例后,我们简单解释一下上面的步骤是如何将我们的Spring Boot应用连接上RabbitMQ来消费消息以实现消息驱动业务逻辑的。
首先,我们对 Spring Boot 应用做的就是引入 spring-cloud-starter-streamrabbit依赖,该依赖包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ 的自动化配置等内容。从下面它定义的依赖关系中,我们还可以知道它等价于spring-cloud-stream-binder-rabbit依赖。
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
接着,我们再来看看这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver中。
- @EnableBinding,该注解用来指定一个或多个定义了@Input 或@Output 注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了 Sink 接口,该接口是 Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:
public interface Sink {
String INPUT="input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream 还默认实现了绑定 output 通道的 Source 接口,还有结合了 Sink 和Source的Processor接口,实际使用时我们也可以自己通过@Input和@Output注解来定义绑定消息通道的接口。当需要为@EnableBinding 指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value={Sink.class,Source.class})。
- @StreamListener:如果已经读过上一章Spring Cloud Bus的源码分析,相信对该注解不会感到陌生。它主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会做出对应的响应动作。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论