暂停 spring SqsListener 从队列中提取消息

发布于 2025-01-15 15:24:40 字数 916 浏览 0 评论 0原文

目前我有一个 SQS 队列,重试 5 次后将消息移动到 DLQ,我使用 spring SqsListener 来读取队列:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
fun handleOrders(message: String) {
    // process order
}

现在我尝试使用功能切换来暂停侦听器,但我找不到办法做到这一点,我尝试手动使用Acknowledgment,但如果我不发送成功确认事件,消息就会移动到DLQ:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
fun handleOrders(message: String, ack: Acknowledgment) {
    if(toggle.orders.enabled) {
        // process order
        ack.acknowledge()
    }
    // else don not process or ack the message
}

正如我所说,问题上面的解决方案是,如果我不执行 ack.acknowledge() ,消息将在第五次后移至 DLQ,因此我需要暂停/停止 SqsListener 如果禁用切换,则从队列中提取消息;如果再次启用,则启动/恢复。

Currently I have a SQS queue that move the messages to a DLQ after 5 retries, I'm using spring SqsListener to read the queue:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
fun handleOrders(message: String) {
    // process order
}

Now I'm trying to use a feature toggle to pause the listener, but I could not find a way to do that, I tried to use the Acknowledgment manually, but the message is move to the DLQ if I don't send the success ack event:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
fun handleOrders(message: String, ack: Acknowledgment) {
    if(toggle.orders.enabled) {
        // process order
        ack.acknowledge()
    }
    // else don not process or ack the message
}

As I said, the problem with the solution above is that if I don't execute the ack.acknowledge() the message is moved to the DLQ after the fifth time, so I would need to pause/stop the SqsListener to pull messages from the Queue if the toggle is disabled, and start/resume if it's enabled again.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

羅雙樹 2025-01-22 15:24:40

我有类似的问题。

谷歌搜索,我发现这个问题spring-cloud-aws-sqs

它描述了此解决方法

  1. 获取包装 SQS 侦听器的侦听器容器的 bean(从 SpringBoot 应用程序上下文)
  2. 在它们上调用 stop() 方法

。 href="https://github.com/awspring/spring-cloud-aws" rel="nofollow noreferrer">spring-cloud-aws, @SqsListener 仍然存在。

您可能想要自动装配的侦听器容器类是 io.awspring.cloud.sqs.listener.SqsMessageListenerContainer (调用其继承的 io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer#stop< /代码>)。

I'm having a similar issue.

Googling it, I found this issue on the archived predecessor of spring-cloud-aws-sqs.

It describes this workaround:

  1. Get the beans of the listener container that wraps the SQS listener (from the SpringBoot application context)
  2. Call the stop() method on them

Nowadays in spring-cloud-aws, @SqsListener is still there.

The listener container class you may want to autowire is io.awspring.cloud.sqs.listener.SqsMessageListenerContainer (to call its inherited io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer#stop).

爱的十字路口 2025-01-22 15:24:40

您可以使用 @ConditionalOnExpression 来切换 SqsListener 的创建,

例如:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
@ConditionalOnExpression("\${toggle.orders.enabled:false}")
fun handleOrders(message: String) {
    // process order
}

如果更改属性,则将以下内容添加到 application.yaml 中

toggle:
  orders:
    enabled: true

,除非您使用 spring,否则必须重新启动服务器云进行配置

您还可以使用@ConditionalOnProperty(prefix = "toggle", name = "orders.enabled")

You can toggle creation of SqsListener using @ConditionalOnExpression

Like:

@SqsListener(
    value = ["\${messaging.queue.orders}"],
    deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
@ConditionalOnExpression("\${toggle.orders.enabled:false}")
fun handleOrders(message: String) {
    // process order
}

And adding following to application.yaml

toggle:
  orders:
    enabled: true

if you change the property, you will have to restart the server unless you are using spring cloud for configuration

You can also use @ConditionalOnProperty(prefix = "toggle", name = "orders.enabled")

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文