HenryChenV / my-notes

个人学习笔记
7 stars 2 forks source link

synchronized源码分析-轻量级锁 #4

Open HenryChenV opened 3 years ago

HenryChenV commented 3 years ago

synchronized源码分析

  1. 入口
  2. 偏向锁
  3. 轻量级锁
  4. 重量级锁
  5. 结束

轻量级锁数据结构 (stack-based lock)

image

和偏向锁类似,轻量级锁的实现也是基于LockRecord的,在HotSpot中叫stack-based lock,也就是基于栈的锁,很形象。基于栈的这个点在偏向锁中的体验尤为明细,栈结构和它的重入特性息息相关。 它的MarkWord是轻量级锁状态,MarkWord会指向LockRecord,LockRecord中会包含无锁的MarkWord,称为dispalced_header,用于轻量级锁释放时还原MarkWord为无锁状态。 图中有两个轻量级锁的LockRecord,一个是obj1的第一次加锁,另一个是obj1的重入。他俩的LockRecord的dispalced_header是不同的,具体细节在后面会说

加锁 (重入加锁)

入口处,如果obj的类型关闭了偏向锁标志,会在撤销偏向锁后,尝试加一次轻量级锁

...
      CASE(_monitorenter): {
      ...
            else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
              // 偏向锁标志位不同, 而刚才的mark通过了偏向锁检查, 所以是klass中的偏向锁模式关闭
              // 没法继续使用偏向锁了, 需要撤销当前的偏向锁, 然后升级为轻量级锁或者重量级锁
              // try revoke bias
              // 尝试撤销偏向锁
              // 此时的lockee->klass()->prototype_header()应该是无锁状态的MarkWord
              markOop header = lockee->klass()->prototype_header();
              // 重置MarkWord, 如果有hash, 带上hash
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
                // 修改成功
                // 这里修改为无锁的MarkWord可能是想提前修改, 后面少几次判断, 算是个优化吧
                if (PrintBiasedLockingStatistics)
                  (*BiasedLocking::revoked_lock_entry_count_addr())++;
              }
              // 后面的代码会继续走轻量级锁或者重量级锁的获取逻辑
    ...
          // traditional lightweight locking
          // 处理非偏向锁的情况
          if (!success) {
            // 准备使用轻量级锁或者重量级锁
            // 用当前的MarkWord构造一个无锁的MarkWord作为DisplacedHeader
            markOop displaced = lockee->mark()->set_unlocked();
            // 保存到LockRecord中, 用于解锁时MarkWord的还原
            entry->lock()->set_displaced_header(displaced);
            bool call_vm = UseHeavyMonitors;
            // 如果参数强制指定使用重量级锁,
            // 或者CAS将MarkWord从无锁状态设置为轻量级锁失败
            if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
              // Is it simple recursive case?
              // 判断是否轻量级锁重入
              if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                // 轻量级锁重入时, 重入的锁的DisplacedMarkWord为NULL
                entry->lock()->set_displaced_header(NULL);
              } else {
                // 膨胀
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
            }
          }
...

在后面的 InterpreterRuntime::monitorenter 中不允许偏向锁以及 fast_enter 中加锁失败,也会进轻量级锁获取流程:

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
  ...
  // 如果允许偏向锁
  if (UseBiasedLocking) {
    ...
  } else {
    // 加轻量级锁
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
         "must be NULL or an object");
#ifdef ASSERT
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
  ...
 if (UseBiasedLocking) {
    ...
 }

 // 很显然到这里, 偏向锁肯定被撤销了
 // 加轻量级锁
 slow_enter (obj, lock, THREAD) ;
}

在进入轻量级锁加锁流程前,如果之前是偏向锁,会先撤销偏向锁,这个逻辑在偏向锁中已经说了。

接下来说下加轻量级锁的主逻辑

// 加轻量级锁
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  // 获取Mark Word
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  if (mark->is_neutral()) {
    // 无锁(001)的情况
    // Anticipate successful CAS -- the ST of the displaced mark must
    // be visible <= the ST performed by the CAS.
    // 尝试升级为轻量级锁
    lock->set_displaced_header(mark);
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      // 升级成功, 结束
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
    // 如果失败, 说明存在竞争, 膨胀为重量级锁
    // Fall through to inflate() ...
  } else
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    // 轻量级锁重入的情况
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    // 重入的轻量级锁,LockRecord的displaced_header会被置为NULL
    lock->set_displaced_header(NULL);
    return;
  }

  // The object header will never be displaced to this lock,
  // so it does not matter what the value is, except that it
  // must be non-zero to avoid looking like a re-entrant lock,
  // and must not look locked either.
  // 以上都失败, 轻量级锁膨胀为重量级锁
  //
  // 标记MarkWord为unused_mark(0b011),
  // 其实这里标记为11只是为了设置一个有别于轻量级锁的特殊值
  // 如果没改, 后面没法判断是轻量级锁重入还是轻量级锁膨胀
  lock->set_displaced_header(markOopDesc::unused_mark());
  // 调用重量级锁方法
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

逻辑很简单,能成功加偏向锁的情况有两种:

  1. 无锁状态
  2. 同一线程重入轻量级锁

如果失败,会膨胀为重量级锁。由于这两部分代码和重量级锁的数据结构关系很大,所以放到重量级锁中分析。

在重入时有个细节需要注意,对于重入的轻量级锁displaced_header为NULL,也就是说,只有第一次进入时的轻量级锁的displaced_header为无锁的MarkWord,为什么?请看解锁流程。

解锁

这部分代码和偏向锁解锁重合度很高,这是因为偏向锁和轻量级锁是基于相同的数据结构实现的,

      CASE(_monitorexit): {
        ...
        // 按enter时相同的顺序遍历LockRecord, 可以达到栈的效果
        while (most_recent != limit ) {
          if ((most_recent)->obj() == lockee) {
            // 找到当前对象持有的LockRecord
            BasicLock* lock = most_recent->lock();
            // 从LockRecord中取出刚开始的header
            markOop header = lock->displaced_header();
            // 解除关联, 偏向锁和重入的轻量级锁只需要这一步就够了
            most_recent->set_obj(NULL);
            if (!lockee->mark()->has_bias_pattern()) {
              // 当前不是偏向锁的情况, 可能之前膨胀了
              bool call_vm = UseHeavyMonitors;
              // If it isn't recursive we either must swap old header or call the runtime
              if (header != NULL || call_vm) {
                // header != NULL说明不是轻量级锁的重入, call_vm==true说明是强制使用重量级锁
                // 对于不是重入的轻量级锁, 从DisplacedMarkWord还原回去就行,
                //  如果失败, 说明存在竞争, 可能在某个地方膨胀了,
                //  需要走InterpreterRuntime::monitorexit
                // 对于call_vm==true, 也就是强制走重量级锁的情况, 直接InterpreterRuntime::monitorexit
                if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
                  ...
                }
              }
            }
            // 只需要找到一个就退出, 对于重入的情况肯定是后获取的锁先释放
            UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
          }
          // 下一个LockRecord
          most_recent++;
        }

和偏向锁一样,都会将LockRecord中的obj置为NULL,表示这个LockRecord和obj没关系了,这个很好理解。 对于轻量级锁这样还没完,还会取出LockRecord的dispalced_header判断,如果不为NULL,就会将这个header恢复到MarkWord中。因为对于重入的场景,第一次进入时获取的LockRecord的displaced_header的是无锁的MarkWord,后面进入的都是NULL,所以不为NULL表示这是第一次进入时的LockRecord,需要恢复回去,表示所有的锁都解锁了。这里就严格依赖栈结构,基于栈实现了重入特性。

总结

轻量级锁的场景比偏向锁宽松点,可以允许多个线程交替进入,但是不允许有竞争。从MarkWord状态看就是不断在无锁和轻量级锁之间切换。

login1995 commented 2 years ago

跪了跪了

haluowei commented 1 year ago

对象头中的指针应该是指向 BasicObjecLock