synchronized 适配

发布于 2024-09-10 14:53:04 字数 8949 浏览 16 评论 0

最新 loom 的 dev mailist 公布了一则消息,关于 VitrualThread 对于 synchronized 适配 的,目前遇到抢 objectMonitor 失败后不再会阻塞载体线程,但是对于 Object::wait 和 Object::的阻塞问题还没有得到解决。

Jvm 层适配

对于 ObjectMonitor 整体的逻辑,抛开避免进入重量级 mutex 的那些优化来讲,其实跟传统的 mutex 实现和 jdk 中常见的 aqs 并没有太多的区别,这里面我们只关注对于“挂起”与“恢复”的适配,类似于 LockSupport::park 和 LockSupport::unpark 的功能。

ObjectMonitor::enter

这里的适配非常的容易寻找,直接去找 LOOM_MONITOR_SUPPORT 这个宏即可,这个宏起效果的前提在于 #if defined(AMD64) || defined (AARCH64) ,目前只支持这两个平台,从某种角度来看足够了。

#ifdef LOOM_MONITOR_SUPPORT
  ContinuationEntry* ce = current->last_continuation();
  if (ce != nullptr && ce->is_virtual_thread() && current->is_on_monitorenter()) {
    //这里扮演了类似于 continuation::yield 的功能,让当前的 continuation 让出了计算资源
    int result = Continuation::try_preempt(current, ce->cont_oop(current));
    if (result == freeze_ok) {
      //在这里进行的 park 的适配
    bool acquired = HandlePreemptedVThread(current);
    DEBUG_ONLY(int state = java_lang_VirtualThread::state(current->vthread()));
    assert((acquired && current->preemption_cancelled() && state == java_lang_VirtualThread::RUNNING) ||
         (!acquired && !current->preemption_cancelled() && state == java_lang_VirtualThread::BLOCKING), "invariant");
    return true;
    }
  }
#endif

类似于 aqs 里面 node 的概念,在 objectMonitor 中对应物为 ObjectWaiter,当有线程释放 objectMonitor 时会选择 ObjectWaiter 来唤醒对应的线程抢 ObjectMonitor。

oop vthread = current->vthread();
assert(java_lang_VirtualThread::state(vthread) == java_lang_VirtualThread::RUNNING, "wrong state for vthread");
java_lang_VirtualThread::set_state(vthread, java_lang_VirtualThread::BLOCKING);

ObjectWaiter* node = new ObjectWaiter(vthread);
node->_prev   = (ObjectWaiter*) 0xBAD;
node->TState  = ObjectWaiter::TS_CXQ;

这里能够看到 ObjectWaiter 的构造器函数传入的参数为当前的 virtualthread 实例,这个重载是这个功能带来的,我们可以仔细看下这个构造器函数和之前的构造器函数有什么区别,parkEvent 你可以理解为一个 condition 的同步原语

ObjectWaiter::ObjectWaiter(JavaThread* current) {
  _next   = nullptr;
  _prev   = nullptr;
  _notified = 0;
  _notifier_tid = 0;
  TState  = TS_RUN;
  _thread   = current;
// 这里是新旧两个的最大区别  对应的 parkEvent 不同,新的构造器对应的 parkEvent 是一个全局的 parkEvent 而非是对应线程的 parkEvent
  _event  = _thread != nullptr ? _thread->_ParkEvent : ObjectMonitor::vthread_unparker_ParkEvent();
  _active   = false;
  assert(_event != nullptr, "invariant");
}
// 新的构造函数
ObjectWaiter::ObjectWaiter(oop vthread) : ObjectWaiter((JavaThread*)nullptr) {
  _vthread = OopHandle(JavaThread::thread_oop_storage(), vthread);
}

现在新的 ObjectWaiter 也入队了,continuation 也 yield 了,整体的 enter 就完成了

ObjectMonitor::exit

由于 ObjectMonitor 是可重入的,所以这里我们直接跳过递归部分,直接看完全释放 ObjectMonitor 后的操作,除此之外我再做一个已经有很多 vthread 在等待的假设,这样方便寻找 unpark 的适配

// 寻找到下一个需要被唤醒的节点
_EntryList = w;
ObjectWaiter* q = nullptr;
ObjectWaiter* p;
  for (p = w; p != nullptr; p = p->_next) {
    guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
    p->TState = ObjectWaiter::TS_ENTER;
    p->_prev = q;
    q = p;
  }

  // In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = nullptr
  // The MEMBAR is satisfied by the release_store() operation in ExitEpilog().

  // See if we can abdicate to a spinner instead of waking a thread.
  // A primary goal of the implementation is to reduce the
  // context-switch rate.
  if (_succ != nullptr) continue;

  w = _EntryList;
  if (w != nullptr) {
    guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
    //正式的处理在这里
    ExitEpilog(current, w);
    return;
  }
}

对于 ExitEpilog 正式唤醒对应的线程,要做两件事情,第一件是判断是否为 vthread,第二件事情是获取到对应的 ParkEvent

在过去的版本实现中,只需要 ParkEvent 来使用 condition 唤醒对应线程即可,但是这里为了适配 vthread 做了一些看起来并不对称的操作。这里的实现是将 vthread 入队,然后唤醒一个在 vthread 类初始化时开启的线程来执行 submitRunContinuation 方法,让对应调度器来重新执行 Continuation(这部分后面能看到)

void ObjectMonitor::ExitEpilog(JavaThread* current, ObjectWaiter* Wakee) {
  assert(owner_raw() == owner_for(current), "invariant");

  // Exit protocol:
  // 1. ST _succ = wakee
  // 2. membar #loadstore|#storestore;
  // 2. ST _owner = nullptr
  // 3. unpark(wakee)

  oop vthread = nullptr;
  //在 vthread 的场景下只需要塞入 vthread 反之塞入_thread 这两个值二选一且不会同时出现
  if (Wakee->_thread != nullptr) {
  // Platform thread case
  _succ = Wakee->_thread;
  } else {
  assert(Wakee->vthread() != nullptr, "invariant");
  vthread = Wakee->vthread();
  _succ = (JavaThread*)java_lang_Thread::thread_id(vthread);
  }
  ParkEvent * Trigger = Wakee->_event;
  Wakee  = nullptr;
  release_clear_owner(current);
  OrderAccess::fence();

  if (vthread == nullptr) {
  // Platform thread case
  Trigger->unpark();
  } else if (java_lang_VirtualThread::set_onWaitingList(vthread, _vthread_cxq_head)) {
    //先入队再唤醒对应的线程
    //看这里这个 unpark 操作的是全局的那个 ParkEvent
  Trigger->unpark();
  }
  // Maintain stats and report events to JVMTI
  OM_PERFDATA_OP(Parks, inc());
}

java_lang_VirtualThread::set_onWaitingList

这里就有点 jvm 层和 java 层交界的味道了,简单来说这里就是一个介于 jvm 和 java 层之间的一个 mpsc 阻塞队列(BlockingQueue\<virtualthread\>),jvm 层面向里面塞入 vthread 对象,java 层面的线程不断 poll 出来 vthread 来 resume</virtualthread\>

bool java_lang_VirtualThread::set_onWaitingList(oop vthread, OopHandle& list_head) {
  //这里是指的 VirtualThread 中 onWaitingList 字段来指示是否在队列中
  //0 为不在
  uint8_t* addr = vthread->field_addr<uint8_t>(_onWaitingList_offset);
  uint8_t value = Atomic::load(addr);
  assert(value == 0x00 || value == 0x01, "invariant");
  if (value == 0x00) {
  value = Atomic::cmpxchg(addr, (uint8_t)0x00, (uint8_t)0x01);
  if (value == 0x00) {
    for (;;) {
      //从调用方可得 list_head 来自于一个全局字段_vthread_cxq_head
    oop head = list_head.resolve();
      //以下两句为头插法入队
    java_lang_VirtualThread::set_next(vthread, head);
    if (list_head.cmpxchg(head, vthread) == head) return true;
    }
  }
  }
  return false; // already on waiting list
}

void java_lang_VirtualThread::set_next(oop vthread, oop next_vthread) {
  vthread->obj_field_put(_next_offset, next_vthread);
}

Java 层面适配

在 VirtualThread 中我们能发现在类初始化的时候多了一些代码,而且多了一些字段。整体的逻辑就是通过 takeVirtualThreadListToUnblock 这个函数获取到 vthread 然后根据这个链依次唤醒对应的 vthread

  // has the value 1 when on the list of virtual threads waiting to be unblocked
  private volatile byte onWaitingList;

  // next virtual thread on the list of virtual threads waiting to be unblocked
  private volatile VirtualThread next;

private static void unblockVirtualThreads() {
  while (true) {
    VirtualThread vthread = takeVirtualThreadListToUnblock();
    while (vthread != null) {
      assert vthread.onWaitingList == 1;
      VirtualThread nextThread = vthread.next;

      // remove from list and unblock
      vthread.next = null;
      boolean changed = vthread.compareAndSetOnWaitingList((byte) 1, (byte) 0);
      assert changed;
      vthread.unblock();

      vthread = nextThread;
    }
  }
}

// takes the list of virtual threads that are waiting to be unblocked, waiting if
// necessary until a list becomes available
private static native VirtualThread takeVirtualThreadListToUnblock();

static {
  var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
      VirtualThread::unblockVirtualThreads);
  unblocker.setDaemon(true);
  unblocker.start();
}

 private void unblock() {
    assert !Thread.currentThread().isVirtual();
    unblocked = true;
    if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
      unblocked = false;
      submitRunContinuation();
    }
  }

那么具体就要去看看 takeVirtualThreadListToUnblock 这个 native 方法是如何实现的了

通过 VirtualThread.c 中的 Java_java_lang_VirtualThread_registerNatives 可知,其对应的真实函数为 JVM_TakeVirtualThreadListToUnblock。

JVM_ENTRY(jobject, JVM_TakeVirtualThreadListToUnblock(JNIEnv* env, jclass ignored))
  //这个就是之前提到的全局的那个 vthread_unparker_ParkEvent,会在 exit 的时候将 vthread 入队之后再唤醒这个
  ParkEvent* parkEvent = ObjectMonitor::vthread_unparker_ParkEvent();
  assert(parkEvent != nullptr, "not initialized");
//这个也是 exit 时挂载的那个队头元素,就是我们之前提到的那个全局的“阻塞队列”
  OopHandle& list_head = ObjectMonitor::vthread_cxq_head();
  oop vthread_head = nullptr;
  while (true) {
  if (list_head.peek() != nullptr) {
    for (;;) {
    oop head = list_head.resolve();
      //相当于把整个链摘下来 并把队头返回给 java 层
    if (list_head.cmpxchg(head, nullptr) == head) {  
      return JNIHandles::make_local(THREAD, head);
    }
    }
  }
  ThreadBlockInVM tbivm(THREAD);
  parkEvent->park();
  }
JVM_END

这样整体的兼容就做完了

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

上一篇:

下一篇:

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

文章
评论
26 人气
更多

推荐作者

alipaysp_qCPZes5aGh

文章 0 评论 0

BeginEnd

文章 0 评论 0

温柔一刀

文章 0 评论 0

qq_eW9dqv

文章 0 评论 0

cz003

文章 0 评论 0

mb_y5iXe1gw

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文