SQS - 即使 maxNumberOfMessages 设置为 1,所有消息也会进入飞行状态

发布于 2025-01-19 16:55:39 字数 8365 浏览 0 评论 0原文

我根据SQ中的消息数量有一个定义的POD缩放。我希望每个POD处理1个消息。

因此,如果我有3条消息,我将有3个POD和每个处理1条消息。

这就是我从SQS中撤回消息的方式。使用使用maxnumberofMessages(1)

ReceiveMessageRequest receiveMessageRequest = new 
ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
System.out.println("Number of messages - "+messages.size());

,我可以看到它选择的消息数是1。

我面临的问题是当运行1个POD时,排队中的所有消息都进入飞行模式。其余的POD会获取零消息要读取。

为什么发生这种情况。即使我将MaxnumberofMessages指定为1,为什么所有消息都进入飞行模式。我希望它只会选择1条消息,然后该消息进入飞行模式,其余消息仍在队列中,并且可用于其他吊舱,

这就是我在POD启动上运行代码的方式

@EventListener(ApplicationReadyEvent.class)
    public void init() throws InterruptedException {
        SQSS3Event message = sqsRepository.getMessage(queueUrl);
        while(message != null){
            System.out.println(message.getBucketName());
            System.out.println(message.getFileName());
            System.out.println(message.getReceiptHandle());
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 70);
            Thread.sleep(60000);
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 10);
            sqsRepository.deleteMessage(queueUrl,message.getReceiptHandle());

            message = sqsRepository.getMessage(queueUrl);
        }
        System.out.println("No more messages to process");
    }

,这是从SQS检索消息的帮助方法

public SQSS3Event getMessage(String queueUrl){
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    System.out.println("Number of messages - "+messages.size());
    if(messages.size()>0) {
        S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
        return SQSS3Event.builder()
                .bucketName(notification.getRecords().get(0).getS3().getBucket().getName())
                .fileName(notification.getRecords().get(0).getS3().getObject().getKey())
                .receiptHandle(messages.get(0).getReceiptHandle())
                .build();
    }
    else {
        return null;
    }
}

添加了打印语句以在阅读之前和阅读消息后读取队列属性

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(5);
GetQueueAttributesResult att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################Before reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();

System.out.println("No of messages in the result - "+messages.size());

S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################After reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");

输出

##########################Before reading##########################
No of Messages - 4
No of Messages on Flight - 0
##################################################################
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 4
##################################################################

带有跟踪

 ##########################Before reading##########################
No of Messages - 2
No of Messages on Flight - 0
##################################################################
2022-04-07 21:59:24.036 TRACE 21096 --- [           main] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #1 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
2022-04-07 21:59:24.166 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:14:39.050963Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "2e30a6e9"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 9428bd5190b9d47af3368b3f67c62d02
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:15:50.375772Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "a8ba5ab3"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 18ce2856addac8a08c394ce8fbd7d315
2022-04-07 21:59:24.167 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Queue http://localhost:4576/queue/upload-notifications now has 1 receive results cached 
2022-04-07 21:59:24.168 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #2 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 2
##################################################################

解决方案 - (获得解决方法。尽管我不完全了解它)

我正在使用弹簧云AWS依赖关系,并且正在使用@Autowired的Amazonsqs实例与队列互动。默认情况下,将10条消息传递到某种bufferqueue中,然后一一提供消息。这就是为什么所有消息都进入飞行然后一一处理的原因。

能够通过定义一个简单的豆来覆盖那个。

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

现在,它与与预期的同步使用MaxnumberofMessages(n)

I have a defined pod scaling based on the number of messages in sqs. And i want each pod to process 1 message.

So if i have 3 messages, i will have 3 pods and each processing 1 message.

This is how i am retreiving the messages from sqs. with withMaxNumberOfMessages(1)

ReceiveMessageRequest receiveMessageRequest = new 
ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
System.out.println("Number of messages - "+messages.size());

And i can see the number of messages it picks is 1.

Issue that i am facing is when 1 pod run, all the messages in queue goes into flight-mode. And the remaining pods gets zero messages to read.

Why is that happening. even though i specified maxNumberOfMessages to 1 , why all the messages goes into flight-mod. I expect it to just pick 1 message and that message goes into flight-mode and the remaining messages remain in queue and is available for other pods

This is how i ran the code on pod startup

@EventListener(ApplicationReadyEvent.class)
    public void init() throws InterruptedException {
        SQSS3Event message = sqsRepository.getMessage(queueUrl);
        while(message != null){
            System.out.println(message.getBucketName());
            System.out.println(message.getFileName());
            System.out.println(message.getReceiptHandle());
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 70);
            Thread.sleep(60000);
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 10);
            sqsRepository.deleteMessage(queueUrl,message.getReceiptHandle());

            message = sqsRepository.getMessage(queueUrl);
        }
        System.out.println("No more messages to process");
    }

And this is the helper method for retrieving message from SQS

public SQSS3Event getMessage(String queueUrl){
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    System.out.println("Number of messages - "+messages.size());
    if(messages.size()>0) {
        S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
        return SQSS3Event.builder()
                .bucketName(notification.getRecords().get(0).getS3().getBucket().getName())
                .fileName(notification.getRecords().get(0).getS3().getObject().getKey())
                .receiptHandle(messages.get(0).getReceiptHandle())
                .build();
    }
    else {
        return null;
    }
}

Added print statements to read queue attributes before reading and after reading a message

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(5);
GetQueueAttributesResult att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################Before reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();

System.out.println("No of messages in the result - "+messages.size());

S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################After reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");

Output

##########################Before reading##########################
No of Messages - 4
No of Messages on Flight - 0
##################################################################
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 4
##################################################################

With the trace

 ##########################Before reading##########################
No of Messages - 2
No of Messages on Flight - 0
##################################################################
2022-04-07 21:59:24.036 TRACE 21096 --- [           main] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #1 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
2022-04-07 21:59:24.166 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:14:39.050963Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "2e30a6e9"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 9428bd5190b9d47af3368b3f67c62d02
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:15:50.375772Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "a8ba5ab3"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 18ce2856addac8a08c394ce8fbd7d315
2022-04-07 21:59:24.167 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Queue http://localhost:4576/queue/upload-notifications now has 1 receive results cached 
2022-04-07 21:59:24.168 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #2 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 2
##################################################################

Solution - (Got a workaround. Eventhough i don't fully understand it)

I was using spring cloud aws dependencies and was using an @Autowired AmazonSQS instance to interact with the queue. That one by default, gets 10 messages into some kind of BufferQueue and then serves the messages one by one. That was the reason why all messages goes into flight and then processed one by one.

Was able to override that one by defining a simple bean.

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

And now it works in sync with withMaxNumberOfMessages(n) as expected

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

水水月牙 2025-01-26 16:55:39

解决方案-(找到了解决方法。尽管我不完全理解)

我正在使用 spring cloud aws 依赖项,并使用 @Autowired AmazonSQS 实例与队列交互。默认情况下,它将 10 条消息放入某种 BufferQueue 中,然后一条一条地提供这些消息。这就是为什么所有消息都会传播,然后一一处理的原因。

能够通过定义一个简单的 bean 来覆盖它。

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

现在它按预期与 withMaxNumberOfMessages(n) 同步工作

Solution - (Got a workaround. Eventhough i don't fully understand it)

I was using spring cloud aws dependencies and was using an @Autowired AmazonSQS instance to interact with the queue. That one by default, gets 10 messages into some kind of BufferQueue and then serves the messages one by one. That was the reason why all messages goes into flight and then processed one by one.

Was able to override that one by defining a simple bean.

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

And now it works in sync with withMaxNumberOfMessages(n) as expected

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文