rabbitmq channel.addconfirmlistener(),接口ackcallback一些回调丢失了吗?

发布于 2025-01-20 15:17:00 字数 3642 浏览 1 评论 0原文

这是我的代码,channel.addConfirmListener() ackCallback 会丢失一些回调,消息确实发送到了rabbitmq服务器并且可以正常消费,但是我发送消息后休眠了2ms,所有ack回调都可以收到,

我不知道这是我的代码中的错误还是rabbitmq的错误

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布{
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 开启确认发布
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
    
        channel.queueDeclare("hello", true, false, false, null);
        //  异步确认发布消息 回调
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, send successful", deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
                }
        );
        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!!   " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

控制台显示缺少一些回调

17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

但是发送消息后我睡了2毫秒,并且可以接收所有回调

示例代码

for (int i = 0; i < 5; i++) {
    String message = "Hello World!!!   " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2);  // I sleep for 2ms after sending the message, and all ack callbacks can be received
}

控制台日志

17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

我的RabbitMQ Server 版本是 3.9.14 (没有修改任何配置。使用默认配置),Erlang 24.3.2 ,

Maven项目依赖在

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>

我试图阻止主线程关闭,但它似乎不是主线程关闭的原因,因为一旦创建连接主线程不会自动关闭

This is my code, channel.addConfirmListener() ackCallback Some callbacks will be lost, The message is indeed sent to the rabbitmq server and can be consumed normally , But I sleep for 2ms after sending the message, and all ack callbacks can be received,

I don't know if this is an error in my code or a rabbitmq bug

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布{
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 开启确认发布
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
    
        channel.queueDeclare("hello", true, false, false, null);
        //  异步确认发布消息 回调
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, send successful", deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
                }
        );
        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!!   " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

The console shows some callbacks missing

17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

But I sleep for 2ms after sending the message, and all callbacks can be received

example code

for (int i = 0; i < 5; i++) {
    String message = "Hello World!!!   " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2);  // I sleep for 2ms after sending the message, and all ack callbacks can be received
}

console log

17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

My RabbitMQ Server Version is 3.9.14 (No configuration has been modified. The default configuration is used), Erlang 24.3.2 ,

Maven Project dependency in

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>

I tried to prevent the main thread from shutting down, but it doesn't seem to be the reason for the main thread to shut down, because the main thread won't shut down automatically once the connection is created

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

因为看清所以看轻 2025-01-27 15:17:00

我不知道你为什么用 因为你根本没有使用 spring-rabbit API;您直接使用 amqp-client 。

这是按设计工作的;出于性能原因,确认回调在 true 时具有附加参数 multiple;这意味着,直到并包括此标签在内的所有标签均通过一次确认进行确认。

https://www.rabbitmq.com/tutorials/tutorial-7-java。 html

multiple:这是一个布尔值。如果为 false,则仅确认/nack 一条消息,如果为 true,则确认/nack-ed 所有具有较低或相同序列号的消息。

I am not sure why you tagged this with because you are not using the spring-rabbit APIs at all; you are using the amqp-client directly.

This is working as designed; for performance reasons, the confirm callback has the additional argument multiple when true; this means that all tags up to and including this one are confirmed with a single confirmation.

https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

multiple: this is a boolean value. If false, only one message is confirmed/nack-ed, if true, all messages with a lower or equal sequence number are confirmed/nack-ed.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文