RabbitMQ (C#): after sending an ack, I want to move the item back to the queue instead of removing it
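In my example I have 100 tasks and 5 computers to work on them. My question is how to seed the queue once with these tasks and then keep them there permanently: when a computer finishes a task, the task should go back onto the queue instead of being removed by a normal acknowledgment. I want these 100 tasks to stay in a never-ending queue that is worked on by whichever computer is available. I use a prefetch count of 1, so each computer works on only one task at a time.
My current pseudocode, below, attempts this, but tasks end up being removed from the queue and are not always added back. I started the test last night with 100 tasks in the queue, and by morning only 56 were left.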
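// namespaces used: System, System.Collections.Generic, System.Text, System.Threading,
// System.Threading.Tasks, RabbitMQ.Client, RabbitMQ.Client.Events
public static async Task Dequeue(CancellationToken token)
{
    try
    {
        using var channel = await GetQueue().ConfigureAwait(false);
        if (channel != null)
        {
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (ch, ea) =>
            {
                var body = ea.Body.ToArray();
                var task = Encoding.UTF8.GetString(body);
                var properties = ea.BasicProperties;
                try
                {
                    // do long running task here
                    await Task.Delay(600000).ConfigureAwait(false);
                    // republish to the end of the queue BEFORE acking, so a failure
                    // in between leaves a duplicate rather than a lost task
                    channel.BasicPublish(exchange: "",
                                         routingKey: "task_queue",
                                         basicProperties: properties,
                                         body: body);
                    // wait up to 10 seconds for the broker to confirm the new copy
                    // (needs ConfirmSelect, which GetQueue enables by default)
                    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(10));
                    // only now remove the original delivery
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    // requeue the original delivery; if the channel is already closed,
                    // the broker requeues unacked deliveries on its own
                    if (channel.IsOpen)
                    {
                        channel.BasicNack(ea.DeliveryTag, false, true);
                    }
                }
            };
            // this consumer tag identifies the subscription
            // when it has to be cancelled
            string consumerTag = channel.BasicConsume("task_queue", false, consumer);
            try
            {
                // just wait until cancellation is requested
                await Task.Delay(Timeout.Infinite, token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // stop receiving new deliveries before the channel is disposed
                channel.BasicCancel(consumerTag);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}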
// Nothing in here is awaited, so the method is not marked async;
// it still returns Task<IModel?> so callers can keep awaiting it.
public static Task<IModel?> GetQueue(bool isPublisherMode = true)
{
    try
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "*******",
            DispatchConsumersAsync = true,
            ContinuationTimeout = new TimeSpan(2, 0, 0),
            HandshakeContinuationTimeout = new TimeSpan(2, 0, 0),
            RequestedConnectionTimeout = new TimeSpan(2, 0, 0)
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        if (isPublisherMode)
        {
            // publisher confirms: the broker acknowledges every message published on this channel
            channel.ConfirmSelect();
        }
        var args = new Dictionary<string, object>();
        // durable queue: the queue definition survives a broker restart
        channel.QueueDeclare(queue: "task_queue",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: args);
        // prefetch of 1: at most one unacknowledged delivery per consumer
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        Console.WriteLine("[*] Waiting for tasks.");
        return Task.FromResult<IModel?>(channel);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
    return Task.FromResult<IModel?>(null);
}
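The question also asks how to initiate the queue once with the 100 tasks. The sketch below is one possible way to do that, not the original poster's code: it assumes the RabbitMQ.Client 6.x API already used above (`IModel`, `ConfirmSelect`, `WaitForConfirmsOrDie`). `TaskSeeder`, `BuildPayloads`, and the `task-N` payload format are hypothetical names invented for illustration. Messages are marked persistent so that, together with the durable queue, they are intended to survive a broker restart.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

public static class TaskSeeder
{
    // Hypothetical helper: builds placeholder payloads "task-1" .. "task-N".
    public static List<byte[]> BuildPayloads(int count)
    {
        var payloads = new List<byte[]>(count);
        for (int i = 1; i <= count; i++)
        {
            payloads.Add(Encoding.UTF8.GetBytes($"task-{i}"));
        }
        return payloads;
    }

    // Publishes every payload to "task_queue" as a persistent message,
    // then blocks until the broker has confirmed all of them.
    public static void Seed(IModel channel, IEnumerable<byte[]> payloads)
    {
        // enable publisher confirms on this channel (harmless if already enabled)
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true; // ask the broker to write messages to disk

        foreach (var body in payloads)
        {
            channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
        }

        // throws if any message is nacked or the timeout (an assumed 30 s) expires
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(30));
    }
}
```

Run once from a separate seeding process, for example `TaskSeeder.Seed(channel, TaskSeeder.BuildPayloads(100));` with a channel obtained from `GetQueue()`. Because the consumers republish each message with the properties of the delivery they received, the persistent flag set here would carry over to every later copy.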