Java 响应式的锁
由于响应式技术栈基本都是基于 eventloop 的,而对于当前(2022 年 5 月 18 日)java 的并发模型都是基于内核线程进行建模的,多个请求并发跑在同一个线程上导致了 synchronized 和 J.U.C 包的锁不再适用
而并发中锁又是必须的,那么我们该怎么办?
kotlin 协程锁
以当前 eventloop 作为调度器驱动多个无栈协程,为每一个请求分配一个协程即可
本质上这种并不会阻塞住当前的内核线程,而是”阻塞“当前协程,实际上就是当前协程通过 cas 抢到锁之后来处理自己的业务,此时其他抢不到的阻塞等待调度,锁持有者协程放锁时恢复等待的协程执行
实际原理很简单:cas 看看能不能拿到 不能拿到把当前的 continuation 封装到 waiter 里面 等待 resume
而 unlock 方法则是取出第一个等待的 waiter 节点,然后恢复其中的 continuation 执行就行了
vertx 的异步锁
实际上就是通过 ConcurrentMap#compute 的原子特性直接加入等待队列
若发现当前队列没有竞争者则直接获取到锁,触发 future 回调
而释放锁的 release 则是这样的
@Override
public void release() {
if (invoked.compareAndSet(false, true)) {
nextWaiter(lockName);
}
}
private void nextWaiter(String lockName) {
List<LockWaiter> waiters = waitersMap.compute(lockName, (s, list) -> {
return list == null || list.size() == 1 ? null : new ArrayList<>(list.subList(1, list.size()));
});
if (waiters != null) {
waiters.get(0).acquireLock();
}
}
直接让下一个等待者获取到锁即可
java 的 loom
如果我还是想用 juc 这种 lock 怎么办?
那么我们就可以使用 project loom(jdk19 将 preview),将当前的 eventloop 或者其他的线程池作为有栈协程的调度器,由于 loom 带来的 virtualthread(协程)是 Thread 的子类,且 LockSupport 类也对其有适配,阻塞虚拟线程而不是内核线程
val router = Router.router(vertx)
val lock = ReentrantLock()
router.get()
.handler{
Thread.startVirtualThread {
lock.lock()
try {
//做你要的处理
}finally {
lock.unlock()
}
}
}
然后我们来看看 LockSupport#park 方法 相当于从当前的内核线程上卸载这个协程 然后换一个协程来执行
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
try {
if (t.isVirtual()) {
VirtualThreads.park();
} else {
U.park(false, 0L);
}
} finally {
setBlocker(t, null);
}
}
private void doPark() {
// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;
// park the thread
setState(PARKING);
try {
if (!yieldContinuation()) {
// park on the carrier thread when pinned
parkOnCarrierThread(false, 0);
}
} finally {
assert (Thread.currentThread() == this) && (state() == RUNNING);
}
}
private boolean yieldContinuation() {
boolean notifyJvmti = notifyJvmtiEvents;
// unmount
if (notifyJvmti) notifyJvmtiUnmountBegin(false);
unmount();
try {
return Continuation.yield(VTHREAD_SCOPE);
} finally {
// re-mount
mount();
if (notifyJvmti) notifyJvmtiMountEnd(false);
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: 为什么 Jvm 需要有栈协程
下一篇: Jvm 常量池
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论