我正在研究一个项目,在该项目中,如果检测到有效的事件,则将数据库进行轮询,然后将服务激活器称为下游,以获取有关事件的信息。执行一些处理,然后将结果写回数据库。
我要实现的目的是在数据库下降或任何下游服务都无法使用的情况下,我的微服务将在可配置的时间内暂停轮询,之后它将再次开始。
到目前为止,我已经研究了CurriverBreakerAdvice和RetryAdvice,但似乎适用于服务激活器,而不是入站通道适配器。我还知道Resilience4J提供了一种全面的断路机制,但我没有找到将其实施到我的项目中的方法。
我提出的解决方案是实施接收活动,以使投票主动并将其传递给投票程序。错误通道将跟踪累积的错误数量以及达到配置的阈值时,它将投票权属性设置为false。至于重新激活投票,我有点卡住。我的猜测是安排一项任务,以便在一段时间后将价值更改回真实,但不确定在哪里或如何做。
频道
@Bean
public IntegrationFlow readDBMessage() {
return IntegrationFLows.fromSupplier(
() -> dbService.readMessage(),
channelAdapter ->
channelAdapter.poller(
pollerSpec ->
pollerSpec.fixedDelay(
\\polling period)
.advice(messagePollingControlAdvice())
.channel("apiCallChannel")
.get();
}
Message PollingControlAdvice
public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
private volatile boolean pollingActive = false;
@Override
public boolean beforeReceive(Object source) {
return pollingActive;
}
@Override
public Message<?> afterReceive(Message<?> result, Object source) {
return result;
}
public boolean isPollingActive() {
return pollingActive;
}
//call this method from whatever place in your code to activate/deactivate poller
public void setPollingActive(boolean pollingActive) {
this.pollingActive = pollingActive;
}
}
从
关于我应该如何继续这样做的任何建议?
文档中是否缺少一些东西?
更新
谢谢Artem!
我已经实施了Artem给出的建议。以下是参考的代码,以防其他任何人陷入其中。
MessagePollingControlAdvice
public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
private volatile boolean pollingActive = false;
private volatile Long pollingDeactivatedTime = Instant.now().getEpochSecond();
@Override
public boolean beforeReceive(Object source) {
// Get the desired time from configuration file
if (!pollingActive && (Instant.now().getEpochSecond() - pollingDeactivatedTime) > 30) {
pollingActive = true;
}
return pollingActive;
}
@Override
public Message<?> afterReceive(Message<?> result, Object source) {
return result;
}
public boolean isPollingActive() {
return pollingActive;
}
//call this method from whatever place in your code to activate/deactivate poller
public void setPollingActive(boolean pollingActive) {
this.pollingDeactivatedTime = Instant.now().getEpochSecond();
this.pollingActive = pollingActive;
}
}
我已经看了 simpleeeactiveIdlereCeiveMessageAgeadVice ,并且肯定还将在我的代码中实现一些逻辑。
作为一个后续问题:据我了解,即使在投票期间发生错误时,建议的代码也会执行,因此是否可以跟踪此类中的错误并将逻辑扩展到内部停用投票?
I am working on a project in which a database is polled for events, if a valid event is detected then service activators are called downstream to get information about the event. Some processing is performed and then the result is written to back to the database.
What I am trying to achieve is in the case in which the database is down or any of the downstream services are unavailable then my microservice would pause polling for a configurable amount of time after which it would start again.
So far I have looked into CircuitBreakerAdvice and RetryAdvice but those seem to apply to service activators and not inbound channel adapters. I am also aware that Resilience4j provides a comprehensive circuit breaker mechanism but I have found no way to implement it into my project.
The solution that I have come up with is to implement a ReceiveMessageAdvice which sets the polling active and pass it to the poller. The error channel will keep track of the number of errors that have accumulated and when a configured threshold is reached it sets the pollingActive attribute to false. As for reactivating the polling I am a bit stuck. My guess would be scheduling a task to change value back to true after some time but am unsure of where or how to do it.
Polling Channel
@Bean
public IntegrationFlow readDBMessage() {
return IntegrationFLows.fromSupplier(
() -> dbService.readMessage(),
channelAdapter ->
channelAdapter.poller(
pollerSpec ->
pollerSpec.fixedDelay(
\\polling period)
.advice(messagePollingControlAdvice())
.channel("apiCallChannel")
.get();
}
MessagePollingControlAdvice
public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
private volatile boolean pollingActive = false;
@Override
public boolean beforeReceive(Object source) {
return pollingActive;
}
@Override
public Message<?> afterReceive(Message<?> result, Object source) {
return result;
}
public boolean isPollingActive() {
return pollingActive;
}
//call this method from whatever place in your code to activate/deactivate poller
public void setPollingActive(boolean pollingActive) {
this.pollingActive = pollingActive;
}
}
Taken from How to stop OR change delay of Spring Integration Poller
Any advice on how I should go on about doing this?
Is there something that I am missing in the documentation?
UPDATE
Thank you Artem!
I have implemented the suggestion that Artem has given. Below is the code for reference in case anyone else runs into this.
MessagePollingControlAdvice
public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
private volatile boolean pollingActive = false;
private volatile Long pollingDeactivatedTime = Instant.now().getEpochSecond();
@Override
public boolean beforeReceive(Object source) {
// Get the desired time from configuration file
if (!pollingActive && (Instant.now().getEpochSecond() - pollingDeactivatedTime) > 30) {
pollingActive = true;
}
return pollingActive;
}
@Override
public Message<?> afterReceive(Message<?> result, Object source) {
return result;
}
public boolean isPollingActive() {
return pollingActive;
}
//call this method from whatever place in your code to activate/deactivate poller
public void setPollingActive(boolean pollingActive) {
this.pollingDeactivatedTime = Instant.now().getEpochSecond();
this.pollingActive = pollingActive;
}
}
I have taken a look at SimpleActiveIdleReceiveMessageAdvice and will certainly implement some of the logic in my code as well.
As a follow up question: from what I understand the code inside the advice gets executed even when an error occurs during polling, so is it possible to keep track of errors in this class and extend the logic to deactivate polling from inside it?
发布评论
评论(1)
您可以在
MessagePollingControladvice
中进行超时检查。由于仍计划在频道适配器上的此调查器,因此无需进行额外的计划任务。因此,在该
beforeReceive()
中,您不仅可以跟踪当前状态,还可以跟踪以前打开的更改的时间。SimpleActiveIdlereceiveMessageAgeadVice
也应该是整个算法的一个很好的补充。You can do that timeout check in your
MessagePollingControlAdvice
. There is no need in extra scheduled task since this poller on the channel adapter is still scheduled.So, in that
beforeReceive()
you can track not only the current state, but also the time from the previous opened change.The
SimpleActiveIdleReceiveMessageAdvice
should be also a good addition to your whole algorithm.