悲观锁的实现方式java,java各种锁机制

  悲观锁的实现方式java,java各种锁机制

  引入aqsaqs(抽象队列同步器)是实现Java并行契约中各种同步组件的基础。例如

  各种锁:ReentrantLock、ReadWriteLock、StampedLock。各种线程同步工具类:CountDownLatch,CyclicBarrier,Semaphore。线程池中WorkerLock接口的实现,基本上是通过聚合AQS的一个子类来完成线程访问控制。

  道格李曾经介绍过AQS的设计初衷。原则上,同步组件通常可以由其他组件实现,比如信号量。但是,倾向于某种同步组件会导致实现逻辑复杂晦涩,所以他选择在AbstractQueuedSynchronizer中抽象出基本的同步相关操作,并使用AQS为我们构建同步组件提供了模型。

  如何使用AQS来实现与AQS的同步组件,我们必须实现至少两个基本方法,即:

  要获取资源,需要实现tryAcquire(int arg)方法来释放资源,以及tryRelease(int arg)方法。如果需要以共享的方式获取/释放资源,需要实现相应的tryaccacquire shared(int arg)和tryreleserved (intarg)。

  AQS使用模板方法设计模式。AQ方法的修饰语是非常规则的。其中protected修改的方法比较抽象,通常需要子类来实现,从而实现不同的同步组件。用public修饰的方法基本上可以视为模板方法,不建议子类直接重写。

  可以通过调用AQS的acquire(int arg)方法来获取资源,这个方法会调用受保护的修改后的tryAcquire(int arg)方法,所以我们需要在AQS的子类中实现tryAcquire(int arg),tryAcquire(int arg)方法的作用就是获取资源。

  当前线程获取资源并执行相应逻辑后,需要释放资源,以便后续节点继续获取资源。您可以通过调用AQS的release(int arg)方法来释放资源,这将调用受保护的修改的tryRelease(int arg)方法。因此,我们需要在AQS的子类中实现tryRelease(int arg)。tryRelease(int arg)方法的作用是释放资源。

  AQS的实现原理从实现的角度分析了AQS是如何完成线程访问控制的。

  AQ的实现原理可以从同步阻塞队列、获取资源时执行流、释放资源时执行流三个方面来介绍。

  同步阻塞队列AQS依靠内部同步阻塞队列(FIFO双向队列)来完成资源管理。

  同步阻塞队列的工作机制:节点:同步阻塞队列中的节点用于保存获取资源失败的线程引用、等待状态、前任和继任节点。获取资源失败的线程将成为加入同步阻塞队列的节点的尾部,同时阻塞当前线程(Java线程处于等待状态,释放CPU使用权)。首节点:同步阻塞队列遵循FIFO(先进先出),首节点为成功获取资源的节点。当第一个节点的线程释放资源时,它会唤醒后续节点再次尝试获取资源,后续节点在成功获取资源后会将自己设置为第一个节点。静态最终类节点{

  /**

  *表示节点正在共享模式下等待的标记

  */

  静态最终AbstractQueuedSynchronizer。节点共享=新AbstractQueuedSynchronizer。node();

  /**

  *表示节点正在独占模式下等待的标记

  */

  静态最终AbstractQueuedSynchronizer。节点EXCLUSIVE=null

  /**

  * waitStatus值表示线程已取消

  */

  静态final int CANCELLED=1;

  /**

  * waitStatus值,用于指示后继线程需要解锁

  */

  静态最终int信号=-1;

  /**

  * waitStatus值表示线程正在等待条件

  */

  静态最终int条件=-2;

  /**

  * waitStatus值指示下一个acquireShared应

  *无条件传播

  */

  静态最终int PROPAGATE=-3;

  //等待状态

  易变int waitStatus

  //前置节点

  易失性AbstractQueuedSynchronizer。节点prev

  //后续节点

  易失性AbstractQueuedSynchronizer。下一个节点;

  /**

  *将此节点排入队列的线程。初始化于

  *施工和使用后作废。

  */

  易变线程Thread;

  //条件等待队列的后续节点

  AbstractQueuedSynchronizer。节点nextWaiter

  /**

  *如果节点在共享模式下等待,则返回true。

  */

  最终布尔值isShared() {

  return nextWaiter==SHARED

  }

  /**

  *返回上一个节点,如果为空,则引发NullPointerException。

  *当前置任务不能为空时使用。空支票可以

  *被省略,但存在以帮助虚拟机。

  *

  * @返回此节点的前任

  */

  最终抽象队列同步器。节点前置任务()引发NullPointerException {

  AbstractQueuedSynchronizer。节点p=prev

  if (p==null)抛出新的NullPointerException();

  否则返回p;

  }

  Node() { //用于建立初始头或共享标记

  }

  节点(线程Thread,AbstractQueuedSynchronizer。节点模式){ //由addWaiter使用

  this.nextWaiter=mode

  this.thread=线程;

  }

  节点(Thread thread,int waitStatus) { //由条件使用

  this . wait status=wait status;

  this.thread=线程;

  }

  }等待状态

  在节点中,volatile int waitStatus属性用于指示节点的等待状态。

  节点有以下等待状态:

  Canceled,值为1,因为在同步阻塞队列中等待的线程已经超时或者被中断,需要从同步阻塞队列中取消等待。当节点进入这种状态时,信号不会改变,值为-1。后继节点的线程处于等待状态,如果当前节点的线程释放同步状态或者被取消,就会通知后继节点,让后继节点的线程运行条件。当值为-2时,节点在条件等待队列中,节点线程正在等待该条件。当其他线程对条件调用signal()方法时,节点将从条件等待队列转移到同步阻塞队列,并加入同步状态获取。值为-3的PROPAGATE表示将无条件传播下一个共享同步状态获取。INITIAL,值为0,初始状态获取和释放资源。

  获取资源时,获取资源失败的线程会被加入同步阻塞队列,在队列中自旋;移出队列(或停止旋转)的条件是前一个节点为头节点,并成功获取资源。在释放资源时,AQS调用tryRelease(int arg)方法释放资源,然后唤醒头节点的后继节点。获取资源下面描述获取资源时的执行过程。

  调用AQS的acquire(int arg)方法来获取资源。

  acquire(int arg)方法是独占获取资源,其调用过程如下图所示。

  用文字描述acquire(int arg)方法的调用流程:首先调用自定义AQS实现的tryAcquire(int arg)方法,用来尝试获取资源:

  如果资源获取成功,它将直接从acquire(int arg)方法返回。如果资源获取不成功,节点将被构造并添加到同步阻塞队列的尾部。最后,将调用acquireQueued(Node node,int arg)方法,使节点尝试以“无限循环”的方式获取资源。只有当前节点的前任节点为头节点,才能尝试获取资源。如果当前节点的前任节点为头节点,且资源获取成功,则将当前节点设置为头节点,从获取的(node node,intarg)方法返回。如果当前节点的前任节点不是头节点或者资源获取失败,当前线程将被阻塞,线程被唤醒后将继续循环操作。被获取的(node node,intarg)方法的调用过程也叫“自旋过程”。

  spin是什么意思?我的理解是:Spin是一个无限循环,执行一定的操作序列,直到满足一定的条件才退出循环。

  /**

  *以独占模式采集,忽略中断。执行

  *至少调用一次{@link #tryAcquire},

  *成功返回。否则,该线程可能被排队

  *反复封锁和解除封锁,调用{@link

  * #tryAcquire}直到成功。这种方法可以使用

  *实现方法{@link Lock#lock}。

  *

  * @param arg获取参数。该值被传送到

  * {@link #tryAcquire}但未被解释

  *可以代表你喜欢的任何东西。

  */

  public final void acquire(int arg){

  如果(!try acquire(arg)acquire queued(addWaiter(Node。独家),arg))

  self interrupt();

  } acquire(intarg)的主要逻辑是:

  首先,调用自定义AQS实现的tryAcquire(int arg)方法,该方法确保线程安全地获取资源:

  如果资源获取成功,它将直接从acquire(int arg)方法返回。如果资源获取不成功,则为同步节点。独占节点,同一时刻只有一个线程可以成功获取资源)会被构造出来,通过addWaiter(Node node)方法将该节点添加到同步阻塞队列的尾部,最后调用acquireQueued(Node node,int arg)方法使该节点处于“无限循环”中,如果没有,则阻塞该节点中的线程,阻塞线程的唤醒主要通过前置节点的出队或阻塞线程的中断来实现。/**

  *在独占的不可中断模式下获取线程

  *排队。由条件等待方法和获取方法使用。

  *

  * @param node该节点

  * @param arg获取参数

  * @return {@code true}如果在等待时被打断

  */

  最终布尔型acquireQueued(最终节点Node,int arg) {

  布尔失败=真;

  尝试{

  布尔中断=假;

  for(;) {

  最终节点p=Node . predecessor();

  if (p==head tryAcquire(arg)) {

  setHead(节点);

  p.next=null//帮助垃圾收集

  失败=假;

  返回中断;

  }

  if(shouldparkaftefailedacquire(p,node)

  parkAndCheckInterrupt())

  中断=真;

  }

  }最后{

  如果(失败)

  cancelAcquire(节点);

  }

  }在Acquired (final node node,int arg)方法中,当前线程在“无限循环”中尝试获取资源,但只有前任节点是头节点才能尝试获取资源。为什么?原因有二,如下。

  第一,头节点是成功获取资源的节点,头节点的线程在释放资源后会唤醒它的继任者。后继节点的线程被唤醒后,需要检查其前任节点是否为头节点。第二,维持同步阻塞队列的FIFO原则。释放资源当前线程获取资源并执行相应逻辑后,需要释放资源,以便后续节点继续获取资源。

  下面描述释放资源时的执行过程。

  您可以通过调用AQS的release(int arg)方法来释放资源。该方法在释放资源后,会唤醒头节点的后继者,然后让后继者再次尝试获取资源。

  /**

  *以独占模式发布。通过解锁一个或

  *如果{@link #tryRelease}返回true,则有更多线程。

  *此方法可用于实现方法{@link Lock#unlock}。

  *

  * @param arg释放参数。该值被传送到

  * {@link #tryRelease}但未被解释

  *可以代表你喜欢的任何东西。

  * @return从{@link #tryRelease}返回的值

  */

  公共最终布尔发布(int arg) {

  if (tryRelease(arg)) {

  节点h=头;

  如果(h!=null h.waitStatus!=0)

  un park successor(h);

  返回true

  }

  返回false

  }执行release (intarg)方法时,会唤醒头节点的后继线程。unparksuccess (nodenode)方法使用LockSupport#unpark()方法唤醒处于等待状态的线程。

  资源的共享获取和释放以上是关于资源的独占获取/释放。

  共享获取和独占获取的主要区别在于多线程是否可以同时获取资源。以一个文件的读写为例。如果一个程序正在读一个文件,那么此时所有对该文件的写操作都被阻塞,读操作可以同时进行。写操作需要独占访问资源,而读操作可以共享访问。

  当共享对资源的访问时,允许其他共享访问,而阻止独占访问。当对资源的独占访问被阻止时,其他访问也同时被阻止。共享资源访问。

  调用AQS的acquireShared(int arg)方法以共享方式获取资源。

  在acquireShared(int arg)方法中,AQS调用tryaccureshared(int arg)方法来尝试获取资源。tryaccureshared(intarg)方法的返回值是int类型,当返回值=0时,表示可以获取资源。

  可以看出,在doacquereshared (intarg)方法的旋转过程中,只有当前节点的前任是头节点,才能尝试获取资源。如果资源获取成功(返回值=0),将当前节点设置为头节点,并退出旋转过程。

  public final void acquire shared(int arg){

  if (tryAcquireShared(arg) 0)

  doAcquireShared(arg);

  }

  private void doAcquireShared(int arg){

  最终节点node=addWaiter(Node。分享);

  布尔失败=真;

  尝试{

  布尔中断=假;

  for(;) {

  最终节点p=Node . predecessor();

  if (p==head) {

  int r=try acquire shared(arg);

  如果(r=0) {

  setHeadAndPropagate(node,r);

  p.next=null//帮助垃圾收集

  如果(中断)

  self interrupt();

  失败=假;

  返回;

  }

  }

  if(shouldparkaftefailedacquire(p,node)

  parkAndCheckInterrupt())

  中断=真;

  }

  }最后{

  如果(失败)

  cancelAcquire(节点);

  }

  }资源的共享释放

  调用releaseShared(int arg)方法来释放资源。这种方法在释放资源后,会唤醒头节点的后继者,然后让后继者再次尝试获取资源。

  对于可以支持多线程同时访问的并发组件(比如信号量),它们和独占的主要区别在于tryreaseshared(int arg)方法必须保证资源的安全释放,因为释放资源的操作会同时来自多个线程。确保资源的安全释放一般是通过循环和CAS来保证的。

  public final boolean release shared(int arg){

  if(tryleaseshared(arg)){

  doreleasshared();

  返回true

  }

  返回false

  }独占超时获取资源调用AQS的doacquerenos (Intarg,Long Nanos Timeout)方法及时获取资源,即在指定时间段内获取资源,如果获取资源成功则返回true,否则返回false。

  这个方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。

  在分析这种方法的实现之前,先介绍一下响应中断获取资源的过程。

  在Java 5之前,当一个线程不能获得锁并被阻止同步时,它就被中断了。此时,线程的中断标志位将被修改,但线程仍将被阻塞在synchronized上,等待获得锁。在Java 5中,AQS提供了Acquired interruptible(Intarg)方法,如果当前线程在等待获取资源时被中断,该方法将立即返回并抛出InterruptedException。acquire(int arg)方法对中断不敏感,也就是说,因为线程在获取资源失败后进入同步阻塞队列,所以当线程随后被中断时,线程不会被移出同步阻塞队列。

  超时资源获取过程可以看作是响应中断的资源获取过程的“加强版”,DOACQUIRENOS (Intarg,Long Nanos Timeout)方法在支持中断响应的基础上增加了超时获取的特性。

  对于超时采集,主要需要计算需要睡眠的时间间隔,NanoTimeout。为防止过早通知,NanoTimeout的计算公式为:nano time out-=now-lastTime,其中now为当前唤醒时间,last time为上次唤醒时间。如果NanoTimeout大于0,说明超时没有过期,需要继续休眠NanoTimeout纳秒;否则,意味着它已经超时。

  public final boolean try acquire nanos(int arg,long nanosTimeout)

  引发中断的异常{

  if (Thread.interrupted())

  抛出new interrupted exception();

  return tryAcquire(arg)

  doacquirenos(arg,nanos time out);

  }

  /**

  *在独占定时模式下采集。

  *

  * @param arg获取参数

  * @param nanosTimeout最大等待时间

  * @return {@code true}如果获得

  */

  private boolean doacquirenos(int arg,long nanosTimeout)抛出InterruptedException {

  if(毫微秒超时=0L)

  返回false

  final long deadline=system . nano time()nanos time out;

  最终节点node=addWaiter(Node。独家);

  布尔失败=真;

  尝试{

  for(;) {

  最终节点p=Node . predecessor();

  if (p==head tryAcquire(arg)) {

  setHead(节点);

  p.next=null//帮助垃圾收集

  失败=假;

  返回true

  }

  nanos time out=deadline-system . nano time();

  if(毫微秒超时=0L)

  返回false

  if(shouldparkaftefailedacquire(p,node)

  nanos time out spinForTimeoutThreshold)

  LockSupport.parkNanos(这个,nanos time out);

  if (Thread.interrupted())

  抛出new interrupted exception();

  }

  }最后{

  如果(失败)

  cancelAcquire(节点);

  }

  }在spin过程中,该方法在节点的前任节点为头节点时尝试获取资源,如果成功获取资源,则从该方法返回。这个过程类似于独占同步获取,但在资源获取失败的处理上有所不同。

  如果当前线程无法获取资源,请确定它是否已超时(NanoTimeout小于或等于0,表示它已超时)。如果没有超时,重新计算超时间隔NanoTimeout,然后让当前线程等待NanoTimeout纳秒(当达到设置的超时时,线程将从locksupport.parkNanos(对象拦截器,Long Nanos)方法返回)。

  如果nanosTimeout小于或等于spinForTimeoutThreshold(1000纳秒),线程将不会等待超时,而是进入快速旋转进程。原因是很短的超时不可能很准确,而如果这个时候再等,nanosTimeout的超时就整体不准确了。因此,在超时非常短的情况下,AQS将进入无条件快速旋转。

  独占超时获取资源的过程如下。

  从图中可以看出,独占超时获取资源doacquirenos(Intarg,Long Nanos Timeout)和独占获取资源acquire(int args)在流程上非常相似,主要区别在于未获取资源时的处理逻辑。

  Acquire(int args)会让当前线程在得不到资源时一直等待,而doacquirenos(int arg,long Nanotimeout)会让当前线程一直等待Nanotimeout纳秒。如果当前线程没有在Nanotimeout纳秒内获得资源,它将自动从等待逻辑返回。

  条件技术的实现原理是为解决问题而生的。通过条件,我们可以实现等待/通知功能。

  ConditionObject是AQS的内部类。每个条件对象都包含一个条件等待队列,它是实现条件对象等待/通知功能的关键。

  我们来分析一下条件的实现原理,包括条件等待队列、等待和通知。

  以下条件,除非特别说明,均指ConditionObject。

  条件等待队列条件依赖于内部条件等待队列(FIFO双向队列)来实现等待/通知功能。

  条件等待队列的工作机制:节点:条件等待队列中的每个节点都包含一个线程引用,就是等待条件对象的线程。如果一个线程调用Condition.await()方法,该线程将释放其资源,并被构造为一个节点加入条件等待队列的尾部,同时,该线程的状态将变为等待状态。实际上,条件等待队列中节点的定义重用了AQS节点的定义,也就是说,同步阻塞队列和条件等待队列中节点的类型都是AQS的静态内部类abstractquedsynchronizer.node。

  在对象的监视器模型上,一个对象有一个同步阻塞队列和一个条件等待队列,而包中的锁(更确切地说是AQS)有一个同步阻塞队列和几个条件等待队列。

  下面介绍一下等待线程的执行过程。

  调用条件的await()方法(或者以await开头的方法)会导致当前线程释放资源,成为加入条件等待队列的节点的尾部,线程状态会变为等待状态。

  如果从队列(同步阻塞队列和条件等待队列)的角度来看await()方法,当调用await()方法时,同步阻塞队列的第一个节点(获得锁的节点)移动到条件等待队列。并且同步阻塞队列的第一个节点不会直接加入条件等待队列。相反,当前线程由addConditionWaiter()方法构造成一个新节点,并被添加到条件等待队列中。

  /**

  *实现可中断条件等待。

  * ol

  * li如果当前线程被中断,抛出InterruptedException。

  * li保存{@link #getState}返回的锁定状态。

  * li调用{@link #release},将保存状态作为参数,

  *如果失败,抛出IllegalMonitorStateException。

  * li阻塞,直到发出信号或中断。

  * li通过调用专用版本的

  * {@link #acquire}将保存的状态作为参数。

  * li如果在步骤4中被阻塞时被中断,抛出InterruptedException。

  * /ol

  */

  public final void await()引发InterruptedException {

  if (Thread.interrupted())

  抛出new interrupted exception();

  node node=addConditionWaiter();

  int saved state=fully release(node);

  int interrupt mode=0;

  而(!isOnSyncQueue(节点)){

  LockSupport.park(这个);

  if((interrupt mode=checkInterruptWhileWaiting(node))!=0)

  打破;

  }

  if (acquireQueued(node,savedState) interruptMode!=THROW_IE)

  interruptMode=REINTERRUPT

  if (node.nextWaiter!=null) //如果取消则清除

  unlinkCancelledWaiters();

  if (interruptMode!=0)

  reportInterruptAfterWait(interrupt mode);

  }注意这里有一个唤醒等待线程的执行过程介绍。

  调用Condition的signal()方法将唤醒条件等待队列中等待时间最长的节点(第一个节点)。在唤醒节点之前,当前节点将从条件等待队列移动到同步阻塞队列。

  在等待条件队列中的节点唤醒后,被唤醒的线程试图以“无限循环”的方式获取资源。成功获取资源后,被唤醒的线程将从之前调用的await()方法返回。

  如果被唤醒的线程没有被另一个调用Condition.signal()方法的线程唤醒,而是中断了正在等待的线程,则会引发InterruptedException。

  被唤醒的线程会在await()方法中退出while循环(isOnSyncQueue(Node node)方法返回true,节点已经在同步阻塞队列中),然后调用AQS的acquireQueued()方法,尝试以“无限循环”的方式获取资源。成功获取资源后,被唤醒的线程将从之前调用的await()方法返回。

  Condition的signalAll()方法相当于对条件等待队列中的每个节点执行一次signal()方法,其作用是将条件等待队列中的所有节点移动到同步阻塞队列中,并唤醒每个节点的线程。

  虽然唤醒了各个节点的线程,但是这些线程都需要尝试获取资源,但是只有一个线程能够成功获取资源,然后从await()方法返回;其他未能获取资源的线程将被添加到同步阻塞队列中,并在队列中旋转;移出队列(或停止旋转)的条件是前一个节点为头节点,并成功获取资源。

  /**

  *将等待时间最长的线程(如果存在)从

  *此条件的等待队列为

  *拥有锁。

  *

  * @抛出IllegalMonitorStateException if { @ link # is heldexclusive }

  *返回{@code false}

  */

  公共最终无效信号(){

  如果(!isHeldExclusively())

  抛出新的IllegalMonitorStateException();

  Node first=firstWaiter

  如果(先!=空)

  多希纳尔(第一);

  }

  /**

  *删除并转移节点,直到找到未取消的节点或

  *空。从信号中分离出来,部分是为了鼓励编译器

  *内联无服务员的情况。

  * @param first(非空)条件队列中的第一个节点

  */

  私有空剂量信号(节点优先){

  做{

  if((first waiter=first . next waiter)==null)

  lastWaiter=null

  first.nextWaiter=null

  } while(!transferForSignal(第一个)

  (first=firstWaiter)!=null);

  }版权归作者所有:原创作品来自博主小二上九8,转载请联系作者获得授权,否则将追究法律责任。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: