如何用Redisson锁定并解锁反应堆通量流?
我有一个通量流,该流从数据库中读取对象。
对于这些对象,我都有一个要运行的处理函数。
我希望处理函数在给定对象的ID上获取REDIS锁定后,并在处理后释放锁定后执行(如果处理功能会引发错误)。
创建这样的流中最简单的通量方法是什么?
这是我尝试使用转换尝试执行此操作的一些代码。
我可能可以通过锁定功能,该函数将作为afterlock.flatmap(func)
附加,但我正在寻找可以避免的解决方案。
我希望这与该流的其余部分一样透明,而不需要锁定和解锁功能的单独附件,只有一个可以执行“ Lock-Process-unlock”的附件。
private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
return flux -> {
Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));
// processing logic should be attached somewhere here
afterLock
.flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
.thenReturn(ret)
.onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
return afterLock;
};
}
Flux.just(someObjectFromDatabase)
.transform(withLock(t -> t.id()))
.flatMap(this::objectProcessor)
I have a Flux stream that reads objects from the database.
For each of these objects, I have a processing function to be run.
I want the processing function to execute after acquiring Redis lock on ID of given object, and after processing release the lock (also if the processing function throws an error).
What's the easiest way in Flux to create such a stream?
Here is some code of my failed attempt at doing this with transform.
I could probably make withLock take a function which would be attached as afterLock.flatMap(func)
, but I am looking for a solution that can avoid that.
I would like this to be as transparent to the rest of the stream as possible, and not require seperate attachment of lock and unlock functions, just one attachment that can do "lock-process-unlock".
private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
return flux -> {
Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));
// processing logic should be attached somewhere here
afterLock
.flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
.thenReturn(ret)
.onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
return afterLock;
};
}
Flux.just(someObjectFromDatabase)
.transform(withLock(t -> t.id()))
.flatMap(this::objectProcessor)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
解决方案之一是使用
单声道。使用
允许使用异步操作来用于资源供应商,资源关闭和清理。在我们的情况下,我们将redis锁定到
lockservice
看起来如下One of the solution is to use
Mono.usingWhen
that allows to use async operations for resource supplier, resource closure and cleanup.In our case we wrapped Redis lock into
LockService
that looks like the following感谢您的回答@Alex,与此同时,我能够提供类似的东西,在组织流和失败方面非常灵活(花了我一段时间来掩盖边缘案例...)
它可以用作调用
stream.flatmap(with lock(...,处理器)
)Thanks for your answer @Alex, in the meantime I was able to come with something like this which is very flexible in terms of organizing the stream and resilent to failures (took me a while to cover edge cases...)
It can be used as a call to
stream.flatMap(withLock(..., processor)