farmerjohngit / myblog

有深度的Java技术博客
1.84k stars 287 forks source link

死磕Synchronized底层实现--重量级锁 #15

Open farmerjohngit opened 5 years ago

farmerjohngit commented 5 years ago

本文为死磕Synchronized底层实现第三篇文章,内容为重量级锁实现。

本系列文章将对HotSpot的synchronized锁实现进行全面分析,内容包括偏向锁、轻量级锁、重量级锁的加锁、解锁、锁升级流程的原理及源码分析,希望给在研究synchronized路上的同学一些帮助。主要包括以下几篇文章:

死磕Synchronized底层实现--概论

死磕Synchronized底层实现--偏向锁

死磕Synchronized底层实现--轻量级锁

死磕Synchronized底层实现--重量级锁

更多文章见个人博客:https://github.com/farmerjohngit/myblog

重量级的膨胀和加锁流程

当出现多个线程同时竞争锁时,会进入到synchronizer.cpp#slow_enter方法

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  // 如果是无锁状态
  if (mark->is_neutral()) {
    lock->set_displaced_header(mark);
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
    // Fall through to inflate() ...
  } else
  // 如果是轻量级锁重入
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
  }

 ...

  // 这时候需要膨胀为重量级锁,膨胀前,设置Displaced Mark Word为一个特殊值,代表该锁正在用一个重量级锁的monitor
  lock->set_displaced_header(markOopDesc::unused_mark());
  //先调用inflate膨胀为重量级锁,该方法返回一个ObjectMonitor对象,然后调用其enter方法
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

inflate中完成膨胀过程。

ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  ...

  for (;;) {
      const markOop mark = object->mark() ;
      assert (!mark->has_bias_pattern(), "invariant") ;

      // mark是以下状态中的一种:
      // *  Inflated(重量级锁状态)     - 直接返回
      // *  Stack-locked(轻量级锁状态) - 膨胀
      // *  INFLATING(膨胀中)    - 忙等待直到膨胀完成
      // *  Neutral(无锁状态)      - 膨胀
      // *  BIASED(偏向锁)       - 非法状态,在这里不会出现

      // CASE: inflated
      if (mark->has_monitor()) {
          // 已经是重量级锁状态了,直接返回
          ObjectMonitor * inf = mark->monitor() ;
          ...
          return inf ;
      }

      // CASE: inflation in progress
      if (mark == markOopDesc::INFLATING()) {
         // 正在膨胀中,说明另一个线程正在进行锁膨胀,continue重试
         TEVENT (Inflate: spin while INFLATING) ;
         // 在该方法中会进行spin/yield/park等操作完成自旋动作 
         ReadStableMark(object) ;
         continue ;
      }

      if (mark->has_locker()) {
          // 当前轻量级锁状态,先分配一个ObjectMonitor对象,并初始化值
          ObjectMonitor * m = omAlloc (Self) ;

          m->Recycle();
          m->_Responsible  = NULL ;
          m->OwnerIsThread = 0 ;
          m->_recursions   = 0 ;
          m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;   // Consider: maintain by type/class
          // 将锁对象的mark word设置为INFLATING (0)状态 
          markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
          if (cmp != mark) {
             omRelease (Self, m, true) ;
             continue ;       // Interference -- just retry
          }

          // 栈中的displaced mark word
          markOop dmw = mark->displaced_mark_helper() ;
          assert (dmw->is_neutral(), "invariant") ;

          // 设置monitor的字段
          m->set_header(dmw) ;
          // owner为Lock Record
          m->set_owner(mark->locker());
          m->set_object(object);
          ...
          // 将锁对象头设置为重量级锁状态
          object->release_set_mark(markOopDesc::encode(m));

         ...
          return m ;
      }

      // CASE: neutral

      // 分配以及初始化ObjectMonitor对象
      ObjectMonitor * m = omAlloc (Self) ;
      // prepare m for installation - set monitor to initial state
      m->Recycle();
      m->set_header(mark);
      // owner为NULL
      m->set_owner(NULL);
      m->set_object(object);
      m->OwnerIsThread = 1 ;
      m->_recursions   = 0 ;
      m->_Responsible  = NULL ;
      m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;       // consider: keep metastats by type/class
      // 用CAS替换对象头的mark word为重量级锁状态
      if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
          // 不成功说明有另外一个线程在执行inflate,释放monitor对象
          m->set_object (NULL) ;
          m->set_owner  (NULL) ;
          m->OwnerIsThread = 0 ;
          m->Recycle() ;
          omRelease (Self, m, true) ;
          m = NULL ;
          continue ;
          // interference - the markword changed - just retry.
          // The state-transitions are one-way, so there's no chance of
          // live-lock -- "Inflated" is an absorbing state.
      }

      ...
      return m ;
  }
}

inflate中是一个for循环,主要是为了处理多线程同时调用inflate的情况。然后会根据锁对象的状态进行不同的处理:

1.已经是重量级状态,说明膨胀已经完成,直接返回

2.如果是轻量级锁则需要进行膨胀操作

3.如果是膨胀中状态,则进行忙等待

4.如果是无锁状态则需要进行膨胀操作

其中轻量级锁和无锁状态需要进行膨胀操作,轻量级锁膨胀流程如下:

1.调用omAlloc分配一个ObjectMonitor对象(以下简称monitor),在omAlloc方法中会先从线程私有的monitor集合omFreeList中分配对象,如果omFreeList中已经没有monitor对象,则从JVM全局的gFreeList中分配一批monitoromFreeList中。

2.初始化monitor对象

3.将状态设置为膨胀中(INFLATING)状态

4.设置monitor的header字段为displaced mark word,owner字段为Lock Record,obj字段为锁对象

5.设置锁对象头的mark word为重量级锁状态,指向第一步分配的monitor对象

无锁状态下的膨胀流程如下:

1.调用omAlloc分配一个ObjectMonitor对象(以下简称monitor)

2.初始化monitor对象

3.设置monitor的header字段为mark word,owner字段为null,obj字段为锁对象

4.设置锁对象头的mark word为重量级锁状态,指向第一步分配的monitor对象

至于为什么轻量级锁需要一个膨胀中(INFLATING)状态,代码中的注释是:

// Why do we CAS a 0 into the mark-word instead of just CASing the
// mark-word from the stack-locked value directly to the new inflated state?
// Consider what happens when a thread unlocks a stack-locked object.
// It attempts to use CAS to swing the displaced header value from the
// on-stack basiclock back into the object header.  Recall also that the
// header value (hashcode, etc) can reside in (a) the object header, or
// (b) a displaced header associated with the stack-lock, or (c) a displaced
// header in an objectMonitor.  The inflate() routine must copy the header
// value from the basiclock on the owner's stack to the objectMonitor, all
// the while preserving the hashCode stability invariants.  If the owner
// decides to release the lock while the value is 0, the unlock will fail
// and control will eventually pass from slow_exit() to inflate.  The owner
// will then spin, waiting for the 0 value to disappear.   Put another way,
// the 0 causes the owner to stall if the owner happens to try to
// drop the lock (restoring the header from the basiclock to the object)
// while inflation is in-progress.  This protocol avoids races that might
// would otherwise permit hashCode values to change or "flicker" for an object.
// Critically, while object->mark is 0 mark->displaced_mark_helper() is stable.
// 0 serves as a "BUSY" inflate-in-progress indicator.

我没太看懂,有知道的同学可以指点下~

膨胀完成之后,会调用enter方法获得锁

void ATTR ObjectMonitor::enter(TRAPS) {

  Thread * const Self = THREAD ;
  void * cur ;
  // owner为null代表无锁状态,如果能CAS设置成功,则当前线程直接获得锁
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     ...
     return ;
  }
  // 如果是重入的情况
  if (cur == Self) {
     // TODO-FIXME: check for integer overflow!  BUGID 6557169.
     _recursions ++ ;
     return ;
  }
  // 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀且第一次调用enter方法,那cur是指向Lock Record的指针
  if (Self->is_lock_owned ((address)cur)) {
    assert (_recursions == 0, "internal state error");
    // 重入计数重置为1
    _recursions = 1 ;
    // 设置owner字段为当前线程(之前owner是指向Lock Record的指针)
    _owner = Self ;
    OwnerIsThread = 1 ;
    return ;
  }

  ...

  // 在调用系统的同步操作之前,先尝试自旋获得锁
  if (Knob_SpinEarly && TrySpin (Self) > 0) {
     ...
     //自旋的过程中获得了锁,则直接返回
     Self->_Stalled = 0 ;
     return ;
  }

  ...

  { 
    ...

    for (;;) {
      jt->set_suspend_equivalent();
      // 在该方法中调用系统同步操作
      EnterI (THREAD) ;
      ...
    }
    Self->set_current_pending_monitor(NULL);

  }

  ...

}
  1. 如果当前是无锁状态、锁重入、当前线程是之前持有轻量级锁的线程则进行简单操作后返回。
  2. 先自旋尝试获得锁,这样做的目的是为了减少执行操作系统同步操作带来的开销
  3. 调用EnterI方法获得锁或阻塞

EnterI方法比较长,在看之前,我们先阐述下其大致原理:

一个ObjectMonitor对象包括这么几个关键字段:cxq(下图中的ContentionList),EntryList ,WaitSet,owner。

其中cxq ,EntryList ,WaitSet都是由ObjectWaiter的链表结构,owner指向持有锁的线程。

1517900250327

当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列的队首,然后调用park函数挂起当前线程。在linux系统上,park函数底层调用的是gclib库的pthread_cond_wait,JDK的ReentrantLock底层也是用该方法挂起线程的。更多细节可以看我之前的两篇文章:关于同步的一点思考-下linux内核级同步机制--futex

当线程释放锁时,会从cxq或EntryList中挑选一个线程唤醒,被选中的线程叫做Heir presumptive即假定继承人(应该是这样翻译),就是图中的Ready Thread,假定继承人被唤醒后会尝试获得锁,但synchronized是非公平的,所以假定继承人不一定能获得锁(这也是它叫"假定"继承人的原因)。

如果线程获得锁后调用Object#wait方法,则会将线程加入到WaitSet中,当被Object#notify唤醒后,会将线程从WaitSet移动到cxq或EntryList中去。需要注意的是,当调用一个锁对象的waitnotify方法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁

synchronizedmonitor锁机制和JDK的ReentrantLockCondition是很相似的,ReentrantLock也有一个存放等待获取锁线程的链表,Condition也有一个类似WaitSet的集合用来存放调用了await的线程。如果你之前对ReentrantLock有深入了解,那理解起monitor应该是很简单。

回到代码上,开始分析EnterI方法:

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    ...
    // 尝试获得锁
    if (TryLock (Self) > 0) {
        ...
        return ;
    }

    DeferredInitialize () ;

    // 自旋
    if (TrySpin (Self) > 0) {
        ...
        return ;
    }

    ...

    // 将线程封装成node节点中
    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    // 将node节点插入到_cxq队列的头部,cxq是一个单向链表
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // CAS失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率
        if (TryLock (Self) > 0) {
            ...
            return ;
        }
    }

    // SyncFlags默认为0,如果没有其他等待的线程,则将_Responsible设置为自己
    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {

        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        ...

        // park self
        if (_Responsible == Self || (SyncFlags & 1)) {
            // 当前线程是_Responsible时,调用的是带时间参数的park
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            // Increase the RecheckInterval, but clamp the value.
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            //否则直接调用park挂起当前线程
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        if (TryLock(Self) > 0) break ;

        ...

        if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;

        ...
        // 在释放锁时,_succ会被设置为EntryList或_cxq中的一个线程
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    }

   // 走到这里说明已经获得锁了

    assert (_owner == Self      , "invariant") ;
    assert (object() != NULL    , "invariant") ;

    // 将当前线程的node从cxq或EntryList中移除
    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;
    if (_Responsible == Self) {
        _Responsible = NULL ;
        OrderAccess::fence();
    }
    ...
    return ;
}

主要步骤有3步:

  1. 将当前线程插入到cxq队列的队首
  2. 然后park当前线程
  3. 当被唤醒后再尝试获得锁

这里需要特别说明的是_Responsible_succ两个字段的作用:

当竞争发生时,选取一个线程作为_Responsible_Responsible线程调用的是有时间限制的park方法,其目的是防止出现搁浅现象。

_succ线程是在线程释放锁是被设置,其含义是Heir presumptive,也就是我们上面说的假定继承人。

重量级锁的释放

重量级锁释放的代码在ObjectMonitor::exit

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;
   // 如果_owner不是当前线程
   if (THREAD != _owner) {
     // 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀后还没调用过enter方法,_owner会是指向Lock Record的指针。
     if (THREAD->is_lock_owned((address) _owner)) {
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       // 异常情况:当前不是持有锁的线程
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }
   // 重入计数器还不为0,则计数器-1后返回
   if (_recursions != 0) {
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   }

   // _Responsible设置为null
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }

   ...

   for (;;) {
      assert (THREAD == _owner, "invariant") ;

      // Knob_ExitPolicy默认为0
      if (Knob_ExitPolicy == 0) {
         // code 1:先释放锁,这时如果有其他线程进入同步块则能获得锁
         OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
         OrderAccess::storeload() ;                         // See if we need to wake a successor
         // code 2:如果没有等待的线程或已经有假定继承人
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            TEVENT (Inflated exit - simple egress) ;
            return ;
         }
         TEVENT (Inflated exit - complex egress) ;

         // code 3:要执行之后的操作需要重新获得锁,即设置_owner为当前线程
         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
            return ;
         }
         TEVENT (Exit - Reacquired) ;
      } 
      ...

      ObjectWaiter * w = NULL ;
      // code 4:根据QMode的不同会有不同的唤醒策略,默认为0
      int QMode = Knob_QMode ;

      if (QMode == 2 && _cxq != NULL) {
          // QMode == 2 : cxq中的线程有更高优先级,直接唤醒cxq的队首线程
          w = _cxq ;
          assert (w != NULL, "invariant") ;
          assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      if (QMode == 3 && _cxq != NULL) {
          // 将cxq中的元素插入到EntryList的末尾
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Append the RATs to the EntryList
          // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
          ObjectWaiter * Tail ;
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;
          } else {
              Tail->_next = w ;
              w->_prev = Tail ;
          }

          // Fall thru into code that tries to wake a successor from EntryList
      }

      if (QMode == 4 && _cxq != NULL) {
          // 将cxq插入到EntryList的队首
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;

          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }

          // Prepend the RATs to the EntryList
          if (_EntryList != NULL) {
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;

          // Fall thru into code that tries to wake a successor from EntryList
      }

      w = _EntryList  ;
      if (w != NULL) {
          // 如果EntryList不为空,则直接唤醒EntryList的队首元素
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      // EntryList为null,则处理cxq中的元素
      w = _cxq ;
      if (w == NULL) continue ;

      // 因为之后要将cxq的元素移动到EntryList,所以这里将cxq字段设置为null
      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      TEVENT (Inflated exit - drain cxq into EntryList) ;

      assert (w != NULL              , "invariant") ;
      assert (_EntryList  == NULL    , "invariant") ;

      if (QMode == 1) {
         // QMode == 1 : 将cxq中的元素转移到EntryList,并反转顺序
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
         // QMode == 0 or QMode == 2‘
         // 将cxq中的元素转移到EntryList
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      // _succ不为null,说明已经有个继承人了,所以不需要当前线程去唤醒,减少上下文切换的比率
      if (_succ != NULL) continue;

      w = _EntryList  ;
      // 唤醒EntryList第一个元素
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

在进行必要的锁重入判断以及自旋优化后,进入到主要逻辑:

code 1 设置owner为null,即释放锁,这个时刻其他的线程能获取到锁。这里是一个非公平锁的优化;

code 2 如果当前没有等待的线程则直接返回就好了,因为不需要唤醒其他线程。或者如果说succ不为null,代表当前已经有个"醒着的"继承人线程,那当前线程不需要唤醒任何线程;

code 3 当前线程重新获得锁,因为之后要操作cxq和EntryList队列以及唤醒线程;

code 4根据QMode的不同,会执行不同的唤醒策略;

根据QMode的不同,有不同的处理方式:

  1. QMode = 2且cxq非空:取cxq队列队首的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回,后面的代码不会执行了;
  2. QMode = 3且cxq非空:把cxq队列插入到EntryList的尾部;
  3. QMode = 4且cxq非空:把cxq队列插入到EntryList的头部;
  4. QMode = 0:暂时什么都不做,继续往下看;

只有QMode=2的时候会提前返回,等于0、3、4的时候都会继续往下执行:

1.如果EntryList的首元素非空,就取出来调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回; 2.如果EntryList的首元素为空,就将cxq的所有元素放入到EntryList中,然后再从EntryList中取出来队首元素执行ExitEpilog方法,然后立即返回;

以上对QMode的归纳参考了这篇文章。另外说下,关于如何编译JVM,可以看看该博主的这篇文章,该博主弄了一个docker镜像,傻瓜编译~

QMode默认为0,结合上面的流程我们可以看这么个demo:

public class SyncDemo {

    public static void main(String[] args) {

        SyncDemo syncDemo1 = new SyncDemo();
        syncDemo1.startThreadA();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        syncDemo1.startThreadB();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        syncDemo1.startThreadC();

    }

    final Object lock = new Object();

    public void startThreadA() {
        new Thread(() -> {
            synchronized (lock) {
                System.out.println("A get lock");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("A release lock");
            }
        }, "thread-A").start();
    }

    public void startThreadB() {
        new Thread(() -> {
            synchronized (lock) {
                System.out.println("B get lock");
            }
        }, "thread-B").start();
    }

    public void startThreadC() {
        new Thread(() -> {
            synchronized (lock) {

                System.out.println("C get lock");
            }
        }, "thread-C").start();
    }

}

默认策略下,在A释放锁后一定是C线程先获得锁。因为在获取锁时,是将当前线程插入到cxq的头部,而释放锁时,默认策略是:如果EntryList为空,则将cxq中的元素按原有顺序插入到到EntryList,并唤醒第一个线程。也就是当EntryList为空时,是后来的线程先获取锁。这点JDK中的Lock机制是不一样的。

Synchronized和ReentrantLock的区别

原理弄清楚了,顺便总结了几点Synchronized和ReentrantLock的区别:

  1. Synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
  2. Synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
  3. Synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的;
  4. Synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
  5. 在发生异常时Synchronized会自动释放锁(由javac编译时自动实现),而ReentrantLock需要开发者在finally块中显示释放锁;
  6. ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;
  7. Synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(上文有说),而ReentrantLock对于已经在等待的线程一定是先来的线程先获得锁;

End

总的来说Synchronized的重量级锁和ReentrantLock的实现上还是有很多相似的,包括其数据结构、挂起线程方式等等。在日常使用中,如无特殊要求用Synchronized就够了。你深入了解这两者其中一个的实现,了解另外一个或其他锁机制都比较容易,这也是我们常说的技术上的相通性。

pigeonsoar commented 5 years ago

轻量级锁只要有竞争后就膨胀为重量级锁了嘛?好像没有先自旋一定次数失败再膨胀的逻辑,反而是膨胀后再自旋尝试。如果是这样的话,那么假如线程A获取了偏向锁,线程B再访问,然后偏向锁膨胀为轻量级锁,那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛,此时的轻量级锁的意义不就没了- -

farmerjohngit commented 5 years ago

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

May0302 commented 5 years ago

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

如果是A线程的偏向锁升级到了轻量级锁,A获取的轻量级锁还未解锁状态下,线程B继续尝试获取锁一次就会升级到重量级锁?

May0302 commented 5 years ago

自旋获取轻量级锁的代码是在哪,我怎么没找到。。

Mxhz commented 4 years ago

您好 看了您的文章受益匪浅 这里有一个小问题 重量级锁级锁时三个队列都没有线程,markword是不是会被替换为无锁状态001,我代码测试的结果是会替换为001,然后再次加锁会从轻量级锁开始,这属于锁降级吗?

ghost commented 4 years ago

线程 T2 是轻量锁并且正在执行, T3 执行会尝试获取锁, 那么是由 T3 升级锁, 还是 T2 来升级锁呢?

ghost commented 4 years ago

系列看完了,确实是良心作品!学习了!

coolxbin commented 4 years ago

至于为什么轻量级锁需要一个膨胀中(INFLATING)状态,代码中的注释是:

// Why do we CAS a 0 into the mark-word instead of just CASing the // mark-word from the stack-locked value directly to the new inflated state? ... 我没太看懂,有知道的同学可以指点下~

本人拙见,设置为0主要是为了避免如下两个场景发生:

the 0 causes the owner to stall if the owner happens to try to drop the lock (restoring the header from the basiclock to the object) while inflation is in-progress.

设置为0可以实现,当锁正在膨胀为重量级锁时,延迟拥有锁的线程释放锁。 这里说的锁,就是轻量级锁。 延迟的实现逻辑就是,在释放锁时进入inflate方法,判断到是膨胀中,就进行忙等

This protocol avoids races that might would otherwise permit hashCode values to change or "flicker" for an object.

另一方面,设置为0也避免了获取对象的hash code值与锁膨胀的竞争问题 举例场景:

  1. A线程先进入膨胀阶段,设置/不设置锁对象的mark word
  2. B线程来获取hash code,设置mark word中的hash code位
  3. A膨胀成功,设置mark word 这样就会出现hashCode的不稳定(change or flicker)
coolxbin commented 4 years ago

线程 T2 是轻量锁并且正在执行, T3 执行会尝试获取锁, 那么是由 T3 升级锁, 还是 T2 来升级锁呢?

T3来升级,会将锁的_owner设置为T2,这样就不会影响T2使用锁

coolxbin commented 4 years ago

看完重量级锁的膨胀过程后,一直有个问题没想明白: 如果在膨胀前A线程已经重入了3次轻量级锁,这时B线程来竞争锁,导致锁膨胀。膨胀过程中,会将_owner设置为A线程堆栈中的Lock Record(其实就是锁对象指向的Lock Record地址)。但是,这里没有处理锁重入次数(只是将monitor的__ecursions设置为了0),且在锁释放过程中,也没考虑这个问题。 那么这3次轻量级锁重入是否丢失了呢?

希望博主和一起学习的同志能指导一下

banyueban commented 4 years ago

看完重量级锁的膨胀过程后,一直有个问题没想明白: 如果在膨胀前A线程已经重入了3次轻量级锁,这时B线程来竞争锁,导致锁膨胀。膨胀过程中,会将_owner设置为A线程堆栈中的Lock Record(其实就是锁对象指向的Lock Record地址)。但是,这里没有处理锁重入次数(只是将monitor的__ecursions设置为了0),且在锁释放过程中,也没考虑这个问题。 那么这3次轻量级锁重入是否丢失了呢?

希望博主和一起学习的同志能指导一下

个人拙见:线程在释放重入的2,3次的锁时,只是简单的将lock record中的obj reference置为null,并不关心锁对象的状态,只有当最外层锁(也就是lock record中displace mark word记录着锁对象的mark word)释放时,会回写mark word到锁对象,这个时候失败了才会进入到膨胀环节.此时只有第一次的锁需要释放,所以在膨胀的时候重入计数并不需要处理.

aLibeccio commented 4 years ago

线程 T2 是轻量锁并且正在执行, T3 执行会尝试获取锁, 那么是由 T3 升级锁, 还是 T2 来升级锁呢?

T3来升级,会将锁的_owner设置为T2,这样就不会影响T2使用锁

感谢大佬解答!

Xiuyu1995 commented 4 years ago

问一下博主,JDK1.6之前的重量级锁实现,为什么是重量的呢?因为现在的重量级锁在加锁的时候不会无脑进行系统调用,而是会通过乐观锁判断时候存在竞争,有竞争才会加锁,其原理基本和reentrantlock一样了,何来“重量一说”?难道是1.6以后重量级锁的逻辑也改了?

WangZhen-Ddm commented 4 years ago

问一下博主,JDK1.6之前的重量级锁实现,为什么是重量的呢?因为现在的重量级锁在加锁的时候不会无脑进行系统调用,而是会通过乐观锁判断时候存在竞争,有竞争才会加锁,其原理基本和reentrantlock一样了,何来“重量一说”?难道是1.6以后重量级锁的逻辑也改了?

应该来说重量级锁就是利用了阻塞队列来实现了独占锁

1715277231 commented 4 years ago

博主你好,我发现这个地方在前后两篇文章中描述冲突了。 issues/12:当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列尾部 issues/15:当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列的队首

ghost commented 3 years ago

默认策略下,在A释放锁后一定是C线程先获得锁。因为在获取锁时,是将当前线程插入到cxq的头部,而释放锁时,默认策略是:如果EntryList为空,则将cxq中的元素按原有顺序插入到到EntryList,并唤醒第一个线程。也就是当EntryList为空时,是后来的线程先获取锁。这点JDK中的Lock机制是不一样的。

这个是导致synchronized是非公平锁的原因吗?

zhangmuwuge commented 3 years ago

@M0rnar // code 1:先释放锁,这时如果有其他线程进入同步块则能获得锁 OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock OrderAccess::storeload() ; // See if we need to wake a successor 你说的算是一个原因,这里也有一个

Balvboy commented 3 years ago

自旋获取轻量级锁的代码是在哪,我怎么没找到。。

轻量级应该是没有自旋,只有一次CAS的机会,如果失败了就会升级。在重量级阶段里,有多处自旋获取重量级锁的逻辑tryLock方法就是。我感觉自旋获取轻量级锁是被其他文章误导了,在看源码之前我也一直认为自旋是获取轻量级锁来着

luffy0223 commented 3 years ago

博主你好,我发现这个地方在前后两篇文章中描述冲突了。 issues/12:当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列尾部 issues/15:当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列的队首

看源码如下 将该线程的ObjectWaiter的next指向cxq的头节点(猜的,不懂cpp),所以应该是插入到cxq的队首 // Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt; for (;;) { node._next = nxt = _cxq; ... }

puyongqqq commented 3 years ago

您好,我想请问一下,偏向锁的CAS和轻量级锁的CAS有没有什么不同,V、A、B分别都是什么值。

CuriousRookie commented 3 years ago

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

如果是A线程的偏向锁升级到了轻量级锁,A获取的轻量级锁还未解锁状态下,线程B继续尝试获取锁一次就会升级到重量级锁?

确实这里好像是这样,只是不知道具体流程是什么。

假设虚拟机启动一段时间后偏向锁模式已开启,线程A最初进入synchronized时完全没有竞争,这时候是偏向锁,但线程A这个同步代码块执行了很长很长的时间。 期间线程B开始进入synchronized,线程B是不是得在一次bytecodeInterpreter.cpp#CASE(_monitorenter)的执行中先撤销偏向锁升级到轻量级锁,然后再膨胀到重量级锁并进入获取锁的等待?

HuangZuShu commented 2 years ago

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

如果是A线程的偏向锁升级到了轻量级锁,A获取的轻量级锁还未解锁状态下,线程B继续尝试获取锁一次就会升级到重量级锁?

确实这里好像是这样,只是不知道具体流程是什么。

假设虚拟机启动一段时间后偏向锁模式已开启,线程A最初进入synchronized时完全没有竞争,这时候是偏向锁,但线程A这个同步代码块执行了很长很长的时间。 期间线程B开始进入synchronized,线程B是不是得在一次bytecodeInterpreter.cpp#CASE(_monitorenter)的执行中先撤销偏向锁升级到轻量级锁,然后再膨胀到重量级锁并进入获取锁的等待?

我打印了T1此时的锁状态,不明白为什么是重量级锁,既然线程A一直占用着锁,按照偏向锁撤销那一套逻辑,为什么不是轻量级锁呢?不理解output2为什么不是轻量级锁??

public static void main(String[] args) throws InterruptedException {
        Object d = new Object();
        new Thread(()->{
            synchronized (d) {
                System.out.println("output1\t" + ClassLayout.parseInstance(d).toPrintable());
                //ouput 1 : 偏向锁
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("output2\t" + ClassLayout.parseInstance(d).toPrintable());
                //output2 :重量级锁
            }
        }).start();

        new Thread(()->{
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (d) {
                System.out.println("output3\t" + ClassLayout.parseInstance(d).toPrintable());
                // output3:重量级锁
            }
        }).start();
    }
output1 java.lang.Object object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     0x000002be5298b805 (biased: 0x00000000af94a62e; epoch: 0; age: 0)
  8   8        (object header: class)    0x000002be4ff51c00
Instance size: 16 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total

output2 java.lang.Object object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     0x000002be51352f1a (fat lock: 0x000002be51352f1a)
  8   8        (object header: class)    0x000002be4ff51c00
Instance size: 16 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total

output3 java.lang.Object object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     0x000002be51352f1a (fat lock: 0x000002be51352f1a)
  8   8        (object header: class)    0x000002be4ff51c00
Instance size: 16 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total

Process finished with exit code 0
HuangZuShu commented 2 years ago

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

如果是A线程的偏向锁升级到了轻量级锁,A获取的轻量级锁还未解锁状态下,线程B继续尝试获取锁一次就会升级到重量级锁?

A未解,但B已经膨胀,所以此时锁状态是怎样子的呢?轻量级锁还是重量级锁呢?

Balvboy commented 2 years ago

您好,我想请问一下,偏向锁的CAS和轻量级锁的CAS有没有什么不同,V、A、B分别都是什么值。

@pigeonsoar

那线程B还是会继续尝试锁,那不就又直接膨胀为重量级锁了嘛

B获得轻量级锁后,如没有其他线程获取锁就一直是轻量级锁

如果是A线程的偏向锁升级到了轻量级锁,A获取的轻量级锁还未解锁状态下,线程B继续尝试获取锁一次就会升级到重量级锁?

A未解,但B已经膨胀,所以此时锁状态是怎样子的呢?轻量级锁还是重量级锁呢?

此时锁已经升级成重量级了,A会变成锁的持有者,B在尝试获取几次后,会被加入到队列中(EntryList还是cxq记不太清了),等待A释放后B会继续尝试获取

lixinyu5183 commented 2 years ago

搁浅现象是啥,谁能科普一下

trireg commented 2 years ago

看了很多文章,关于偏向锁、轻量级锁和重量级锁,偏向锁基于该锁持有者长时间为单一线程,而轻量级锁则是多个线程交替使用,重量级则发生争夺,但说锁升级的时候都是偏向锁->轻量级,那如果再持有轻量级锁的过程中,另一个线程来争抢锁,这个锁是直接升级为重量级锁还是先升级为轻量级锁

598572 commented 1 year ago

图片连接失效了

DoctorDeng commented 1 year ago

搁浅现象是啥,谁能科普一下

搁浅(stranding)现象: monitor 已解锁,但所有争用线程仍处于休眠状态。此时如果没有新的线程来获取锁,已解锁的 monitor 永远无法被正在休眠的线程获取。为了避免这种问题,会选择一个线程作为负责线程(responsible thread),该线程通过调用 time park() 方法,让自己休眠固定时间后被唤醒,随后该线程会检查潜在搁浅问题并从中恢复

caimofei commented 1 year ago

锁膨胀过程中。若当前是轻量级锁。创建ObjectMonitor对象后,为什么可以简单地将其_recursions设置为0?如果是锁膨胀时,轻量级锁存在重入的情况怎么办?

DoctorDeng commented 1 year ago

。若当前是轻量级锁。创建ObjectMonitor对象后,为什么可以简单地将其_recursions设置为0?如果是锁膨胀时

轻量级锁升级为重量级锁,初始状态当然得为 0,因为没有发生重入。膨胀过程中,持有锁的线程重入时,会检测到锁在膨胀过程,不能走原有的轻量级锁重入加锁逻辑,会和其他线程一样其他线程一样进行等待(以自旋方式等待),直到膨胀完成,膨胀完成后,持有锁的线程走的就是正常的重量级锁重入逻辑了。

caimofei commented 1 year ago

。若当前是轻量级锁。创建ObjectMonitor对象后,为什么可以简单地将其_recursions设置为0?如果是锁膨胀时

轻量级锁升级为重量级锁,初始状态当然得为 0,因为没有发生重入。膨胀过程中,持有锁的线程重入时,会检测到锁在膨胀过程,不能走原有的轻量级锁重入加锁逻辑,会和其他线程一样其他线程一样进行等待(以自旋方式等待),直到膨胀完成,膨胀完成后,持有锁的线程走的就是正常的重量级锁重入逻辑了。

为什么不会存在锁重入的情况呢?分析以下情况: 1.当前obj是轻量级锁状态。线程A成功拿到轻量级锁,并且正在执行code1处的业务逻辑。 2.此时,线程B也执行到process方法。发生锁竞争,这时,线程B把obj膨胀为重量级锁(我分析锁膨胀的逻辑,是任何线程都可以负责锁膨胀的操作的,只要它能够把锁对象的markword设置为0,就是由它负责膨胀)。 3.线程B完成锁膨胀,把ObjectMonitor._recursions设置0。这难道没问题吗?如果这样,线程A执行完code1并退出内部的synchronized后,不是就会把锁释放了吗?

    public static Object obj = new Object();
    public static void process() {
        synchronized (obj) {
            synchronized (obj) {
                //code 1.业务处理过程1。
            }
               //code 2.业务处理过程2
        }
    }
DoctorDeng commented 1 year ago

。若当前是轻量级锁。创建ObjectMonitor对象后,为什么可以简单地将其_recursions设置为0?如果是锁膨胀时

轻量级锁升级为重量级锁,初始状态当然得为 0,因为没有发生重入。膨胀过程中,持有锁的线程重入时,会检测到锁在膨胀过程,不能走原有的轻量级锁重入加锁逻辑,会和其他线程一样其他线程一样进行等待(以自旋方式等待),直到膨胀完成,膨胀完成后,持有锁的线程走的就是正常的重量级锁重入逻辑了。

为什么不会存在锁重入的情况呢?分析以下情况: 1.当前obj是轻量级锁状态。线程A成功拿到轻量级锁,并且正在执行code1处的业务逻辑。 2.此时,线程B也执行到process方法。发生锁竞争,这时,线程B把obj膨胀为重量级锁(我分析锁膨胀的逻辑,是任何线程都可以负责锁膨胀的操作的,只要它能够把锁对象的markword设置为0,就是由它负责膨胀)。 3.线程B完成锁膨胀,把ObjectMonitor._recursions设置0。这难道没问题吗?如果这样,线程A执行完code1并退出内部的synchronized后,不是就会把锁释放了吗?

    public static Object obj = new Object();
    public static void process() {
        synchronized (obj) {
            synchronized (obj) {
                //code 1.业务处理过程1。
            }
               //code 2.业务处理过程2
        }
    }

轻量级锁释放时, 会判断 Lock Record 记录的 mark word 值是否为 0(即 NULL)

轻量级锁在重入多次后,发生了锁膨胀,此时当轻量级锁释放时,和正常的轻量级锁释放时流程一样,只不过在 Lock Record 记录的 mark word 为 0 的情况下会增加如下诊断:

对于轻量级锁膨胀后,之前的重入释放操作和之前一样,不会走 ObjectMonitor exit 操作,因此在轻量级锁多次重入,然后被膨胀时,ObjectMonitor 的重入计数没必要记录之前的重入次数,使用初始值 0 即可。

caimofei commented 1 year ago

。若当前是轻量级锁。创建ObjectMonitor对象后,为什么可以简单地将其_recursions设置为0?如果是锁膨胀时

轻量级锁升级为重量级锁,初始状态当然得为 0,因为没有发生重入。膨胀过程中,持有锁的线程重入时,会检测到锁在膨胀过程,不能走原有的轻量级锁重入加锁逻辑,会和其他线程一样其他线程一样进行等待(以自旋方式等待),直到膨胀完成,膨胀完成后,持有锁的线程走的就是正常的重量级锁重入逻辑了。

为什么不会存在锁重入的情况呢?分析以下情况: 1.当前obj是轻量级锁状态。线程A成功拿到轻量级锁,并且正在执行code1处的业务逻辑。 2.此时,线程B也执行到process方法。发生锁竞争,这时,线程B把obj膨胀为重量级锁(我分析锁膨胀的逻辑,是任何线程都可以负责锁膨胀的操作的,只要它能够把锁对象的markword设置为0,就是由它负责膨胀)。 3.线程B完成锁膨胀,把ObjectMonitor._recursions设置0。这难道没问题吗?如果这样,线程A执行完code1并退出内部的synchronized后,不是就会把锁释放了吗?

    public static Object obj = new Object();
    public static void process() {
        synchronized (obj) {
            synchronized (obj) {
                //code 1.业务处理过程1。
            }
               //code 2.业务处理过程2
        }
    }

轻量级锁释放时, 会判断 Lock Record 记录的 mark word 值是否为 0(即 NULL)

  • 如果为 0 则表示是重入的释放,不做任何处理.
  • 如果不是进行真正的释放操作,通过 CAS 更新锁对象 mark word 为 Lock Record 中记录的值, 如果失败则表示期间锁发生了膨胀,此时会走锁膨胀,然后释放膨胀后的重量级锁(ObjectMonitor)的逻辑

轻量级锁在重入多次后,发生了锁膨胀,此时当轻量级锁释放时,和正常的轻量级锁释放时流程一样,只不过在 Lock Record 记录的 mark word 为 0 的情况下会增加如下诊断:

  • ObjectMonitor 关联的锁对象的 mark word 必须与 exit 对应的锁对象相等,不是抛出异常.
  • ObjectMonitor 是否当前线程获取,不是抛出异常.

对于轻量级锁膨胀后,之前的重入释放操作和之前一样,不会走 ObjectMonitor exit 操作,因此在轻量级锁多次重入,然后被膨胀时,ObjectMonitor 的重入计数没必要记录之前的重入次数,使用初始值 0 即可。


明白了