分布式消息队列 XXL-MQ
一、简介
1.1 概述
XXL-MQ是一款轻量级分布式消息队列,拥有 “水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递” 等特性,
支持 “并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制” 等消息特性。现已开放源代码,开箱即用。
1.2 特性
- 1、简单易用: 一行代码即可发布一条消息; 一行注解即可订阅一个消息主题;
- 2、轻量级: 部署简单,不依赖第三方服务,一分钟上手;
- 3、水平扩展:消息中心支持无限水平扩展,这里的水平扩展包括两方面:消息生产能力、消息消费能力;通过集群扩展线性提升消息吞吐能力;
- 4、高可用:消息中心能够忍受部分示例失效,不影响整个集群的可用性。通过内置注册中心可以实现秒级摘除失效节点,消息服务动态转移;
- 5、消息持久化:全部消息持久化存储,消息中心支持通过配置选择是否清理过期消息。
- 6、强数据安全:消息数据存储在DB中,可事务保障数据安全,防止消息数据丢失;
- 7、海量数据堆积:消息数据存储在DB中,原生兼容支持 “MySQL、TIDB” 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
- 8、单机TPS过10W:单机TPS受限于DB存储方式,选型 “MySQL” 时单机TPS过万,选型 “TIDB” 时单机TPS过10万;
- 9、毫秒级投递延迟:消息中心与客户端通过RPC的方式进行消息通讯,毫秒级延时;
- 10、多种消息模式:
- 并行消息:消息平均分配在该主题在线消费者,分片方式并行消费;适用于吞吐量较大的消息场景,如邮件发送、短信发送等业务逻辑
- 串行消息:消息固定分配给该主题在线消费者中其中一个,FIFO方式串行消费;适用于严格限制并发的消息场景,如秒杀、抢单等排队业务逻辑;
- 广播消息:消息将会广播发送给该主题在线消费者分组,全部分组都会消费该消息,但是一个分组下只会消费一次;适用于广播场景,如广播更新缓存等
- 11、延时消息: 支持设置消息的延迟生效时间, 到达设置的生效时间时该消息才会被消费;适用于延时消费场景,如订单超时取消等;
- 12、事务性: 消费者开启事务开关后,消息事务性保证只会成功执行一次;
- 13、失败重试: 支持设置消息的重试次数, 在消息执行失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;
- 14、超时控制: 支持自定义消息超时时间,消息消费超时将会主动中断;
- 15、消息可见: 系统中每一条消息可通过Web界面在线查看,甚至支持编辑消息内容和消息状态;
- 16、消息可追踪: 支持追踪每一条消息的执行路径, 便于排查业务问题;
- 17、消息失败告警:支持以Topic粒度监控消息,存在失败消息时主动推送告警邮件;默认提供邮件方式失败告警,同时预留扩展接口,可方面的扩展短信、钉钉等告警方式;
- 18、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
- 19、访问令牌(accessToken):为提升系统安全性,消息中心和客户端进行安全性校验,双方AccessToken匹配才允许通讯;
1.3 发展
于2015年中,我在github上创建XXL-MQ项目仓库并提交第一个commit,随之进行系统结构设计,UI选型,交互设计……
Why MQ
- 异步: 很多场景下,不会立即处理消息,此时可以在MQ中存储message,并在某一时刻再进行处理;
- 解耦: 不同进程间添加一层实现解耦,方便今后的扩展。
- 消
除峰值:
在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如大量的insert,update之类的请求同时到达mysql,直接导致无数的行锁表
锁,甚至最后请求会堆积过多,从而触发too manyconnections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。 - 耗时业务: 在一些比较耗时的业务场景中, 可以耗时较多的业务解耦通过异步队列执行, 提高系统响应速度和吞吐量;
Why XXL-MQ
目前流行的ActiveMQ、RabbitMQ和ZeroMQ等消息队列的软件中,大多为了实现AMQP,STOMP,XMPP之类的协议,变得极其重量级(如新版本Activemq建议分配内存达1G+),但在很多Web应用中的实际情况是:我们只是想找到一个缓解高并发请求的解决方案,一个轻量级的消息
队列实现方式才是我们真正需要的。
1.4 下载
源码仓库地址
源码仓库地址 | Release Download |
---|---|
https://github.com/xuxueli/xxl-mq | Download |
https://gitee.com/xuxueli0323/xxl-mq | Download |
中央仓库地址
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-mq-client</artifactId> <version>{最新Release版本}</version> </dependency>
1.5 环境
- Maven3+
- Jdk1.7+
- Mysql5.6+ (或 TIDB)
二、快速入门
2.1 初始化”消息中心数据库”
请下载项目源码并解压,获取 “消息中心数据库初始化SQL脚本” 并执行即可
“消息中心数据库初始化SQL脚本” 位置为:
/xxl-mq/doc/db/xxl-mq-mysql.sql
消息中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;
注意:消息中心数据库,原生兼容支持 “MySQL、TIDB” 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
可视情况选择使用,当选择TIDB时,仅需要修改消息中心数据库连接jdbc地址配置即可,其他部分如SQL和驱动兼容MySQL和TIDB使用,不需修改。
2.2 编译项目
解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可,源码结构如下:
- /xxl-mq-admin :消息中心,提供消息Broker、服务注册、消息在线管理功能; - /xxl-mq-client :客户端核心依赖, 提供API开发Producer和Consumer; - /xxl-mq-samples :接入项目参考示例, 可自行参考学习并使用; - /xxl-mq-samples-frameless :无框架示例项目,不依赖第三方框架,只需main方法即可启动运行; - /xxl-mq-samples-springboot :springboot版本示例项目;
2.3 配置部署“消息中心”
步骤一:消息中心配置:
消息中心配置文件地址:
/xxl-mq/xxl-mq-admin/src/main/resources/application.properties
消息中心配置内容说明:
### 数据库配置 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8 ### 告警邮箱发送方配置 spring.mail.username=xxx@qq.com spring.mail.password=xxx ### 注册心跳时间 xxl.mq.registry.beattime=10 ### 注册信息磁盘存储目录,务必拥有读写权限; xxl.mq.registry.data.filepath=/data/applogs/xxl-mq/registrydata ### 消息中心Broker服务RPC通讯地址,为空则自动获取 xxl-mq.rpc.remoting.ip= ### 消息中心Broker服务RPC通讯端口 xxl-mq.rpc.remoting.port=7080 ### 日志保存天数,超过该阈值的成功消息将会被自动清理;大于等于3时生效 xxl.mq.log.logretentiondays=3 ### 登陆信息配置 xxl.mq.login.username=admin xxl.mq.login.password=123456
步骤二:部署项目:
如果已经正确进行上述配置,可将项目编译打包部署。
消息中心访问地址:http://localhost:8080/xxl-mq-admin (该地址接入方项目将会使用到,作为注册地址),登录后运行界面如下图所示
至此“消息中心”项目已经部署成功。
步骤三:消息中心集群(可选):
消息中心支持集群部署,提升消息系统容灾和可用性。
消息中心集群部署时,几点要求和建议:
- DB配置保持一致;
- 登陆账号配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议:推荐通过nginx为消息中心集群做负载均衡,分配域名。消息中心访问、客户端使用等操作均通过该域名进行。
其他:Docker 镜像方式搭建消息中心:
- 下载镜像
// Docker地址:https://hub.docker.com/r/xuxueli/xxl-mq-admin/ docker pull xuxueli/xxl-mq-admin
- 创建容器并运行
docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin /** * 如需自定义 mysql 等配置,可通过 "PARAMS" 指定,参数格式 RAMS="--key=value --key2=value2" ; * 配置项参考文件:/xxl-mq/xxl-mq-admin/src/main/resources/application.properties */ docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8" -p 8080:8080 -p 7080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin
2.4 接入XXL-MQ并使用
接入XXL-MQ项目:"xxl-mq-samples-springboot" (提供多种版本示例项目供参考选择,现以springboot版本为例讲解) 作用:生产消息、消费消息;可直接部署,也可以将集成到现有业务项目中。
步骤一:maven依赖
确认pom文件中引入了 “xxl-mq-client” 的maven依赖;
步骤二:”消息接入方”,属性配置
消息接入方配置,配置文件地址:
/xxl-mq/xxl-mq-samples/xxl-mq-samples-springboot/src/main/resources/application.properties
消息接入方配置,配置内容说明:
# 消息中心跟地址;支持配置多个,建议域名方式配置; xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin
步骤三:”消息接入方”,组件配置
@Bean public XxlMqSpringClientFactory getXxlMqConsumer(){ XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory(); xxlMqSpringClientFactory.setAdminAddress(adminAddress); return xxlMqSpringClientFactory; }
步骤四:部署”消息接入方”项目:
如果已经正确进行上述配置,可将项目编译打包部署。
springboot版本示例项目,访问地址:http://localhost:8081/
至此“消息接入方”示例项目已经部署结束。
步骤五:”消息接入方”集群(可选):
消息接入方支持集群部署,提升消息系统可用性,同时提升消息处理能力。
消息接入方集群部署时,要求和建议:
- 消息中心跟地址(xxl.mq.admin.address)需要保持一致;
2.5 生产消息、消费消息
生产消息
/** * 生产消息:并行消息 */ XxlMqProducer.produce(new XxlMqMessage(topic, data)); /** * 生产消息:串行消费( ShardingId 保持一致即可;如秒杀消息,可将 ShardingId 设置为商品ID,则该商户全部消息固定在一台机器消费;) */ XxlMqMessage mqMessage = new XxlMqMessage(); mqMessage.setTopic(topic); mqMessage.setData(data); mqMessage.setShardingId(1); XxlMqProducer.produce(mqMessage); /** * 生产消息:广播消费( 消费者 IMqConsumer 注解的 group 属性修改不一致即可;一条消息将会广播给该主题全部在线 group,每个group都会消费,单个group只会消费一次; ) */ XxlMqProducer.broadcast(new XxlMqMessage(topic, data)); /** * 生产消息:延时消费( EffectTime 设置为固定时间点即可;如订单30min超时取消,可将 EffectTime 设置为30min后的时间点,到时将会自动消费;) */ XxlMqMessage mqMessage = new XxlMqMessage(); mqMessage.setTopic(topic); mqMessage.setData(data); mqMessage.setEffectTime(effectTime); XxlMqProducer.produce(mqMessage); /** * 生产消息:失败重试消费( RetryCount 设置重试次数即可;如发送短信消息,第三方服务不稳定时失败很常见,可设置 RetryCount 为3,失败是将会自动重试指定次数;) */ XxlMqMessage mqMessage = new XxlMqMessage(); mqMessage.setTopic(topic); mqMessage.setData(data); mqMessage.setRetryCount(3); XxlMqProducer.produce(mqMessage); ……
更多消息属性、场景,可参考章节 “4.2 Message设计”;
消费消息
@MqConsumer(topic = "topic_1") @Service public class DemoAMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoAMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; } }
系统中每个消费者以 “IMqConsumer” 的形式存在, 规定如下:
- 1、每个 "IMqConsumer" 需要继承 "com.xxl.mq.client.consumer.IMqConsumer" 接口; - 2、需要扫描为Spring的Bean实例, 需加上 "@Service" 注解并被Spring扫描; - 3、需要加上注解 "com.xxl.mq.client.consumer.annotation.MqConsumer"。该注解 "value" 值为订阅的消息主题, "type" 值为消息类型(TOPIC广播消息、QUEUE并发消息队列 和 SERIAL_QUEUE串行消息队列);
更多消费者属性、场景,可参考章节 “4.6 Consumer设计”;
2.6 功能测试 & 性能测试
首选启动消息中心,然后启动 “springboot版本示例项目”;
访问部署成功的 “springboot版本示例项目” 地址,浏览器访问展示如下如下:
该示例项目已经提供了多个消息生产与消费的实例:
a、”并行消费” 测试:连续点击 “并行消费” 按钮4次,将会生产4条并行消息;
进入消息中心 “消息记录” 菜单,消息列表如下:
逐个查看消息流转日志如下:
可以注意 “锁定消息” 的 “消费者信息”,可以查看到当前消费者在集群中的排序 “rank”。
逐个查看每条消息对应消费者的 “rank” 属性,可以看到上面4条消息平局分配给不同 “rank” 的消费者,即平均分配给了不同消费者。测试正常;
b、”串行消费” 测试:连续点击 “串行消费” 按钮4次,将会生产4条串行消费;
操作步骤同 “并行消息”。最后一步逐个查看每条消息对应消费者的 “rank” 属性,会发现全部一致,即固定分配给了一个消费者。测试正常
c、”广播消息”:点击 “广播消息” 按钮一次,将会生产一条广播消息;
进入消息中心 “消息记录” 菜单,消息列表如下:
一条广播消息将会广播给该主题全部在线group,该消息主题存在2个消息group,所以会每个group创建一条,即两条消息。测试正常。
d、”延时消息”:点击 “延时消息” 按钮一次,将会生产一条延时消息;
进入消息中心 “消息记录” 菜单,可以查看消息 “生效时间”属性为 5min 之后,最终该消息在 5min 之后被消费执行。测定正常。
e、”性能测试” 测试:点击 “性能测试”按钮,将会批量发送10000条消息;
点击按钮后,页面下方展示文案 “Cost = 1055”,说明在 1055ms 之内客户端发送了 1000 条消息;
但是,由于测试代码中采用异步方式发送,消息发送事件与是否成功需要在消息中心中确认。
进入消息中心 “消息记录” 菜单,如下图,可以看到 10000 条消息创建事件最大为 “2018-12-02 04:51:54”,最小为
“2018-12-02 04:51:55”。说明在 1s 左右客户端成功发送了 10000 条消息,且 100% 投递成功,即单机TPS过万;
然后进入 “运行报表” 界面,如下图,点击成功比例图可知,成功消费 10000 条,比例 100%。说明客户端发送的 10000 条消息 100% 消费成功。
其他测试
如延时消息、重试消息 …… 可自行参考示例代码测试;
三、消息中心,操作指南
3.1 运行报表:
运行报表界面,展示消息中心系统信息,如业务线、消息主题、消息数量等;支持日期分布图、成功比例图方式查看;
3.2 消息主题
消息主题界面,可查看在线消息主题列表;底层会周期性扫描消息记录,发型并录入新的消息主题,并展示在这里;
消息主题界面,支持为消息主题设置一些附属参数,提供一些增强功能;如负责人、告警邮箱等;
消息主题属性:
- 业务线:该消息所属业务线,方便分组管理;
- 负责人:该消息所属负责人;
- 告警邮箱:一个或多个,多个逗号分隔;消息消费失败时,将会周期性发送告警邮件;
3.3 消息记录
消息记录界面,可查看在线消息记录;支持筛选、查看消息流转轨迹;
- 消息在线管理功能:支持在线 “新增”、”编辑” 和 “删除” 消息记录;
消息新增如下图所示,消息属性说明,可参考章节 “4.2 Message设计”;
- 消息手动清理:支持在线清理消息,可选择消息主题、状态、清理类型等;
3.4 业务线
业务先界面,可查看在线业务线列表,并管理维护;可通过自定义业务线,绑定消息主题,从而方便消息主题的分组管理;
四、系统设计
4.1 系统架构图
角色解释:
- Message : 消息实体;
- Broker : 消息代理中心, 负责连接Producer和Consumer;
- Topic : 消息主题, 每个消息队列的唯一性标示;
- Topic segment : 消息分段, 同一个Topic的消息队列,将会根据订阅的Consumer进行分片分组,每个Consumer拥有的消息片即一个segment;
- Producer : 消息生产者, 绑定一个消息Topic, 并向该Topic消息队列中生产消息;
- Consumer : 消息消费者, 绑定一个消息Topic, 只能消费该Topic消息队列中的消息;
- Consumer Group : 消费者分组,隔离消息;同一个Topic下一条消息消费一次;
架构图模块解读:
- Server
- Broker: 消息代理中心, 系统核心组成模块, 负责接受消息生产者Producer推送生产的消息, 同时负责提供RPC服务供消费者Consumer使用来消费消息;
- Message Queue: 消息存储模块, 目前底层使用mysql消息表;
- Registry Center
- Broker Registry Center: Broker注册中心子模块, 供Broker注册RPC服务使用;
- Consumer Registry Center: Consumer注册中心子模块, 供Consumer注册消费节点使用;
- Client
- Producer: 消息生产者模块, 负责提供API接口供开发者调用,并生成和发送队列消息;
- Consumer: 消息消费者模块, 负责订阅消息并消息;
4.2 Message设计
消息核心属性 | 说明 |
---|---|
topic | 消息主题 |
group | 消息分组, 分组一致时消息仅消费一次;存在多个分组时,多个分组时【广播消费】; |
data | 消息数据 |
retryCount | 重试次数, 执行失败且大于0时生效,每重试一次减一; |
shardingId | 分片ID, 大于0时启用,否则使用消息ID;消费者通过该参数进行消息分片消费;分片ID不一致时分片【并发消费】、一致时【串行消费】; |
timeout | 超时时间,单位秒;大于0时生效,处于锁定运行状态且运行超时时,将主动标记运行失败; |
effectTime | 生效时间, new Date()立即执行, 否则在生效时间点之后开始执行; |
4.3 Broker设计
Broker(消息代理中心):系统核心组成模块, 负责接受消息生产者Producer推送生产的消息, 同时负责提供RPC服务供消费者Consumer使用来消费消息;
Broker支持集群部署, 集群节点之间地位平等, 集群部署情况下可大大提高系统的消息吞吐量。
Broker通过内置注册中心实现集群功能, 各节点在启动时会自动注册到注册中心, Producer或Consumer在生产消息或者消费消息时,将会通过内置注册中心自动感知到在线的Broker节点。
Broker在接收到Produce的生产消息的RPC调用时, 并不会立即存储该消息, 而是立即push到内存队列中, 同时立即响应RPC调用。 内存队列将会异步将队列中的消息数据存储到Mysql中。
Broker在接收到 “消息锁定” 等同步RPC调用时, 将会触发同步调用, 采用乐观锁方式锁定消息;
4.4 Registry Center设计
Registry Center(注册中心)主要分为两个子模块: Broker注册中心、Consumer注册中心;
- Broker注册中心子模块: 供Broker注册RPC服务使用;
- Consumer注册中心子模块: 供Consumer注册消费节点使用;
4.5 Producer设计
Producer(消息生产者), 兼容“异步批量多线程生产”+“同步生产”两种方式,提升消息发送性能;
底层通讯全异步化:消息新增 + 消息新增接受 + 消息回调 + 消息回调接受;仅批量PULL消息与锁消息非异步;
4.6 Consumer设计
MqConsumer注解属性 | 说明 |
---|---|
group | 消息分组;为空时自动赋值UUID多分组【广播消费】; |
topic | 消息主题 |
transaction | 事务开关,开启消息事务性保证只会成功执行一次;关闭时可能重复消费,性能较优; |
消费者通过 “多线程轮训 + 消息分片 + PULL + 消息锁定” 的方式来实现:
- 多线程轮训: 该模式下每个Consumer将会存在一个线程, 如存在多个Consumer, 多个Consumer将会并行消息同一主题下的消息, 大大提高消息的消费速度;
- 轮训延时自适应:线程轮训方式PULL消息,如若获取不到消息将会主动休眠,休眠时间依次递增10s,最长60s;即消息生产之后距离被消费存在 0~60s 的时间差,分钟范围内;
- 消息分片 : 队列中消息将会按照 “Registry Center” 中注册的Consumer列表顺序进行消息分段,
保证一条消息只会被分配给其中一个Consumer, 每个Consumer只会消费分配给自己的消息。 因此在多个Consumer并发消息时,
可以保证同一条消息不被多个Consumer竞争来重复消息。- 分片函数: MOD(“消息分片ID”, #{在线消费者总数}) = #{当前消费者排名} ,
- 分片逻辑解释:
每个Consumer通过注册中心感知到在线所有的Consumer, 计算出在线Consumer总数total,
以及当前Consumer在所有Consumer中的排名rank; 把消息分片ID对在线Consumer总数total进行取模,
余数和当前Consumer排名rank一致的消息认定为分配给自己的消息;
- PULL : 每个Consumer将会轮训PULL消息分片分配给自己的消息, 顺序消费。
- 消息锁定: Consumer在消费每一条消息时,开启事务时,将会主动进行消息锁定, 通过数据库乐观锁来实现, 锁定成功后消息状态变更为执行中状态, 将不会被Consumer再次PULL到。因此, 可以更进一步保证每条消息只会被消费一次;
- 消息状态和日志: 消息执行结束后, 将会调用Broker的RPC服务修改消息状态并追加消息日志, Broker将会通过内存队列方式, 异步消息队列中变更存储到数据库中。
4.7 延时消息
支持设置消息的延迟生效时间, 到达设置的生效时间时该消息才会被消费;适用于延时消费场景,如订单超时取消等;
4.8 事务性
消费者开启事务开关后,消息事务性保证只会成功执行一次;
4.9 失败重试
支持设置消息的重试次数, 在消息执行失败后将会按照设置的值进行消息重试执行,直至重试次数耗尽或者执行成功;
4.10 超时控制
支持自定义消息超时时间,消息消费超时将会主动中断;
4.11 海量数据堆积
消息中心数据库,原生兼容支持 “MySQL、TIDB” 两种存储方式,前者支持千万级消息堆积,后者支持百亿级别消息堆积(TIDB理论上无上限);
可视情况选择使用,当选择TIDB时,仅需要修改消息中心数据库连接jdbc地址配置即可,其他部分如SQL和驱动兼容MySQL和TIDB使用,不需修改。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论