spring cloud stream 3 消费者绑不到交换机上

发布于 2022-09-30 23:08:03 字数 2462 浏览 42 评论 0

spring cloud版本:2020.0.2
spring cloud stream 3版本:3.1.2
RabbitMQ 版本:3.7.7

消费者配置

  cloud:
    stream:
      bindings:
        topicAction-in-0:
          destination: threadsEvent
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

消息的处理方法

@Configuration
public class RabbitMQEventActionConfig {
    @Autowired
    private TopicActionCollectionService topicActionCollectionService;
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventActionConfig.class);

    //spring-cloud-stream事件消费者
    @Bean(name="topicAction")
    public Consumer<String> topicAction() {
        logger.error("[TE]start consume topic event");
        return talBody -> {
            TopicActionLog tal = new Gson().fromJson(talBody, TopicActionLog.class);
            logger.error("[MDA][Receive]topic action: "+tal.getAction().getTitle()+", operator: "+tal.getMemberNickname());
        };
    }

消息的生产者:

@Component
public class TopicActionLogMessageProvider {
    @Autowired
    private StreamBridge streamBridge;
    private final static Logger logger = LoggerFactory.getLogger(TopicActionLogMessageProvider.class);

    @Override
    public void logs(ForumActionEnum action, long topicId, long rodeMember, ActionEventCulpritor culpritor) {
        TopicActionLog tal = new TopicActionLog(culpritor.getMemberId(), culpritor.getMemberNickname(), action, topicId, rodeMember, culpritor.getIpAddr(), culpritor.getToken());
        String body = new Gson().toJson(tal);
        streamBridge.send("topicAction-out-0", body);
        logger.error("[MDA][Provider]topic action: "+action.getTitle()+", operator: "+culpritor.getMemberNickname());
    }
}

生产者的配置

  cloud:
    config:
      label: master
      name: system
      profile: dev
    stream:
      bindings:
        topicAction-out-0:
          destination: threadsEvent
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

生产者可以连接到rabbitmq.交换机:threadsEvent能创建.消息能发送到交换机上

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

北风几吹夏 2022-10-07 23:08:04

话题服务(消息的生产者)
java代码:

streamBridge.send("topicAction-out-0", body);

topicAction-out-0是绑定名称

yaml配置:

spring:
 cloud:    
   stream:
      bindings:
        topicAction-out-0:
          destination: threadsEvent
          group: threadsQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

回复服务(消息的生产者)
java代码:

streamBridge.send("postsAction-out-0", body);

postsAction-out-0是绑定名称

yaml配置:

spring:
   cloud:    
    stream:
      bindings:
        postsAction-out-0:
          destination: repliesEvent
          group: repliesQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

日志服务(消息的消费者)
java代码:

@Configuration
public class RabbitMQEventActionConfig {
    @Autowired
    private TopicActionCollectionService topicActionCollectionService;
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventActionConfig.class);

    //spring-cloud-stream事件消费者
    @Bean
    public Consumer<String> topicAction() {
        logger.error("[TE]start consume topic event");
        return talBody -> {
            TopicActionLog tal = new Gson().fromJson(talBody, TopicActionLog.class);
            logger.error("[MDA][Receive]topic action: "+tal.getAction().getTitle()+", operator: "+tal.getMemberNickname());
            TopicActionCollection tac = new TopicActionCollection(tal.getMemberNickname(), tal.getMemberId(), tal.getAction(), tal.getTopicId(), tal.getIpAddr(), tal.getToken(), tal.getRodeMember());
            topicActionCollectionService.create(tac);
        };
    }

    @Bean
    public Consumer<String> postsAction(){
        logger.error("[TE]start consume posts event");
        return palBody -> {
            PostsActionLog pal = new Gson().fromJson(palBody, PostsActionLog.class);
            logger.error("[MDA][Receive]posts action: "+pal.getAction().getTitle()+", operator: "+pal.getMemberNickname());
            TopicActionCollection pac = new TopicActionCollection(pal.getMemberNickname(), pal.getMemberId(), pal.getAction(), pal.getTopicId(), pal.getPostsId(), pal.getIpAddr(), pal.getToken(), pal.getRodeMember());
            topicActionCollectionService.create(pac);
        };
    }
}

yaml配置:

  cloud:
    function:
      definition: topicAction;postsAction
    stream:
      bindings:
        topicAction-in-0:
          destination: threadsEvent
          group: threadsQueue
        postsAction-in-0:
          destination: repliesEvent
          group: repliesQueue
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

最后:

  1. 多看文档. 多试验。不要相信网上的各种搜索结果
  2. spring.cloud.function.definition中的管道符号和逗号都是拼接符号. 若消费者的配置中这样写:spring.cloud.function.definition=topicAction|postsAction哪么在消息的消费者服务启动完后会创建一个交换机名称:topicActionpostsAction-in-0. 并绑定一个队列,队列的名称是交换机名称后面拼接的anonymous加最后一串值
  3. 若消息消费者配的不对spring会在消息生产者执行完后创建一个交换机.并不会绑定队列.
  4. 上面的配置消费者服务启来后都可以创建好交换机和队列。rabbitMQ截图如下:
    交换机:
    mq交换机

队列:
mq队列

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文