返回介绍

Spring 系列

MyBatis

Netty

Dubbo

Tomcat

Redis

Nacos

Sentinel

RocketMQ

番外篇(JDK 1.8)

学习心得

RocketMQ Broker 处理拉取消息请求流程

发布于 2024-05-19 21:34:33 字数 6795 浏览 0 评论 0 收藏 0

该文所涉及的 RocketMQ 源码版本为 4.9.3。

RocketMQ broker 处理拉取消息请求流程

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)

第 1 步、校验broker是否可读

if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
    return response;
}

第 2 步、根据消费组获取订阅信息

SubscriptionGroupConfig subscriptionGroupConfig =
    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());

第 3 步、校验是否允许消费

if (!subscriptionGroupConfig.isConsumeEnable()) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
    return response;
}

第 4 步、根据主题获取对应的配置信息

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
    log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
    return response;
}

第 5 步、校验主题对应的队列

if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
    String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
        requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
    log.warn(errorInfo);
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark(errorInfo);
    return response;
}

第 6 步、如果配置了消息过滤表达式,根据表达式进行构建consumerFilterData,如果没有,则根据主题构建

consumerFilterData = ConsumerFilterManager.build(
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                        requestHeader.getExpressionType(), requestHeader.getSubVersion()

consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                    requestHeader.getConsumerGroup());

第 7 步、校验如果不是Tag过滤,是否开启了自定义属性过滤,如果没有开启,不允许操作 只有使用push推送模式的消费者才能用使用SQL92标准的sql语句,pull拉取模式的消费者是不支持这个功能的。

if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
    && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
    return response;
}

第 8 步、根据是否支持重试过滤创建不同的MessageFilter

if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}

第 9 步、根据消费组、主题、队列、偏移量、最大拉取消息数量、消息过滤器查找信息

final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

第 10 步、消息为空 设置code为系统错误 返回response

response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");

第 11 步、提交偏移量

if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文