如何用Redisson锁定并解锁反应堆通量流?

发布于 2025-01-20 22:32:58 字数 1312 浏览 2 评论 0原文

我有一个通量流,该流从数据库中读取对象。
对于这些对象,我都有一个要运行的处理函数。
我希望处理函数在给定对象的ID上获取REDIS锁定后,并在处理后释放锁定后执行(如果处理功能会引发错误)。

创建这样的流中最简单的通量方法是什么?
这是我尝试使用转换尝试执行此操作的一些代码。
我可能可以通过锁定功能,该函数将作为afterlock.flatmap(func)附加,但我正在寻找可以避免的解决方案。

我希望这与该流的其余部分一样透明,而不需要锁定和解锁功能的单独附件,只有一个可以执行“ Lock-Process-unlock”的附件。


        private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
          
            return flux -> {
                    Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));
    
    // processing logic should be attached somewhere here
    
                afterLock
                        .flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
                                .thenReturn(ret)
                                .onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
    
                return afterLock;
    
            };
        }


        Flux.just(someObjectFromDatabase)
                .transform(withLock(t -> t.id()))
                .flatMap(this::objectProcessor)

I have a Flux stream that reads objects from the database.
For each of these objects, I have a processing function to be run.
I want the processing function to execute after acquiring Redis lock on ID of given object, and after processing release the lock (also if the processing function throws an error).

What's the easiest way in Flux to create such a stream?
Here is some code of my failed attempt at doing this with transform.
I could probably make withLock take a function which would be attached as afterLock.flatMap(func), but I am looking for a solution that can avoid that.

I would like this to be as transparent to the rest of the stream as possible, and not require seperate attachment of lock and unlock functions, just one attachment that can do "lock-process-unlock".


        private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
          
            return flux -> {
                    Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));
    
    // processing logic should be attached somewhere here
    
                afterLock
                        .flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
                                .thenReturn(ret)
                                .onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
    
                return afterLock;
    
            };
        }


        Flux.just(someObjectFromDatabase)
                .transform(withLock(t -> t.id()))
                .flatMap(this::objectProcessor)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

明明#如月 2025-01-27 22:32:58

解决方案之一是使用单声道。使用允许使用异步操作来用于资源供应商,资源关闭和清理。

Mono.usingWhen(
   lockService.acquire(key),
   lock -> process(),
   lock -> lockService.release(lock)
);

在我们的情况下,我们将redis锁定到lockservice看起来如下

public interface ReactiveLockService {

    Mono<LockEntry> acquire(String key, Duration expireAfter);

    Mono<Void> release(LockEntry lock);

    interface LockEntry {
        String getKey();

        String getValue();
    }
}

One of the solution is to use Mono.usingWhen that allows to use async operations for resource supplier, resource closure and cleanup.

Mono.usingWhen(
   lockService.acquire(key),
   lock -> process(),
   lock -> lockService.release(lock)
);

In our case we wrapped Redis lock into LockService that looks like the following

public interface ReactiveLockService {

    Mono<LockEntry> acquire(String key, Duration expireAfter);

    Mono<Void> release(LockEntry lock);

    interface LockEntry {
        String getKey();

        String getValue();
    }
}
囍笑 2025-01-27 22:32:58

感谢您的回答@Alex,与此同时,我能够提供类似的东西,在组织流和失败方面非常灵活(花了我一段时间来掩盖边缘案例...)
它可以用作调用stream.flatmap(with lock(...,处理器)

    public static <T> Function<T, Flux<T>> withLock(
            long tid, String lockPrefix, int lockTimeMs, Function<T, String> keyExtractor, Function<Mono<T>, Flux<T>> processor, RedissonReactiveClient redis) {

        // If Redis lock or unlock operations fail, that will on purpose propagate the error.
        // If processor throws an error, lock will be unlocked first before error propagation.

        // tid has to be unique for each local task, it's a virtual "thread id" so if it's used concurrently locks will not protect the code

        return flux -> {
            Function<T, RLockReactive> getLock = ev -> redis.getLock(lockPrefix + keyExtractor.apply(ev));
            RLockReactive lock = getLock.apply(flux);

            Supplier<Mono<T>> unlock = () -> lock.unlock(tid).then(Mono.<T>empty());
            Supplier<Mono<T>> doLock = () -> lock.lock(lockTimeMs, TimeUnit.MILLISECONDS, tid).then(Mono.<T>empty());

            // Careful not to call map/flatMap on redis.lock/redis.unlock which returns Void and so it won't trigger on empty stream...lol!
            return Flux.concat(
                    Mono.defer(doLock),
                    Flux.defer(() -> processor.apply(Mono.just(flux))
                            .onErrorResume(err -> unlock.get()
                                    .onErrorResume(unlockError -> {
                                        err.addSuppressed(unlockError);
                                        // Propagate original processor error, but note the unlock error as suppressed
                                        return Mono.error(err);
                                    })
                                    .then(Mono.error(err)))),
                    Mono.defer(unlock)
            );
        };

Thanks for your answer @Alex, in the meantime I was able to come with something like this which is very flexible in terms of organizing the stream and resilent to failures (took me a while to cover edge cases...)
It can be used as a call to stream.flatMap(withLock(..., processor)

    public static <T> Function<T, Flux<T>> withLock(
            long tid, String lockPrefix, int lockTimeMs, Function<T, String> keyExtractor, Function<Mono<T>, Flux<T>> processor, RedissonReactiveClient redis) {

        // If Redis lock or unlock operations fail, that will on purpose propagate the error.
        // If processor throws an error, lock will be unlocked first before error propagation.

        // tid has to be unique for each local task, it's a virtual "thread id" so if it's used concurrently locks will not protect the code

        return flux -> {
            Function<T, RLockReactive> getLock = ev -> redis.getLock(lockPrefix + keyExtractor.apply(ev));
            RLockReactive lock = getLock.apply(flux);

            Supplier<Mono<T>> unlock = () -> lock.unlock(tid).then(Mono.<T>empty());
            Supplier<Mono<T>> doLock = () -> lock.lock(lockTimeMs, TimeUnit.MILLISECONDS, tid).then(Mono.<T>empty());

            // Careful not to call map/flatMap on redis.lock/redis.unlock which returns Void and so it won't trigger on empty stream...lol!
            return Flux.concat(
                    Mono.defer(doLock),
                    Flux.defer(() -> processor.apply(Mono.just(flux))
                            .onErrorResume(err -> unlock.get()
                                    .onErrorResume(unlockError -> {
                                        err.addSuppressed(unlockError);
                                        // Propagate original processor error, but note the unlock error as suppressed
                                        return Mono.error(err);
                                    })
                                    .then(Mono.error(err)))),
                    Mono.defer(unlock)
            );
        };
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文