暂停 spring SqsListener 从队列中提取消息
目前我有一个 SQS 队列,重试 5 次后将消息移动到 DLQ,我使用 spring SqsListener
来读取队列:
@SqsListener(
value = ["\${messaging.queue.orders}"],
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
fun handleOrders(message: String) {
// process order
}
现在我尝试使用功能切换来暂停侦听器,但我找不到办法做到这一点,我尝试手动使用Acknowledgment
,但如果我不发送成功确认事件,消息就会移动到DLQ:
@SqsListener(
value = ["\${messaging.queue.orders}"],
deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
fun handleOrders(message: String, ack: Acknowledgment) {
if(toggle.orders.enabled) {
// process order
ack.acknowledge()
}
// else don not process or ack the message
}
正如我所说,问题上面的解决方案是,如果我不执行 ack.acknowledge() ,消息将在第五次后移至 DLQ,因此我需要暂停/停止 SqsListener 如果禁用切换,则从队列中提取消息;如果再次启用,则启动/恢复。
Currently I have a SQS queue that move the messages to a DLQ after 5 retries, I'm using spring SqsListener
to read the queue:
@SqsListener(
value = ["\${messaging.queue.orders}"],
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
fun handleOrders(message: String) {
// process order
}
Now I'm trying to use a feature toggle to pause the listener, but I could not find a way to do that, I tried to use the Acknowledgment
manually, but the message is move to the DLQ if I don't send the success ack event:
@SqsListener(
value = ["\${messaging.queue.orders}"],
deletionPolicy = SqsMessageDeletionPolicy.NEVER
)
fun handleOrders(message: String, ack: Acknowledgment) {
if(toggle.orders.enabled) {
// process order
ack.acknowledge()
}
// else don not process or ack the message
}
As I said, the problem with the solution above is that if I don't execute the ack.acknowledge()
the message is moved to the DLQ after the fifth time, so I would need to pause/stop the SqsListener
to pull messages from the Queue if the toggle is disabled, and start/resume if it's enabled again.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我有类似的问题。
谷歌搜索,我发现这个问题spring-cloud-aws-sqs。
它描述了此解决方法:
stop()
方法。 href="https://github.com/awspring/spring-cloud-aws" rel="nofollow noreferrer">spring-cloud-aws, @SqsListener 仍然存在。
您可能想要自动装配的侦听器容器类是 io.awspring.cloud.sqs.listener.SqsMessageListenerContainer (调用其继承的 io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer#stop< /代码>)。
I'm having a similar issue.
Googling it, I found this issue on the archived predecessor of spring-cloud-aws-sqs.
It describes this workaround:
stop()
method on themNowadays in spring-cloud-aws, @SqsListener is still there.
The listener container class you may want to autowire is
io.awspring.cloud.sqs.listener.SqsMessageListenerContainer
(to call its inheritedio.awspring.cloud.sqs.listener.AbstractMessageListenerContainer#stop
).您可以使用 @ConditionalOnExpression 来切换 SqsListener 的创建,
例如:
如果更改属性,则将以下内容添加到 application.yaml 中
,除非您使用 spring,否则必须重新启动服务器云进行配置
您还可以使用@ConditionalOnProperty(prefix = "toggle", name = "orders.enabled")
You can toggle creation of SqsListener using
@ConditionalOnExpression
Like:
And adding following to
application.yaml
if you change the property, you will have to restart the server unless you are using spring cloud for configuration
You can also use
@ConditionalOnProperty(prefix = "toggle", name = "orders.enabled")