RABBITMQ(C#):发送ACK后,我想将项目移回队列而不是从队列中删除

发布于 2025-01-30 01:55:35 字数 3210 浏览 2 评论 0原文

在我的程序示例中,我有100个任务和5台计算机可以处理它们。我的问题是关于如何通过这些任务启动我的队列一次,然后将它们永久保留(因为在计算机中完成了任务,而不是在删除任务后完成任务后,它将移至队列后,而不是正常的确认。反而)。我想在我的示例中将这100个任务保留在一个永无止境的队列中,该队列一直通过可用的任何计算机来处理。我有一个预取的数量为1,这意味着计算机一次只能执行一个任务。

我当前的伪代码在下面试图做我要做的事情,但最后,任务从队列中删除,似乎并没有一直被添加到队列中。昨晚我在队列中完成了100个任务,但到了早晨,我在队列中只有56个任务。

public static async Task Dequeue(CancellationToken token)
{
    try
    {
        using var channel = await GetQueue().ConfigureAwait(false);

        if (channel != null)
        {
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (ch, ea) =>
            {
                var body = ea.Body.ToArray();
                var task = Encoding.UTF8.GetString(body);
                var properties = ea.BasicProperties;

                // do long running task here
                await Task.Delay(600000).ConfigureAwait(false);

                // when finished with work then send ack
                channel.BasicAck(ea.DeliveryTag, false);

                // now add to end of queue when work is complete
                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue",
                                     basicProperties: properties,
                                     body: body);

                await Task.Yield();
            };

            // this consumer tag identifies the subscription
            // when it has to be cancelled
            string consumerTag = channel.BasicConsume("task_queue", false, consumer);
            channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 10000));
            
            while (!token.IsCancellationRequested)
            {
                // just wait until cancellation is requested
                await Task.Delay(1000, token).ConfigureAwait(false);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

public static async Task<IModel?> GetQueue(bool isPublisherMode = true)
{
    try
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "*******",
            DispatchConsumersAsync = true,
            ContinuationTimeout = new TimeSpan(2, 0, 0),
            HandshakeContinuationTimeout = new TimeSpan(2, 0, 0),
            RequestedConnectionTimeout = new TimeSpan(2, 0, 0)
        };

        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        if (isPublisherMode)
        {
            channel.ConfirmSelect();
        }
        
        var args = new Dictionary<string, object>();

        channel.QueueDeclare(queue: "task_queue",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: args);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        Console.WriteLine("[*] Waiting for tasks.");

        return channel;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

    return null;
}

In my program example I have 100 tasks and 5 computers to work on them. My question is about how I can initiate my queue once with these tasks and then keep them permanent (as in a computer finishes a task and instead of a normal acknowledgment after the task is complete where it gets removed, it gets moved back to the queue instead). I want to keep these 100 tasks in my example in a never ending queue that keeps getting worked on by whatever computer is available. I have a prefetch count of 1 meaning a computer can only work on one task at a time.

My current pseudo code is below attempting to do what I'm trying to do but in the end, tasks get removed from the queue and don't seem to be getting added back into the queue all of the time. I started this test last night with the 100 tasks in the queue but by the morning I only had 56 tasks in the queue.

public static async Task Dequeue(CancellationToken token)
{
    try
    {
        using var channel = await GetQueue().ConfigureAwait(false);

        if (channel != null)
        {
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (ch, ea) =>
            {
                var body = ea.Body.ToArray();
                var task = Encoding.UTF8.GetString(body);
                var properties = ea.BasicProperties;

                // do long running task here
                await Task.Delay(600000).ConfigureAwait(false);

                // when finished with work then send ack
                channel.BasicAck(ea.DeliveryTag, false);

                // now add to end of queue when work is complete
                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue",
                                     basicProperties: properties,
                                     body: body);

                await Task.Yield();
            };

            // this consumer tag identifies the subscription
            // when it has to be cancelled
            string consumerTag = channel.BasicConsume("task_queue", false, consumer);
            channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 10000));
            
            while (!token.IsCancellationRequested)
            {
                // just wait until cancellation is requested
                await Task.Delay(1000, token).ConfigureAwait(false);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

public static async Task<IModel?> GetQueue(bool isPublisherMode = true)
{
    try
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "*******",
            DispatchConsumersAsync = true,
            ContinuationTimeout = new TimeSpan(2, 0, 0),
            HandshakeContinuationTimeout = new TimeSpan(2, 0, 0),
            RequestedConnectionTimeout = new TimeSpan(2, 0, 0)
        };

        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        if (isPublisherMode)
        {
            channel.ConfirmSelect();
        }
        
        var args = new Dictionary<string, object>();

        channel.QueueDeclare(queue: "task_queue",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: args);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        Console.WriteLine("[*] Waiting for tasks.");

        return channel;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

    return null;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文