zbus发送topic 接收不到,大神帮看看
@少帮主 你好,想跟你请教个问题:
消费者
public class TopicConsumer {
public static void main(String[] args) throws Exception {
// 1)创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setTopic("MyTopic");
config.setMode(MqMode.PubSub);
// 2) 创建消费者
Consumer c = new Consumer(config);
while (true) {
Message msg = c.recv(10000);
if (msg == null) continue;
System.out.println("消费者1接收到消息: " + msg);
}
}
}
生产者
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1)创建Broker代表
BrokerConfig config = new BrokerConfig();
config.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(config);
//2) 创建生产者
Producer p = new Producer(broker, "MyTopic", MqMode.PubSub);
Message msg = new Message();
msg.setBody("hello world topic");
p.sendSync(msg);
broker.close();
}
}
我运行消费者然后调用生产者发送消息,消费者一直没收到消息怎么回事?你帮我看看代码有问题吗
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
是的,topic不是全局的:)
问题解决了,原来 zbus中topic必须属于某个队列
代码改成下面就可以了:
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1)创建Broker代表
BrokerConfig config = new BrokerConfig();
config.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(config);
//2) 创建生产者
Producer p = new Producer(broker, "MyTopicMQ", MqMode.PubSub);
Message msg = new Message();
msg.setTopic("MyTopic");
msg.setBody("hello world topic");
p.sendSync(msg);
broker.close();
}
}
public class TopicConsumer {
public static void main(String[] args) throws Exception {
// 1)创建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyTopicMQ");
config.setTopic("MyTopic");
config.setMode(MqMode.PubSub);
// 2) 创建消费者
Consumer c = new Consumer(config);
while (true) {
Message msg = c.recv(10000);
if (msg == null) continue;
System.out.println("消费者1接收到消息: " + msg);
}
}
}