Can we delete a specific dead-letter message from an Azure Service Bus dead-letter queue by sequence number using .NET code?

Posted on 2025-02-04 00:55:49

I wrote a program that reads a message from the dead-letter queue (DLQ) by sequence number, copies its content, and sends it to the active queue as a new message with the same MessageId and other properties. It then deletes the original message from the DLQ.

 public static async Task<string> GetDeadLetterMessagesAsync(string connectionString,
string queueName, long seqNum, int countDLQMessages)
    {
        //creating a service bus client
        var serviceBusClient = new ServiceBusClient(connectionString);
        Console.WriteLine("ServiceBusClient is created");
        //ReceiverOptions to access dead letter queue
        var receiverOptions = new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter };
        Console.WriteLine("receiverOption is created");
        //Create receiver to access the deadletter queue of main queue
        var receiver = serviceBusClient.CreateReceiver(queueName, receiverOptions);
        Console.WriteLine("receiver is created");
        // serviceBusSender used to send the message to service bus
        ServiceBusSender sender;
        sender= serviceBusClient.CreateSender(queueName);
        
        IList<ServiceBusReceivedMessage> receivedMessages = (IList<ServiceBusReceivedMessage>)await receiver.ReceiveMessagesAsync(countDLQMessages);
        Console.WriteLine("read all the messages in service bus DLQ");
        var totalMessageCount = receivedMessages.Count;
        if (totalMessageCount == 0)
        {
            return "No Message is available in DLQ";
        }
        // Binary search on deadletter messages in DLQ on sequence number
        Int32 lower = 0;
            Int32 upper = receivedMessages.Count - 1;
            Console.WriteLine("ReceivedMessage List");
            Console.WriteLine(receivedMessages[0].Body);
            int sequenceFlag = 0;
            while (lower <= upper)
            {
                Int32 middle = lower + (upper - lower) / 2;
                if (seqNum == receivedMessages[middle].SequenceNumber)
                {
                    var Body = receivedMessages[middle].Body;
                    var MessageId = receivedMessages[middle].MessageId;
                    var CorrelationId = receivedMessages[middle].CorrelationId;
                    var msg = new ServiceBusMessage
                    {
                        Body = Body,
                        MessageId = MessageId,
                        CorrelationId = CorrelationId
                    };
                    //Sending the dead letter message to active queue along with its other properties i.e MessageId, etc.
                    await sender.SendMessageAsync(msg);
                    Console.WriteLine($"Message has been published from dead letter to Active Queue.");
                    //complete the dead letter message which will remove the message from dead letter queue
                    await receiver.CompleteMessageAsync(receivedMessages[middle]);
                    //Changing the sequenceFlag value to 1, if requested sequence number exists in DLQ.
                    sequenceFlag = 1;
                    //clean up the service bus resources used by sender and receiver
                    await sender.DisposeAsync();
                    await receiver.DisposeAsync();
                    break;
                }
                else if (seqNum < receivedMessages[middle].SequenceNumber)
                    upper = middle - 1;
                else
                    lower = middle + 1;
            }
       
        if (sequenceFlag != 1)
        {
            return ($"Sequence number: {seqNum} doesn't exist in queue - {queueName}");
        }
        else
        {
            return "Data is moved to Active queue";
        }
        
    }

This code works when the DLQ holds fewer than 100 messages. With more than 100 or 1000 messages in the DLQ, it does not read all of the available dead-letter messages and reports that no message exists for sequence number 'xyz'.
This happens because ReceiveMessagesAsync(maxMessages) does not guarantee that it returns exactly the number of messages requested.

Is there another method that fetches n messages at once so that we can then act on them?

I also tried peeking the message by sequence number and sending its content to the active queue as a new message, but I cannot complete that specific message in the DLQ.

Can we delete/complete a dead-letter message based on its sequence number?

Comments (1)

天涯离梦残月幽梦 2025-02-11 00:55:49

Receiving N messages is not guaranteed when using the ReceiveMessagesAsync method. If you need to receive N messages, you would need to loop until you've reached that amount, e.g.:

var remaining = numMessages;
var receivedMsgs = new List<ServiceBusReceivedMessage>();
while (remaining > 0)
{
    // loop in case we don't receive all messages in one attempt
    var received = await receiver.ReceiveMessagesAsync(remaining);
    // an empty batch means nothing arrived within the wait time; stop instead of looping forever
    if (received.Count == 0) break;
    receivedMsgs.AddRange(received);
    remaining -= received.Count;
}
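
To tie the loop back to the original question: a message has to be received (and therefore locked) before it can be completed, and a peeked message carries no lock, which is why the peek attempt could not complete it. Below is a minimal sketch, not a definitive implementation, that scans the DLQ in batches, resubmits the matching message, and completes it. The names DeadLetterResubmitter, ResubmitBySequenceNumberAsync, batchSize, and maxBatches, as well as the 5-second wait, are assumptions made for illustration. Non-matching messages are held and abandoned at the end so they stay in the DLQ; their locks can expire during a long scan, so this only suits DLQs that can be scanned within the lock duration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeadLetterResubmitter
{
    // Hypothetical helper: returns true if the message with seqNum was found,
    // resubmitted to the active queue, and completed (removed) in the DLQ.
    public static async Task<bool> ResubmitBySequenceNumberAsync(
        string connectionString, string queueName, long seqNum,
        int batchSize = 100, int maxBatches = 20)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using var receiver = client.CreateReceiver(
            queueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
        await using var sender = client.CreateSender(queueName);

        // Messages that did not match; kept locked so later receives skip them.
        var held = new List<ServiceBusReceivedMessage>();
        var found = false;
        try
        {
            for (var i = 0; i < maxBatches && !found; i++)
            {
                // May return fewer than batchSize, or an empty list on timeout.
                var batch = await receiver.ReceiveMessagesAsync(
                    batchSize, TimeSpan.FromSeconds(5));
                if (batch.Count == 0) break; // nothing more to scan

                foreach (var message in batch)
                {
                    if (!found && message.SequenceNumber == seqNum)
                    {
                        // Copy the received message into a new outgoing message.
                        await sender.SendMessageAsync(new ServiceBusMessage(message));
                        // Completing removes the original from the DLQ.
                        await receiver.CompleteMessageAsync(message);
                        found = true;
                    }
                    else
                    {
                        held.Add(message);
                    }
                }
            }
        }
        finally
        {
            // Release the locks so the other messages remain available in the DLQ.
            foreach (var message in held)
            {
                try
                {
                    await receiver.AbandonMessageAsync(message);
                }
                catch (ServiceBusException ex)
                    when (ex.Reason == ServiceBusFailureReason.MessageLockLost)
                {
                    // Lock already expired; the message is available again anyway.
                }
            }
        }
        return found;
    }
}
```

A linear scan is used here instead of a binary search because the search in the question only works if the whole DLQ was received in sorted order, which a partial batch does not give you. It is worth verifying which properties the ServiceBusMessage copy constructor carries over before relying on it in production.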

Alternatively, if you can design your application so that messages are deferred rather than dead-lettered in this scenario, you can receive a deferred message directly by its sequence number:

var deferredMessage = await receiver.ReceiveDeferredMessageAsync(seqNumber);
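
The deferral approach could look roughly like the sketch below. It assumes a receiver on the main queue in the default PeekLock mode, and the class and method names (DeferredMessageExample, DeferAsync, CompleteDeferredAsync) are made up for illustration. The application has to store the sequence number returned when deferring, because a deferred message can only be retrieved by that number.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeferredMessageExample
{
    // Defers a message that cannot be processed right now and returns the
    // sequence number the caller must persist to retrieve it later.
    public static async Task<long> DeferAsync(
        ServiceBusReceiver receiver, ServiceBusReceivedMessage message)
    {
        await receiver.DeferMessageAsync(message);
        return message.SequenceNumber;
    }

    // Retrieves the deferred message by sequence number and completes it,
    // which removes it from the queue.
    public static async Task CompleteDeferredAsync(
        ServiceBusReceiver receiver, long seqNumber)
    {
        ServiceBusReceivedMessage deferred =
            await receiver.ReceiveDeferredMessageAsync(seqNumber);
        // ... process or resubmit the message here ...
        await receiver.CompleteMessageAsync(deferred);
    }
}
```

Unlike the DLQ scan, this retrieves exactly one message without receiving any others, at the cost of changing how the application handles failed messages in the first place.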