悲观锁的实现方式java,java各种锁机制
引入aqsaqs(抽象队列同步器)是实现Java并行契约中各种同步组件的基础。例如
各种锁:ReentrantLock、ReadWriteLock、StampedLock。各种线程同步工具类:CountDownLatch,CyclicBarrier,Semaphore。线程池中WorkerLock接口的实现,基本上是通过聚合AQS的一个子类来完成线程访问控制。
道格李曾经介绍过AQS的设计初衷。原则上,同步组件通常可以由其他组件实现,比如信号量。但是,倾向于某种同步组件会导致实现逻辑复杂晦涩,所以他选择在AbstractQueuedSynchronizer中抽象出基本的同步相关操作,并使用AQS为我们构建同步组件提供了模型。
如何使用AQS来实现与AQS的同步组件,我们必须实现至少两个基本方法,即:
要获取资源,需要实现tryAcquire(int arg)方法来释放资源,以及tryRelease(int arg)方法。如果需要以共享的方式获取/释放资源,需要实现相应的tryaccacquire shared(int arg)和tryreleserved (intarg)。
AQS使用模板方法设计模式。AQ方法的修饰语是非常规则的。其中protected修改的方法比较抽象,通常需要子类来实现,从而实现不同的同步组件。用public修饰的方法基本上可以视为模板方法,不建议子类直接重写。
可以通过调用AQS的acquire(int arg)方法来获取资源,这个方法会调用受保护的修改后的tryAcquire(int arg)方法,所以我们需要在AQS的子类中实现tryAcquire(int arg),tryAcquire(int arg)方法的作用就是获取资源。
当前线程获取资源并执行相应逻辑后,需要释放资源,以便后续节点继续获取资源。您可以通过调用AQS的release(int arg)方法来释放资源,这将调用受保护的修改的tryRelease(int arg)方法。因此,我们需要在AQS的子类中实现tryRelease(int arg)。tryRelease(int arg)方法的作用是释放资源。
AQS的实现原理从实现的角度分析了AQS是如何完成线程访问控制的。
AQ的实现原理可以从同步阻塞队列、获取资源时执行流、释放资源时执行流三个方面来介绍。
同步阻塞队列AQS依靠内部同步阻塞队列(FIFO双向队列)来完成资源管理。
同步阻塞队列的工作机制:节点:同步阻塞队列中的节点用于保存获取资源失败的线程引用、等待状态、前任和继任节点。获取资源失败的线程将成为加入同步阻塞队列的节点的尾部,同时阻塞当前线程(Java线程处于等待状态,释放CPU使用权)。首节点:同步阻塞队列遵循FIFO(先进先出),首节点为成功获取资源的节点。当第一个节点的线程释放资源时,它会唤醒后续节点再次尝试获取资源,后续节点在成功获取资源后会将自己设置为第一个节点。静态最终类节点{
/**
*表示节点正在共享模式下等待的标记
*/
静态最终AbstractQueuedSynchronizer。节点共享=新AbstractQueuedSynchronizer。node();
/**
*表示节点正在独占模式下等待的标记
*/
静态最终AbstractQueuedSynchronizer。节点EXCLUSIVE=null
/**
* waitStatus值表示线程已取消
*/
静态final int CANCELLED=1;
/**
* waitStatus值,用于指示后继线程需要解锁
*/
静态最终int信号=-1;
/**
* waitStatus值表示线程正在等待条件
*/
静态最终int条件=-2;
/**
* waitStatus值指示下一个acquireShared应
*无条件传播
*/
静态最终int PROPAGATE=-3;
//等待状态
易变int waitStatus
//前置节点
易失性AbstractQueuedSynchronizer。节点prev
//后续节点
易失性AbstractQueuedSynchronizer。下一个节点;
/**
*将此节点排入队列的线程。初始化于
*施工和使用后作废。
*/
易变线程Thread;
//条件等待队列的后续节点
AbstractQueuedSynchronizer。节点nextWaiter
/**
*如果节点在共享模式下等待,则返回true。
*/
最终布尔值isShared() {
return nextWaiter==SHARED
}
/**
*返回上一个节点,如果为空,则引发NullPointerException。
*当前置任务不能为空时使用。空支票可以
*被省略,但存在以帮助虚拟机。
*
* @返回此节点的前任
*/
最终抽象队列同步器。节点前置任务()引发NullPointerException {
AbstractQueuedSynchronizer。节点p=prev
if (p==null)抛出新的NullPointerException();
否则返回p;
}
Node() { //用于建立初始头或共享标记
}
节点(线程Thread,AbstractQueuedSynchronizer。节点模式){ //由addWaiter使用
this.nextWaiter=mode
this.thread=线程;
}
节点(Thread thread,int waitStatus) { //由条件使用
this . wait status=wait status;
this.thread=线程;
}
}等待状态
在节点中,volatile int waitStatus属性用于指示节点的等待状态。
节点有以下等待状态:
Canceled,值为1,因为在同步阻塞队列中等待的线程已经超时或者被中断,需要从同步阻塞队列中取消等待。当节点进入这种状态时,信号不会改变,值为-1。后继节点的线程处于等待状态,如果当前节点的线程释放同步状态或者被取消,就会通知后继节点,让后继节点的线程运行条件。当值为-2时,节点在条件等待队列中,节点线程正在等待该条件。当其他线程对条件调用signal()方法时,节点将从条件等待队列转移到同步阻塞队列,并加入同步状态获取。值为-3的PROPAGATE表示将无条件传播下一个共享同步状态获取。INITIAL,值为0,初始状态获取和释放资源。
获取资源时,获取资源失败的线程会被加入同步阻塞队列,在队列中自旋;移出队列(或停止旋转)的条件是前一个节点为头节点,并成功获取资源。在释放资源时,AQS调用tryRelease(int arg)方法释放资源,然后唤醒头节点的后继节点。获取资源下面描述获取资源时的执行过程。
调用AQS的acquire(int arg)方法来获取资源。
acquire(int arg)方法是独占获取资源,其调用过程如下图所示。
用文字描述acquire(int arg)方法的调用流程:首先调用自定义AQS实现的tryAcquire(int arg)方法,用来尝试获取资源:
如果资源获取成功,它将直接从acquire(int arg)方法返回。如果资源获取不成功,节点将被构造并添加到同步阻塞队列的尾部。最后,将调用acquireQueued(Node node,int arg)方法,使节点尝试以“无限循环”的方式获取资源。只有当前节点的前任节点为头节点,才能尝试获取资源。如果当前节点的前任节点为头节点,且资源获取成功,则将当前节点设置为头节点,从获取的(node node,intarg)方法返回。如果当前节点的前任节点不是头节点或者资源获取失败,当前线程将被阻塞,线程被唤醒后将继续循环操作。被获取的(node node,intarg)方法的调用过程也叫“自旋过程”。
spin是什么意思?我的理解是:Spin是一个无限循环,执行一定的操作序列,直到满足一定的条件才退出循环。
/**
*以独占模式采集,忽略中断。执行
*至少调用一次{@link #tryAcquire},
*成功返回。否则,该线程可能被排队
*反复封锁和解除封锁,调用{@link
* #tryAcquire}直到成功。这种方法可以使用
*实现方法{@link Lock#lock}。
*
* @param arg获取参数。该值被传送到
* {@link #tryAcquire}但未被解释
*可以代表你喜欢的任何东西。
*/
public final void acquire(int arg){
如果(!try acquire(arg)acquire queued(addWaiter(Node。独家),arg))
self interrupt();
} acquire(intarg)的主要逻辑是:
首先,调用自定义AQS实现的tryAcquire(int arg)方法,该方法确保线程安全地获取资源:
如果资源获取成功,它将直接从acquire(int arg)方法返回。如果资源获取不成功,则为同步节点。独占节点,同一时刻只有一个线程可以成功获取资源)会被构造出来,通过addWaiter(Node node)方法将该节点添加到同步阻塞队列的尾部,最后调用acquireQueued(Node node,int arg)方法使该节点处于“无限循环”中,如果没有,则阻塞该节点中的线程,阻塞线程的唤醒主要通过前置节点的出队或阻塞线程的中断来实现。/**
*在独占的不可中断模式下获取线程
*排队。由条件等待方法和获取方法使用。
*
* @param node该节点
* @param arg获取参数
* @return {@code true}如果在等待时被打断
*/
最终布尔型acquireQueued(最终节点Node,int arg) {
布尔失败=真;
尝试{
布尔中断=假;
for(;) {
最终节点p=Node . predecessor();
if (p==head tryAcquire(arg)) {
setHead(节点);
p.next=null//帮助垃圾收集
失败=假;
返回中断;
}
if(shouldparkaftefailedacquire(p,node)
parkAndCheckInterrupt())
中断=真;
}
}最后{
如果(失败)
cancelAcquire(节点);
}
}在Acquired (final node node,int arg)方法中,当前线程在“无限循环”中尝试获取资源,但只有前任节点是头节点才能尝试获取资源。为什么?原因有二,如下。
第一,头节点是成功获取资源的节点,头节点的线程在释放资源后会唤醒它的继任者。后继节点的线程被唤醒后,需要检查其前任节点是否为头节点。第二,维持同步阻塞队列的FIFO原则。释放资源当前线程获取资源并执行相应逻辑后,需要释放资源,以便后续节点继续获取资源。
下面描述释放资源时的执行过程。
您可以通过调用AQS的release(int arg)方法来释放资源。该方法在释放资源后,会唤醒头节点的后继者,然后让后继者再次尝试获取资源。
/**
*以独占模式发布。通过解锁一个或
*如果{@link #tryRelease}返回true,则有更多线程。
*此方法可用于实现方法{@link Lock#unlock}。
*
* @param arg释放参数。该值被传送到
* {@link #tryRelease}但未被解释
*可以代表你喜欢的任何东西。
* @return从{@link #tryRelease}返回的值
*/
公共最终布尔发布(int arg) {
if (tryRelease(arg)) {
节点h=头;
如果(h!=null h.waitStatus!=0)
un park successor(h);
返回true
}
返回false
}执行release (intarg)方法时,会唤醒头节点的后继线程。unparksuccess (nodenode)方法使用LockSupport#unpark()方法唤醒处于等待状态的线程。
资源的共享获取和释放以上是关于资源的独占获取/释放。
共享获取和独占获取的主要区别在于多线程是否可以同时获取资源。以一个文件的读写为例。如果一个程序正在读一个文件,那么此时所有对该文件的写操作都被阻塞,读操作可以同时进行。写操作需要独占访问资源,而读操作可以共享访问。
当共享对资源的访问时,允许其他共享访问,而阻止独占访问。当对资源的独占访问被阻止时,其他访问也同时被阻止。共享资源访问。
调用AQS的acquireShared(int arg)方法以共享方式获取资源。
在acquireShared(int arg)方法中,AQS调用tryaccureshared(int arg)方法来尝试获取资源。tryaccureshared(intarg)方法的返回值是int类型,当返回值=0时,表示可以获取资源。
可以看出,在doacquereshared (intarg)方法的旋转过程中,只有当前节点的前任是头节点,才能尝试获取资源。如果资源获取成功(返回值=0),将当前节点设置为头节点,并退出旋转过程。
public final void acquire shared(int arg){
if (tryAcquireShared(arg) 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg){
最终节点node=addWaiter(Node。分享);
布尔失败=真;
尝试{
布尔中断=假;
for(;) {
最终节点p=Node . predecessor();
if (p==head) {
int r=try acquire shared(arg);
如果(r=0) {
setHeadAndPropagate(node,r);
p.next=null//帮助垃圾收集
如果(中断)
self interrupt();
失败=假;
返回;
}
}
if(shouldparkaftefailedacquire(p,node)
parkAndCheckInterrupt())
中断=真;
}
}最后{
如果(失败)
cancelAcquire(节点);
}
}资源的共享释放
调用releaseShared(int arg)方法来释放资源。这种方法在释放资源后,会唤醒头节点的后继者,然后让后继者再次尝试获取资源。
对于可以支持多线程同时访问的并发组件(比如信号量),它们和独占的主要区别在于tryreaseshared(int arg)方法必须保证资源的安全释放,因为释放资源的操作会同时来自多个线程。确保资源的安全释放一般是通过循环和CAS来保证的。
public final boolean release shared(int arg){
if(tryleaseshared(arg)){
doreleasshared();
返回true
}
返回false
}独占超时获取资源调用AQS的doacquerenos (Intarg,Long Nanos Timeout)方法及时获取资源,即在指定时间段内获取资源,如果获取资源成功则返回true,否则返回false。
这个方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
在分析这种方法的实现之前,先介绍一下响应中断获取资源的过程。
在Java 5之前,当一个线程不能获得锁并被阻止同步时,它就被中断了。此时,线程的中断标志位将被修改,但线程仍将被阻塞在synchronized上,等待获得锁。在Java 5中,AQS提供了Acquired interruptible(Intarg)方法,如果当前线程在等待获取资源时被中断,该方法将立即返回并抛出InterruptedException。acquire(int arg)方法对中断不敏感,也就是说,因为线程在获取资源失败后进入同步阻塞队列,所以当线程随后被中断时,线程不会被移出同步阻塞队列。
超时资源获取过程可以看作是响应中断的资源获取过程的“加强版”,DOACQUIRENOS (Intarg,Long Nanos Timeout)方法在支持中断响应的基础上增加了超时获取的特性。
对于超时采集,主要需要计算需要睡眠的时间间隔,NanoTimeout。为防止过早通知,NanoTimeout的计算公式为:nano time out-=now-lastTime,其中now为当前唤醒时间,last time为上次唤醒时间。如果NanoTimeout大于0,说明超时没有过期,需要继续休眠NanoTimeout纳秒;否则,意味着它已经超时。
public final boolean try acquire nanos(int arg,long nanosTimeout)
引发中断的异常{
if (Thread.interrupted())
抛出new interrupted exception();
return tryAcquire(arg)
doacquirenos(arg,nanos time out);
}
/**
*在独占定时模式下采集。
*
* @param arg获取参数
* @param nanosTimeout最大等待时间
* @return {@code true}如果获得
*/
private boolean doacquirenos(int arg,long nanosTimeout)抛出InterruptedException {
if(毫微秒超时=0L)
返回false
final long deadline=system . nano time()nanos time out;
最终节点node=addWaiter(Node。独家);
布尔失败=真;
尝试{
for(;) {
最终节点p=Node . predecessor();
if (p==head tryAcquire(arg)) {
setHead(节点);
p.next=null//帮助垃圾收集
失败=假;
返回true
}
nanos time out=deadline-system . nano time();
if(毫微秒超时=0L)
返回false
if(shouldparkaftefailedacquire(p,node)
nanos time out spinForTimeoutThreshold)
LockSupport.parkNanos(这个,nanos time out);
if (Thread.interrupted())
抛出new interrupted exception();
}
}最后{
如果(失败)
cancelAcquire(节点);
}
}在spin过程中,该方法在节点的前任节点为头节点时尝试获取资源,如果成功获取资源,则从该方法返回。这个过程类似于独占同步获取,但在资源获取失败的处理上有所不同。
如果当前线程无法获取资源,请确定它是否已超时(NanoTimeout小于或等于0,表示它已超时)。如果没有超时,重新计算超时间隔NanoTimeout,然后让当前线程等待NanoTimeout纳秒(当达到设置的超时时,线程将从locksupport.parkNanos(对象拦截器,Long Nanos)方法返回)。
如果nanosTimeout小于或等于spinForTimeoutThreshold(1000纳秒),线程将不会等待超时,而是进入快速旋转进程。原因是很短的超时不可能很准确,而如果这个时候再等,nanosTimeout的超时就整体不准确了。因此,在超时非常短的情况下,AQS将进入无条件快速旋转。
独占超时获取资源的过程如下。
从图中可以看出,独占超时获取资源doacquirenos(Intarg,Long Nanos Timeout)和独占获取资源acquire(int args)在流程上非常相似,主要区别在于未获取资源时的处理逻辑。
Acquire(int args)会让当前线程在得不到资源时一直等待,而doacquirenos(int arg,long Nanotimeout)会让当前线程一直等待Nanotimeout纳秒。如果当前线程没有在Nanotimeout纳秒内获得资源,它将自动从等待逻辑返回。
条件技术的实现原理是为解决问题而生的。通过条件,我们可以实现等待/通知功能。
ConditionObject是AQS的内部类。每个条件对象都包含一个条件等待队列,它是实现条件对象等待/通知功能的关键。
我们来分析一下条件的实现原理,包括条件等待队列、等待和通知。
以下条件,除非特别说明,均指ConditionObject。
条件等待队列条件依赖于内部条件等待队列(FIFO双向队列)来实现等待/通知功能。
条件等待队列的工作机制:节点:条件等待队列中的每个节点都包含一个线程引用,就是等待条件对象的线程。如果一个线程调用Condition.await()方法,该线程将释放其资源,并被构造为一个节点加入条件等待队列的尾部,同时,该线程的状态将变为等待状态。实际上,条件等待队列中节点的定义重用了AQS节点的定义,也就是说,同步阻塞队列和条件等待队列中节点的类型都是AQS的静态内部类abstractquedsynchronizer.node。
在对象的监视器模型上,一个对象有一个同步阻塞队列和一个条件等待队列,而包中的锁(更确切地说是AQS)有一个同步阻塞队列和几个条件等待队列。
下面介绍一下等待线程的执行过程。
调用条件的await()方法(或者以await开头的方法)会导致当前线程释放资源,成为加入条件等待队列的节点的尾部,线程状态会变为等待状态。
如果从队列(同步阻塞队列和条件等待队列)的角度来看await()方法,当调用await()方法时,同步阻塞队列的第一个节点(获得锁的节点)移动到条件等待队列。并且同步阻塞队列的第一个节点不会直接加入条件等待队列。相反,当前线程由addConditionWaiter()方法构造成一个新节点,并被添加到条件等待队列中。
/**
*实现可中断条件等待。
* ol
* li如果当前线程被中断,抛出InterruptedException。
* li保存{@link #getState}返回的锁定状态。
* li调用{@link #release},将保存状态作为参数,
*如果失败,抛出IllegalMonitorStateException。
* li阻塞,直到发出信号或中断。
* li通过调用专用版本的
* {@link #acquire}将保存的状态作为参数。
* li如果在步骤4中被阻塞时被中断,抛出InterruptedException。
* /ol
*/
public final void await()引发InterruptedException {
if (Thread.interrupted())
抛出new interrupted exception();
node node=addConditionWaiter();
int saved state=fully release(node);
int interrupt mode=0;
而(!isOnSyncQueue(节点)){
LockSupport.park(这个);
if((interrupt mode=checkInterruptWhileWaiting(node))!=0)
打破;
}
if (acquireQueued(node,savedState) interruptMode!=THROW_IE)
interruptMode=REINTERRUPT
if (node.nextWaiter!=null) //如果取消则清除
unlinkCancelledWaiters();
if (interruptMode!=0)
reportInterruptAfterWait(interrupt mode);
}注意这里有一个唤醒等待线程的执行过程介绍。
调用Condition的signal()方法将唤醒条件等待队列中等待时间最长的节点(第一个节点)。在唤醒节点之前,当前节点将从条件等待队列移动到同步阻塞队列。
在等待条件队列中的节点唤醒后,被唤醒的线程试图以“无限循环”的方式获取资源。成功获取资源后,被唤醒的线程将从之前调用的await()方法返回。
如果被唤醒的线程没有被另一个调用Condition.signal()方法的线程唤醒,而是中断了正在等待的线程,则会引发InterruptedException。
被唤醒的线程会在await()方法中退出while循环(isOnSyncQueue(Node node)方法返回true,节点已经在同步阻塞队列中),然后调用AQS的acquireQueued()方法,尝试以“无限循环”的方式获取资源。成功获取资源后,被唤醒的线程将从之前调用的await()方法返回。
Condition的signalAll()方法相当于对条件等待队列中的每个节点执行一次signal()方法,其作用是将条件等待队列中的所有节点移动到同步阻塞队列中,并唤醒每个节点的线程。
虽然唤醒了各个节点的线程,但是这些线程都需要尝试获取资源,但是只有一个线程能够成功获取资源,然后从await()方法返回;其他未能获取资源的线程将被添加到同步阻塞队列中,并在队列中旋转;移出队列(或停止旋转)的条件是前一个节点为头节点,并成功获取资源。
/**
*将等待时间最长的线程(如果存在)从
*此条件的等待队列为
*拥有锁。
*
* @抛出IllegalMonitorStateException if { @ link # is heldexclusive }
*返回{@code false}
*/
公共最终无效信号(){
如果(!isHeldExclusively())
抛出新的IllegalMonitorStateException();
Node first=firstWaiter
如果(先!=空)
多希纳尔(第一);
}
/**
*删除并转移节点,直到找到未取消的节点或
*空。从信号中分离出来,部分是为了鼓励编译器
*内联无服务员的情况。
* @param first(非空)条件队列中的第一个节点
*/
私有空剂量信号(节点优先){
做{
if((first waiter=first . next waiter)==null)
lastWaiter=null
first.nextWaiter=null
} while(!transferForSignal(第一个)
(first=firstWaiter)!=null);
}版权归作者所有:原创作品来自博主小二上九8,转载请联系作者获得授权,否则将追究法律责任。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。