zbus发送topic 接收不到,大神帮看看

发布于 2021-11-28 16:16:29 字数 1462 浏览 948 评论 2

@少帮主 你好,想跟你请教个问题:

消费者

public class TopicConsumer {

public static void main(String[] args) throws Exception {
// 1)创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);


MqConfig config = new MqConfig();
config.setBroker(broker);
config.setTopic("MyTopic");
config.setMode(MqMode.PubSub);

// 2) 创建消费者
Consumer c = new Consumer(config);
while (true) {
Message msg = c.recv(10000);
if (msg == null) continue;
System.out.println("消费者1接收到消息: " + msg);
}
}


}

生产者

public class TopicProducer {


public static void main(String[] args) throws Exception {
//1)创建Broker代表
BrokerConfig config = new BrokerConfig();
   config.setServerAddress("127.0.0.1:15555");
   Broker broker = new SingleBroker(config);


   //2) 创建生产者
   Producer p = new Producer(broker, "MyTopic", MqMode.PubSub);
   Message msg = new Message();
   msg.setBody("hello world topic");


   p.sendSync(msg);
   broker.close();
}
}

我运行消费者然后调用生产者发送消息,消费者一直没收到消息怎么回事?你帮我看看代码有问题吗


如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

复古式 2021-11-30 17:05:05

是的,topic不是全局的:)

白龙吟 2021-11-30 09:42:09

问题解决了,原来 zbus中topic必须属于某个队列

代码改成下面就可以了:

public class TopicProducer {

public static void main(String[] args) throws Exception {
//1)创建Broker代表
BrokerConfig config = new BrokerConfig();
   config.setServerAddress("127.0.0.1:15555");
   Broker broker = new SingleBroker(config);

   //2) 创建生产者
   Producer p = new Producer(broker, "MyTopicMQ", MqMode.PubSub);
   Message msg = new Message();
   msg.setTopic("MyTopic");
   msg.setBody("hello world topic");

   p.sendSync(msg);
   broker.close();
}
}

public class TopicConsumer {

public static void main(String[] args) throws Exception {
// 1)创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);

MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyTopicMQ");
config.setTopic("MyTopic");
config.setMode(MqMode.PubSub);

// 2) 创建消费者
Consumer c = new Consumer(config);
while (true) {
Message msg = c.recv(10000);
if (msg == null) continue;
System.out.println("消费者1接收到消息: " + msg);
}
}

}

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文