AWS SQS: listening vs. polling

Posted 2025-01-09 17:20:32

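I have implemented an SQS listener in a Spring Boot project that runs on Fargate.

It is possible, though, that under the hood the SqsAsyncClient, which looks like a listener, is actually polling.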

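Separately, as a PoC, I implemented a Lambda function trigger on a different queue. The function is invoked when there are items in the queue and posts them to my service. This seems unnecessarily complex to me, but it removes a single point of failure if I only have one instance of the service.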
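For comparison with the Lambda PoC: with an SQS trigger it is the Lambda service that polls the queue and hands the function a batch of messages, so polling still happens, just outside the application code. The sketch below mimics only the shape of such a handler in plain Java. `QueueRecord`, `handleBatch`, and the `postToService` callback are hypothetical stand-ins introduced for illustration, not the AWS Lambda API; a real function would receive the event type from the AWS Lambda Java libraries and make an actual HTTP call to the service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class QueueTriggerSketch {

    /** Hypothetical stand-in for one record of the batch that the trigger delivers. */
    record QueueRecord(String messageId, String body) {}

    /**
     * Forwards every record body to the service and collects the IDs of the
     * records that could not be delivered, so that only those are retried.
     * postToService is an assumed callback returning true on success.
     */
    static List<String> handleBatch(List<QueueRecord> batch, Predicate<String> postToService) {
        List<String> failedIds = new ArrayList<>();
        for (QueueRecord record : batch) {
            boolean delivered;
            try {
                delivered = postToService.test(record.body());
            } catch (RuntimeException e) {
                // Treat an exception from the callback as a failed delivery.
                delivered = false;
            }
            if (!delivered) {
                failedIds.add(record.messageId());
            }
        }
        return failedIds;
    }

    public static void main(String[] args) {
        List<QueueRecord> batch = List.of(
                new QueueRecord("m1", "hello"),
                new QueueRecord("m2", ""),
                new QueueRecord("m3", "world"));
        // Stand-in for the HTTP POST: pretend the service rejects empty bodies.
        List<String> failed = handleBatch(batch, body -> !body.isEmpty());
        System.out.println("failed message ids: " + failed); // prints "failed message ids: [m2]"
    }
}
```

Returning the failed IDs instead of throwing mirrors the idea behind Lambda's partial batch responses, where only the reported messages go back to the queue. Whether the extra hop is worth it depends mostly on how the service is deployed, not on polling vs. listening.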

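I guess my main point of confusion is whether I am worrying needlessly about polling vs. listening on an SQS queue, and whether the distinction matters.

Code for example purposes: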

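// Assumption: Spring Boot 3.x; on Spring Boot 2.x use javax.annotation.PostConstruct instead.
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

@Component
@Slf4j
@RequiredArgsConstructor
public class SqsListener {

    private final SqsAsyncClient sqsAsyncClient;
    private final Environment environment;
    private final SmsMessagingServiceImpl smsMessagingService;

    // Starts the receive loop once the bean has been constructed.
    @PostConstruct
    public void continuousListener() {
        String queueUrl = environment.getProperty("aws.sqs.sms.queueUrl");
        Mono<ReceiveMessageResponse> responseMono = receiveMessage(queueUrl);
        Flux<Message> messages = getItems(responseMono);
        messages.subscribe(message -> disposeOfFlux(message, queueUrl));
    }

    // repeat() issues a new receive call each time the previous one completes,
    // retry() resubscribes after an error; each response is then flattened
    // into its individual messages.
    protected Flux<Message> getItems(Mono<ReceiveMessageResponse> responseMono) {
        return responseMono.repeat().retry()
                .flatMapIterable(ReceiveMessageResponse::messages);
    }

    protected void disposeOfFlux(Message message, String queueUrl) {
        log.info("Inbound SMS Received from SQS with MessageId: {}", message.messageId());
        // someConditionIsMet() is a placeholder in the original post and is not defined here.
        if (someConditionIsMet()) {
            deleteMessage(queueUrl, message);
        }
    }

    // Mono.fromFuture with a supplier sends a fresh request on every subscription.
    protected Mono<ReceiveMessageResponse> receiveMessage(String queueUrl) {
        return Mono.fromFuture(() -> sqsAsyncClient.receiveMessage(
                ReceiveMessageRequest.builder()
                        .maxNumberOfMessages(5)
                        .messageAttributeNames("All")
                        .queueUrl(queueUrl)
                        .waitTimeSeconds(10)   // long polling: wait up to 10 s for messages
                        .visibilityTimeout(30) // received messages stay hidden for 30 s
                        .build()));
    }

    protected void deleteMessage(String queueUrl, Message message) {
        sqsAsyncClient.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .receiptHandle(message.receiptHandle())
                        .build())
                .whenComplete((deleteMessageResponse, error) -> {
                    if (error != null) {
                        // Log failures too, so a failed delete does not go unnoticed.
                        log.warn("failed to delete message with handle {}", message.receiptHandle(), error);
                    } else {
                        log.info("deleted message with handle {}", message.receiptHandle());
                    }
                });
    }
}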
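On the polling question itself: the `waitTimeSeconds(10)` setting in the receive request above makes each call a long poll, meaning the request stays open until a message arrives or the wait time runs out, and the `repeat()` loop then issues the next request. A "listener" built this way is therefore a loop around a blocking receive. The sketch below illustrates that idea with an in-memory queue; `FakeQueue`, `receive`, and `pollUntil` are hypothetical names for this illustration and are not part of the AWS SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LongPollSketch {

    /** In-memory stand-in for a queue; it only imitates the wait-time behaviour. */
    static final class FakeQueue {
        private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

        void send(String body) {
            messages.add(body);
        }

        /**
         * Returns as soon as at least one message is available (up to maxMessages),
         * or an empty list once waitMillis has elapsed without any message.
         */
        List<String> receive(int maxMessages, long waitMillis) {
            List<String> batch = new ArrayList<>();
            try {
                String first = messages.poll(waitMillis, TimeUnit.MILLISECONDS);
                if (first != null) {
                    batch.add(first);
                    messages.drainTo(batch, maxMessages - 1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            return batch;
        }
    }

    /** Calls receive in a loop until `expected` messages were handled; returns the call count. */
    static int pollUntil(FakeQueue queue, int expected, long waitMillis, List<String> handled) {
        int receiveCalls = 0;
        while (handled.size() < expected && !Thread.currentThread().isInterrupted()) {
            receiveCalls++;
            handled.addAll(queue.receive(5, waitMillis));
        }
        return receiveCalls;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeQueue queue = new FakeQueue();
        // The producer delivers one message 300 ms after the consumer started waiting.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(300);
                queue.send("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> handled = new ArrayList<>();
        int calls = pollUntil(queue, 1, 2_000, handled);
        producer.join();
        System.out.println(calls + " receive call(s), handled " + handled);
    }
}
```

With a 2-second wait, the message that arrives after 300 ms is picked up by the receive call that is already waiting, so the consumer reacts almost immediately without spinning. Setting the wait to 0 in the same sketch turns it into short polling, where the loop issues many empty receive calls while it waits.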

