synchronized 适配
最新 loom 的 dev mailist 公布了一则消息,关于 VitrualThread 对于 synchronized 适配 的,目前遇到抢 objectMonitor 失败后不再会阻塞载体线程,但是对于 Object::wait 和 Object::的阻塞问题还没有得到解决。
Jvm 层适配
对于 ObjectMonitor 整体的逻辑,抛开避免进入重量级 mutex 的那些优化来讲,其实跟传统的 mutex 实现和 jdk 中常见的 aqs 并没有太多的区别,这里面我们只关注对于“挂起”与“恢复”的适配,类似于 LockSupport::park 和 LockSupport::unpark 的功能。
ObjectMonitor::enter
这里的适配非常的容易寻找,直接去找 LOOM_MONITOR_SUPPORT 这个宏即可,这个宏起效果的前提在于 #if defined(AMD64) || defined (AARCH64)
,目前只支持这两个平台,从某种角度来看足够了。
#ifdef LOOM_MONITOR_SUPPORT
ContinuationEntry* ce = current->last_continuation();
if (ce != nullptr && ce->is_virtual_thread() && current->is_on_monitorenter()) {
//这里扮演了类似于 continuation::yield 的功能,让当前的 continuation 让出了计算资源
int result = Continuation::try_preempt(current, ce->cont_oop(current));
if (result == freeze_ok) {
//在这里进行的 park 的适配
bool acquired = HandlePreemptedVThread(current);
DEBUG_ONLY(int state = java_lang_VirtualThread::state(current->vthread()));
assert((acquired && current->preemption_cancelled() && state == java_lang_VirtualThread::RUNNING) ||
(!acquired && !current->preemption_cancelled() && state == java_lang_VirtualThread::BLOCKING), "invariant");
return true;
}
}
#endif
类似于 aqs 里面 node 的概念,在 objectMonitor 中对应物为 ObjectWaiter,当有线程释放 objectMonitor 时会选择 ObjectWaiter 来唤醒对应的线程抢 ObjectMonitor。
oop vthread = current->vthread();
assert(java_lang_VirtualThread::state(vthread) == java_lang_VirtualThread::RUNNING, "wrong state for vthread");
java_lang_VirtualThread::set_state(vthread, java_lang_VirtualThread::BLOCKING);
ObjectWaiter* node = new ObjectWaiter(vthread);
node->_prev = (ObjectWaiter*) 0xBAD;
node->TState = ObjectWaiter::TS_CXQ;
这里能够看到 ObjectWaiter 的构造器函数传入的参数为当前的 virtualthread 实例,这个重载是这个功能带来的,我们可以仔细看下这个构造器函数和之前的构造器函数有什么区别,parkEvent 你可以理解为一个 condition 的同步原语
ObjectWaiter::ObjectWaiter(JavaThread* current) {
_next = nullptr;
_prev = nullptr;
_notified = 0;
_notifier_tid = 0;
TState = TS_RUN;
_thread = current;
// 这里是新旧两个的最大区别 对应的 parkEvent 不同,新的构造器对应的 parkEvent 是一个全局的 parkEvent 而非是对应线程的 parkEvent
_event = _thread != nullptr ? _thread->_ParkEvent : ObjectMonitor::vthread_unparker_ParkEvent();
_active = false;
assert(_event != nullptr, "invariant");
}
// 新的构造函数
ObjectWaiter::ObjectWaiter(oop vthread) : ObjectWaiter((JavaThread*)nullptr) {
_vthread = OopHandle(JavaThread::thread_oop_storage(), vthread);
}
现在新的 ObjectWaiter 也入队了,continuation 也 yield 了,整体的 enter 就完成了
ObjectMonitor::exit
由于 ObjectMonitor 是可重入的,所以这里我们直接跳过递归部分,直接看完全释放 ObjectMonitor 后的操作,除此之外我再做一个已经有很多 vthread 在等待的假设,这样方便寻找 unpark 的适配
// 寻找到下一个需要被唤醒的节点
_EntryList = w;
ObjectWaiter* q = nullptr;
ObjectWaiter* p;
for (p = w; p != nullptr; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
// In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = nullptr
// The MEMBAR is satisfied by the release_store() operation in ExitEpilog().
// See if we can abdicate to a spinner instead of waking a thread.
// A primary goal of the implementation is to reduce the
// context-switch rate.
if (_succ != nullptr) continue;
w = _EntryList;
if (w != nullptr) {
guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
//正式的处理在这里
ExitEpilog(current, w);
return;
}
}
对于 ExitEpilog 正式唤醒对应的线程,要做两件事情,第一件是判断是否为 vthread,第二件事情是获取到对应的 ParkEvent
在过去的版本实现中,只需要 ParkEvent 来使用 condition 唤醒对应线程即可,但是这里为了适配 vthread 做了一些看起来并不对称的操作。这里的实现是将 vthread 入队,然后唤醒一个在 vthread 类初始化时开启的线程来执行 submitRunContinuation 方法,让对应调度器来重新执行 Continuation(这部分后面能看到)
void ObjectMonitor::ExitEpilog(JavaThread* current, ObjectWaiter* Wakee) {
assert(owner_raw() == owner_for(current), "invariant");
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = nullptr
// 3. unpark(wakee)
oop vthread = nullptr;
//在 vthread 的场景下只需要塞入 vthread 反之塞入_thread 这两个值二选一且不会同时出现
if (Wakee->_thread != nullptr) {
// Platform thread case
_succ = Wakee->_thread;
} else {
assert(Wakee->vthread() != nullptr, "invariant");
vthread = Wakee->vthread();
_succ = (JavaThread*)java_lang_Thread::thread_id(vthread);
}
ParkEvent * Trigger = Wakee->_event;
Wakee = nullptr;
release_clear_owner(current);
OrderAccess::fence();
if (vthread == nullptr) {
// Platform thread case
Trigger->unpark();
} else if (java_lang_VirtualThread::set_onWaitingList(vthread, _vthread_cxq_head)) {
//先入队再唤醒对应的线程
//看这里这个 unpark 操作的是全局的那个 ParkEvent
Trigger->unpark();
}
// Maintain stats and report events to JVMTI
OM_PERFDATA_OP(Parks, inc());
}
java_lang_VirtualThread::set_onWaitingList
这里就有点 jvm 层和 java 层交界的味道了,简单来说这里就是一个介于 jvm 和 java 层之间的一个 mpsc 阻塞队列(BlockingQueue\<virtualthread\>),jvm 层面向里面塞入 vthread 对象,java 层面的线程不断 poll 出来 vthread 来 resume</virtualthread\>
bool java_lang_VirtualThread::set_onWaitingList(oop vthread, OopHandle& list_head) {
//这里是指的 VirtualThread 中 onWaitingList 字段来指示是否在队列中
//0 为不在
uint8_t* addr = vthread->field_addr<uint8_t>(_onWaitingList_offset);
uint8_t value = Atomic::load(addr);
assert(value == 0x00 || value == 0x01, "invariant");
if (value == 0x00) {
value = Atomic::cmpxchg(addr, (uint8_t)0x00, (uint8_t)0x01);
if (value == 0x00) {
for (;;) {
//从调用方可得 list_head 来自于一个全局字段_vthread_cxq_head
oop head = list_head.resolve();
//以下两句为头插法入队
java_lang_VirtualThread::set_next(vthread, head);
if (list_head.cmpxchg(head, vthread) == head) return true;
}
}
}
return false; // already on waiting list
}
void java_lang_VirtualThread::set_next(oop vthread, oop next_vthread) {
vthread->obj_field_put(_next_offset, next_vthread);
}
Java 层面适配
在 VirtualThread 中我们能发现在类初始化的时候多了一些代码,而且多了一些字段。整体的逻辑就是通过 takeVirtualThreadListToUnblock
这个函数获取到 vthread 然后根据这个链依次唤醒对应的 vthread
// has the value 1 when on the list of virtual threads waiting to be unblocked
private volatile byte onWaitingList;
// next virtual thread on the list of virtual threads waiting to be unblocked
private volatile VirtualThread next;
private static void unblockVirtualThreads() {
while (true) {
VirtualThread vthread = takeVirtualThreadListToUnblock();
while (vthread != null) {
assert vthread.onWaitingList == 1;
VirtualThread nextThread = vthread.next;
// remove from list and unblock
vthread.next = null;
boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
assert changed;
vthread.unblock();
vthread = nextThread;
}
}
}
// takes the list of virtual threads that are waiting to be unblocked, waiting if
// necessary until a list becomes available
private static native VirtualThread takeVirtualThreadListToUnblock();
static {
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
private void unblock() {
assert !Thread.currentThread().isVirtual();
unblocked = true;
if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
unblocked = false;
submitRunContinuation();
}
}
那么具体就要去看看 takeVirtualThreadListToUnblock
这个 native 方法是如何实现的了
通过 VirtualThread.c 中的 Java_java_lang_VirtualThread_registerNatives 可知,其对应的真实函数为 JVM_TakeVirtualThreadListToUnblock。
JVM_ENTRY(jobject, JVM_TakeVirtualThreadListToUnblock(JNIEnv* env, jclass ignored))
//这个就是之前提到的全局的那个 vthread_unparker_ParkEvent,会在 exit 的时候将 vthread 入队之后再唤醒这个
ParkEvent* parkEvent = ObjectMonitor::vthread_unparker_ParkEvent();
assert(parkEvent != nullptr, "not initialized");
//这个也是 exit 时挂载的那个队头元素,就是我们之前提到的那个全局的“阻塞队列”
OopHandle& list_head = ObjectMonitor::vthread_cxq_head();
oop vthread_head = nullptr;
while (true) {
if (list_head.peek() != nullptr) {
for (;;) {
oop head = list_head.resolve();
//相当于把整个链摘下来 并把队头返回给 java 层
if (list_head.cmpxchg(head, nullptr) == head) {
return JNIHandles::make_local(THREAD, head);
}
}
}
ThreadBlockInVM tbivm(THREAD);
parkEvent->park();
}
JVM_END
这样整体的兼容就做完了
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论