多线程编程的核心思想是,多线程编程的核心思想有哪些
尊重原创版权:https://www.gewuweb.com/hot/12138.html
多线程编程核心思想目录
多线程核心锁框架锁和条件接口可重入锁公平锁和不公平锁读写锁降级和锁升级队列同步器AQS在底层实现公平锁公平吗?条件实现原理自实现锁类原子类原子类介绍ABA问题及解决方案并发容器传统容器是线程安全的吗?并发容器介绍阻塞队列多线程编程的核心在前面我们学习了多线程的底层运行机制,最后才知道原来多线程环境存在这么多问题。
在JDK5之前,我们只能选择synchronized关键字来实现锁,但是在JDK5之后,由于volatile
关键字已升级,因此出现了并发框架包。与传统的synchronized关键字相比,我们对锁的实现有了更多的选择。
如果说IT的历史是靠人联系起来的,那么道格一定是不可或缺的。
李.这个鼻子上戴着眼镜,留着德国威廉二世国王的胡子,脸上带着谦逊腼腆的微笑的老人,一直在纽约州立大学奥斯威戈计算机科学系工作。
毫不夸张地说,他是全世界Java上最有影响力的人。因为Java历史上的两次巨变,他都间接或直接的起到了重要的作用。Tiger于2004年推出。Tiger采用了15个JSR(Java
规格
请求),其中之一是JSR-166。JSR-166来自道格写的util.concurrent包。
让我们感受一下JUC给我们带来了什么。
* * lock的锁框架
5.之后,在契约中添加一个新的锁接口(以及相关的实现类)来实现锁功能。lock接口提供了类似synchronized关键字的同步功能,但是在使用时需要手动获取和释放锁。
条件接口使用和发出的锁与我们传统的同步锁不同。这里的锁可以算是真正的锁。
每个锁都是一个对应的锁对象,我只需要从锁对象中获取或者释放锁。
让我们首先看看这个接口中定义了什么:
公共接口锁{
//获取锁。如果你拿不到锁,它就会阻塞。等待其他线程释放锁,然后在获得锁后返回。
void lock();
//同上,但等待时会响应中断。
void lockInterruptibly()引发InterruptedException
//尝试获取锁,但锁不会阻塞。如果可以获得,则返回true,而不是false。
布尔tryLock();
//尝试获取锁,但可以限制超时。如果超时后还没有获得锁,返回false,否则返回true,就可以响应中断了。
布尔tryLock(long time,TimeUnit单位)抛出InterruptedException
//释放锁定
void unlock();
//暂时可以理解为替代Object的传统wait()和notify()操作的工具。
condition new condition();
}
* *在这里,我们可以演示如何使用Lock类来锁定和释放锁:
公共类Main {
private static int I=0;
公共静态void main(String[] args)引发InterruptedException {
//ReentrantLock类是Lock类的实现。
lock testLock=new reentrant lock();
可运行的操作=() - {
for(int j=0;j 100000J) {//以自动递增运算为例。
//锁定。如果其他线程在锁成功锁定后想要获取锁,它们将阻塞并等待当前线程释放。
test lock . lock();
我;
//Unlock,释放锁后,其他线程可以获取锁(注意,在此之前必须锁定锁,否则会报错)
test lock . unlock();
}
};
新线程(操作)。start();
新线程(操作)。start();
thread . sleep(1000);//等待上述两个线程运行完毕
system . out . println(I);
}
}
如您所见,与我们之前使用的synchronized相比,我们在这里实际上是在操作一个“锁”对象,
需要加锁的时候只需要调用lock()方法,需要解锁的时候只需要调用unlock()方法。
运行程序的最终结果和使用同步锁是一样的。
* *那么,我们如何像传统的锁定一样调用对象的wait()和notify()方法,并契约出契约来提供条件接口:
公共接口条件{
//就像调用锁对象的wait方法一样,会进入等待状态,
//但是这里需要调用条件的signal或者signalAll方法来唤醒,
//在等待状态下,可以响应中断。
void await()引发InterruptedException
//同上,但不响应中断(从名字就能猜到)
void awaitUninterruptibly();
//等待指定的时间。如果在指定时间(纳秒)内被唤醒,将返回剩余时间。如果超时,它将返回0或负数,这可以响应中断。
long awaitNanos(long nanos time out)抛出InterruptedException
//等待指定的时间(可以指定时间单位)。如果在等待时间内醒来,返回true否则,返回false,您可以响应中断。
boolean await(long time,TimeUnit单位)引发InterruptedException。
//可以指定显式的时间点。如果在时间点之前醒来,返回true,否则返回false,可以响应中断。
boolean awaitUntil(日期截止时间)引发InterruptedException
//唤醒一个等待的线程。请注意,在运行它之前,您仍然需要获得一个锁。
void signal();
//同上,但它唤醒所有等待的线程。
void signal all();
}
* *演示:
公共静态void main(String[] args)引发InterruptedException {
lock testLock=new reentrant lock();
condition condition=testlock . new condition();
新线程(()- {
test lock . lock();//就像synchronized一样,在使用await之前必须持有锁。
System.out.println(线程1进入等待状态!);
尝试{
condition . await();//进入等待状态
} catch (InterruptedException e) {
e . printstacktrace();
}
线程1正在等待结束!);
test lock . unlock();
}).start();
thread . sleep(100);//阻止线程2先运行
新线程(()- {
test lock . lock();
System.out.println(线程2开始唤醒其他等待的线程);
condition . signal();//唤醒线程1,但是线程1还是要获取锁才能继续运行。
System.out.println(线程2结束);
test lock . unlock();//在这里释放锁后,线程1可以获得锁,继续运行。
}).start();
}
可以发现,条件对象使用方法与传统的对象使用方法差别不大。
* *思考:下面的情况和上面有什么不同?
公共静态void main(String[] args)引发InterruptedException {
lock testLock=new reentrant lock();
新线程(()- {
test lock . lock();
System.out.println(线程1进入等待状态!);
尝试{
testLock.newCondition()。await();
} catch (InterruptedException e) {
e . printstacktrace();
}
线程1正在等待结束!);
test lock . unlock();
}).start();
thread . sleep(100);
新线程(()- {
test lock . lock();
System.out.println(线程2开始唤醒其他等待的线程);
testLock.newCondition()。signal();
System.out.println(线程2结束);
test lock . unlock();
}).start();
}
根据分析,调用newCondition()后,会生成一个新的条件对象。
并且同一个锁中可以有多个条件对象(实际上在原来的锁机制中只能有一个等待队列,这里可以创建很多条件来实现多个等待队列)。
在上面的例子中,实际使用了不同的条件对象,只有等待和唤醒同一个条件对象才会有效,而不同的条件对象是分开计算的。
* *最后,让我们再次解释一下时间单位,它是一个枚举类,也位于java.util.concurrent包下:
公共枚举时间单位{
/**
表示千分之一微秒的时间单位*/
纳秒{
public long toNanos(long d){ return d;}
公龙toMicros(龙d) {回程d/(C1/C0);}
公long toMillis(long d){ return d/(C2/C0);}
public long to seconds(long d){ return d/(C3/C0);}
public long to minutes(long d){ return d/(C4/C0);}
公共long to hours(long d){ return d/(C5/C0);}
公long toDays(long d){ return d/(C6/C0);}
public long convert(long d,time unit u){ return u . tonanos(d);}
int excessNanos(long d,long m){ return(int)(d-(m * C2));}
},
//.
你可以看到有许多时间单位,如日、秒和分钟。
等等。我们可以直接用它作为时间单位。例如,如果我们想让一个线程等待3秒钟,可以写成如下形式:
公共静态void main(String[] args)引发InterruptedException {
lock testLock=new reentrant lock();
新线程(()- {
test lock . lock();
尝试{
System.out.println(等待不超时: testlock.newcondition()。await (1,time unit . seconds));
} catch (InterruptedException e) {
e . printstacktrace();
}
test lock . unlock();
}).start();
}
当然,Lock类的tryLock方法也支持使用时间单位,大家可以自己测试一下。
除了表示为时间单位之外,时间单位还可以在不同单位之间转换:
公共静态void main(String[] args)引发InterruptedException {
System.out.println(60秒=时间单位.秒.到分(60)分);
System.out.println(365天= TimeUnit。DAYS.toSeconds(365)“秒”);
}
还可以更方便地使用对象的wait()方法:
公共静态void main(String[] args)引发InterruptedException {
同步(Main.class) {
System.out.println(开始等待);
时间单位。SECONDS.timedWait(Main.class,3);//只需等待3秒。
System.out.println(等待结束);
}
}
我们也可以直接用它来冬眠:
公共静态void main(String[] args)引发InterruptedException {
时间单位。seconds . sleep;//睡眠1秒钟
}
在重新进入锁之前,我们解释了锁框架的两个核心接口,那么我们来看看锁接口的具体实现类。
我们前面用了ReentrantLock,其实是一种叫做reentrant lock的锁。那么这个可重入锁意味着什么呢?
简单来说,就是同一个线程可以重复锁定:
公共静态void main(String[] args)引发InterruptedException {
reentrant lock lock=new reentrant lock();
lock . lock();
lock . lock();//连续锁定两次。
新线程(()- {
System.out.println(线程2要获取锁);
lock . lock();
System.out.println(线程2成功获取锁);
}).start();
lock . unlock();
System.out.println(线程1释放锁一次);
时间单位。seconds . sleep;
lock . unlock();
System.out.println(线程1再次释放锁);//在其他线程锁定它之前释放它两次
}
可以看到主线程已经连续锁了两次(这个操作不会被阻塞),
如果当前线程持有锁,它将不会被阻止继续锁定。而且锁锁了几次,就必须解锁几次,否则线程还是会持有锁的。
我们可以使用getHoldCount()方法来检查当前线程的锁定次数:
公共静态void main(String[] args)引发InterruptedException {
reentrant lock lock=new reentrant lock();
lock . lock();
lock . lock();
System.out.println(当前锁定次数: lock.getHoldCount(),是否锁定: lock . is locked());
时间单位。seconds . sleep;
lock . unlock();
System.out.println(当前锁定次数: lock.getHoldCount(),是否锁定: lock . is locked());
时间单位。seconds . sleep;
lock . unlock();
System.out.println(当前锁定次数: lock.getHoldCount(),是否锁定: lock . is locked());
}
可以看到,当锁不再被任何线程持有时,值为0,isLocked()方法的查询结果为false。
事实上,如果有一个线程持有当前锁,其他线程在获取锁时会临时进入等待队列。我们可以使用getQueueLength()
方法来获取等待线程的估计数量:
公共静态void main(String[] args)引发InterruptedException {
reentrant lock lock=new reentrant lock();
lock . lock();
线程t1=新线程(锁:锁),t2=新线程(锁:锁);
t1 . start();
T2 . start();
时间单位。seconds . sleep;
System.out.println(当前等待锁释放的线程数: lock . getqueuelength());
System.out.println(是等待队列中的线程1: lock . hasqueuedthread(t1));
System.out.println(是等待队列中的线程2: lock . hasqueuedthread(T2));
System.out.println(是等待队列中的当前线程: lock . hasqueuedthread(thread . current thread())));
}
我们可以通过hasQueuedThread()方法判断一个线程是否在等待获取锁状态。
同样,病情也可以这样判断:
公共静态void main(String[] args)引发InterruptedException {
reentrant lock lock=new reentrant lock();
condition condition=lock . new condition();
新线程(()- {
lock . lock();
尝试{
condition . await();
} catch (InterruptedException e) {
e . printstacktrace();
}
lock . unlock();
}).start();
时间单位。seconds . sleep;
lock . lock();
System.out.println(当前条件的等待线程数: lock . getwaitqueuelength(Condition));
condition . signal();
System.out.println(当前条件的等待线程数: lock . getwaitqueuelength(Condition));
lock . unlock();
}
getWaitQueueLength()方法可用于检查当前有多少线程正在等待相同的条件。
公平锁和不公平锁我们前面学过,如果线程竞争同一个锁,它们会暂时进入等待队列。
那么,多线程获取锁的顺序是否一定是由线程调用lock()方法的时间决定的呢?
我们可以看到,在ReentrantLock的构造方法中,是这样写的:
public ReentrantLock() {
sync=new NonfairSync();//看名字好像不太公平。
}
其实锁分公平锁和不公平锁。默认情况下,我们创建的ReentrantLock使用不公平锁作为底层锁机制。
* *那么什么是公平锁,什么是不公平锁呢?
公平锁:多个线程按照申请锁的顺序获取锁。线程会直接进入队列进行排队,它们永远是队列中第一个获得锁的。不公平锁:当多个线程试图获取锁时,会试图直接获取。如果他们不能,他们将进入等待队列。如果他们能获得它,他们将直接获得锁。简单来说,公平锁不允许人插队,都是老老实实排队;
让人插队不公平,但是插队的人会不会让你插队就是另一回事了。
让我们测试一下公平锁和不公平锁的性能:
public ReentrantLock(布尔公平){
同步=公平?new FairSync():新的NonfairSync();
}
这里我们选择使用第二种构造方法,我们可以选择它是否是一个公平锁实现:
公共静态void main(String[] args)引发InterruptedException {
reentrant lock lock=new reentrant lock(false);
可运行的操作=() - {
system . out . println( thread thread . current thread()。getName()开始获取锁.);
lock . lock();
system . out . println( thread thread . current thread()。getName()成功获取锁!);
lock . unlock();
};
for(int I=0;I){//建立10个线程
新线程(动作,“T i”)。start();
}
}
这里,我们只需要比较将在1秒钟内开始的锁获取.随着锁的成功收购!
顺序是否一致,如果一致,说明所有线程都在排队以便获取锁,如果不一致,说明肯定有线程插队。
运行结果表明,在公平模式下,是按顺序完成的,而在不公平模式下,通常会出现这种情况:线程刚开始获取锁就可以抢占,很早以前就开始的线程还在等待,所以插队是很明显的。
那么,我们进入下一个问题。公平锁在任何情况下都是公平的吗?
去队列同步器讨论。
读写锁是队列同步器AQS。
* *读写锁除了可重入锁,还有一种锁叫读写锁。当然不是专门用于读写操作的锁。
与可重入锁不同,可重入锁是一种排他锁。当一个线程获得锁时,另一个线程必须等待它释放锁,否则不允许它获得锁。
同时,读写锁允许多个线程获取锁,这实际上是针对读写场景而出现的。
* *读写锁维护了一个读锁和一个写锁,这两个锁的机制是不同的。
读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。读写锁也有一个专门的接口:
公共接口读写锁{
//获取读锁
lock read lock();
//获取写锁
lock writeLock();
}
此接口有一个实现类ReentrantReadWriteLock(实现的是读写锁接口,不是锁接口,它本身并不是锁),注意我们操作reentrantreadwritellock时,不能直接上锁,而是需要获取读锁或是写锁,再进行锁操作:
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.readLock().lock();
新线程(lock.readLock():lock).start();
}
这里我们对读锁加锁,可以看到可以多个线程同时对读锁加锁。
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.readLock().lock();
新线程(lock.writeLock():lock).start();
}
有读锁状态下无法加写锁,反之亦然:
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.writeLock().lock();
新线程(lock.readLock():lock).start();
}
并且,ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.writeLock().lock();
lock.writeLock().lock();
新线程(()- {
lock.writeLock().lock();
System.out.println(成功获取到写锁!);
}).start();
System.out.println(释放第一层锁!);
lock.writeLock().unlock();
时间单位10.25秒。睡眠;
System.out.println(释放第二层锁!);
lock.writeLock().unlock();
}
通过之前的例子来验证公平和非公平:
公共静态void main(String[] args)引发中断的异常{
ReentrantReadWriteLock lock=new ReentrantReadWriteLock(true);
可运行的操作=() - {
System.out.println(线程Thread.currentThread().getName()将在一秒后开始获取锁.);
lock.writeLock().lock();
System.out.println(线程Thread.currentThread().getName()成功获取锁!);
lock.writeLock().unlock();
};
for(int I=0;i i ) { //建立10个线程
新线程(动作,“T i”).start();
}
}
可以看到,结果是一致的。
* *锁降级和锁升级锁降级指的是写锁降级为读锁。
当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的:
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.writeLock().lock();
lock.readLock().lock();
System.out.println(成功加读锁!);
}
那么,如果我们在同时加了写锁和读锁的情况下,释放写锁,是否其他的线程就可以一起加读锁了呢?
公共静态void main(String[] args)引发中断的异常{
reentrantreadwritellock=new reentrantreadwritellock();
lock.writeLock().lock();
lock.readLock().lock();
新线程(()- {
System.out.println(开始加读锁!);
lock.readLock().lock();
System.out.println(读锁添加成功!);
}).start();
时间单位10.25秒。睡眠;
lock.writeLock().unlock();//如果释放写锁,会怎么样?
}
可以看到,一旦写锁被释放,那么主线程就只剩下读锁了,因为读锁可以被多个线程共享,所以这时第二个线程也添加了读锁。
这种操作被称为“锁降级”(注意,不是先释放写锁,然后添加读锁,而是申请读锁,然后在持有写锁时释放写锁)。
注意,仅用读锁申请写锁属于‘锁升级’。不支持ReentrantReadWriteLock:
公共静态void main(String[] args)引发InterruptedException {
reentrantreadwritellock lock=new reentrantreadwritellock();
lock.readLock()。lock();
lock.writeLock()。lock();
System.out.println(升级成功!);
}
可以看到线程直接卡在加锁这句话里。
* *队列同步器AQS前面我们学习了可重入锁和读写锁,那么它们的底层实现原理是什么呢?
比如我们执行ReentrantLock的lock()方法,它是如何在内部执行的?
公共void锁(){
sync . lock();
}
你可以看到,它的内部其实什么都没做,只是交给了Sync对象。此外,不仅此方法,还有许多其他方法都是由同步对象执行的:
公共void解锁(){
sync . release(1);
}
那么这个同步对象是做什么的呢?
可以看到,公平锁和不公平锁都继承自Sync,Sync继承自AbstractQueuedSynchronizer,简称队列同步器:
抽象静态类Sync扩展AbstractQueuedSynchronizer {
//.
}
静态最终类NonfairSync扩展Sync {}
静态最终类FairSync扩展了Sync {}
因此,为了理解它的底层是如何工作的,我们必须看看队列同步器。先从这里开始吧!
* * AbstractQueuedSynchronizer(以下简称AQS)的底层实现是锁机制的基础,其内部包包括锁的获取、释放和等待队列。
锁(例如排他锁)的基本功能是:
获取并释放锁,当锁被占用时,竞争它的其他线程将进入等待队列,
AQS打包了这些基本功能,
等待队列是核心内容,通过双向链表的数据结构来实现。
每个处于等待状态的线程都可以封装成一个节点,放入一个双向链表中,而双向链表是以队列的形式操作的,看起来是这样的:
在AQ有一个头场和一个尾场。
字段分别记录了双向链表的头节点和尾节点,一系列后续操作都是围绕这个队列进行的。我们先来了解一下每个节点包含的内容:
//每个等待线程都可以是一个节点,每个节点都有很多状态。
静态最终类节点{
//每个节点可以分为独占模式节点或共享模式节点,分别适用于独占锁和共享锁。
静态最终节点SHARED=new Node();
静态最终节点EXCLUSIVE=null
//等待状态,这里都定义了
//唯一大于0的状态表示已经过期。由于超时或中断,此节点可能被取消。
静态final int CANCELLED=1;
//该节点后面的节点被挂起(进入等待状态)
静态最终int信号=-1;
//条件队列中的节点处于这种状态。
静态最终int条件=-2;
//传播,通常用于共享锁
静态最终int PROPAGATE=-3;
易变int waitStatus//等待状态值
易变节点prev//双向链表基本操作
易变节点next
易变线程Thread;//每个线程可以封装成一个节点进入等待队列。
节点nextWaiter//表示等待队列中的模式,作为条件队列中下一个节点的指针。
最终布尔值isShared() {
return nextWaiter==SHARED
}
最终节点前置任务()引发NullPointerException {
节点p=prev
if (p==null)
抛出新的NullPointerException();
其他
返回p;
}
节点(){
}
节点(线程线程,节点模式){
this.nextWaiter=mode
this.thread=线程;
}
节点(线程Thread,int waitStatus) {
this . wait status=wait status;
this.thread=线程;
}
}
开始时,head和tail都是null,state是默认值0:
私有瞬态易变节点头;
私有瞬态易变节点尾;
私有易变int状态;
不要担心双向链表不会被初始化。初始化只有在实际使用时才开始。接下来我们来看看其他的初始化内容:
//使用不安全的类直接操作
private static final Unsafe Unsafe=Unsafe . get Unsafe();
//在内存中记录属性在类中的偏移地址,方便不安全类直接操作内存进行赋值(直接修改对应地址的内存)
私有静态最终长状态偏移量;//这对应于AQS类中的状态成员字段。
私有静态最终长头偏移;//这对应于AQS类中的头节点成员字段。
私有静态最终长尾偏移量;
private status final long waitStatusOffset;
私有静态最终长nextOffset
Static {//静态代码块,它会在加载类时自动获取偏移量地址。
尝试{
state offset=unsafe . objectfield offset
(abstractqueuedsynchronizer . class . getdeclaredfield( state ));
head offset=unsafe . objectfield offset
(abstractqueuedsynchronizer . class . getdeclaredfield( head ));
tail offset=unsafe . objectfield offset
(abstractqueuedsynchronizer . class . getdeclaredfield( tail ));
waitStatusOffset=unsafe . objectfield offset
(node . class . getdeclaredfield( wait status ));
next offset=unsafe . objectfield offset
(node . class . getdeclaredfield( next ));
} catch(Exception ex){ throw new Error(ex);}
}
//通过CAS操作修改头节点
private final boolean compareandsehead(节点更新){
//调用Unsafe类的compareAndSwapObject方法,用CAS算法比较替换对象。
返回unsafe . compareandswapobject(this,headOffset,null,update);
}
//同上,省略部分代码。
private final boolean compareAndSetTail(节点预期,节点更新){
private static final boolean compareAndSetWaitStatus(节点Node,int expect,int update) {
private static final boolean compareAndSetNext(节点Node,节点expect,节点update) {
可以发现,由于队列同步器使用CAS算法,直接使用了不安全的工具类,
Unsafe类提供CAS操作的方法(底层用C实现)。对AQS类中成员字段的所有修改都有相应的CAS操作封装。
* *现在我们对它的基本运行机制有了大致的了解,
然后我们来看看这个类是怎么用的。它提供了一些可重写的方法(可以根据不同的锁类型和机制自由定制规则,对于排他锁和非排他锁都提供了相应的方法)。
以及一些已经写好的模板方法(模板方法会调用这些可重写的方法)。要使用这个类,只需要重写可重写的方法,调用提供的模板方法,就可以实现锁功能(学习了设计模式就能更好的理解了)。
让我们先来看看可重写方法:
//独占获取同步状态,检查同步状态是否与参数一致。如果没有问题,CAS操作将用于设置同步状态并返回true。
受保护的布尔tryAcquire(int arg) {
抛出新的UnsupportedOperationException();
}
//同步状态的独占释放
受保护的布尔tryRelease(int arg) {
抛出新的UnsupportedOperationException();
}
//以共享的方式获取同步状态。如果返回值大于0,则表示成功;否则,它会失败。
protected int try acquire shared(int arg){
抛出新的UnsupportedOperationException();
}
//共享发布同步状态
受保护的布尔型tryleaseshared(int arg){
抛出新的UnsupportedOperationException();
}
//是否被当前线程以独占模式占用(锁是否被当前线程持有)
受保护的布尔值isHeldExclusively
抛出新的UnsupportedOperationException();
}
可以看到,默认情况下,这些需要被重写的方法会直接抛出UnsupportedOperationException。
也就是说,根据不同的锁类型,我们需要实现相应的方法,
让我们看看ReentrantLock中的公平锁(这个类是全局唯一的)是如何用AQS实现的:
静态最终类FairSync扩展Sync {
private static final long serialVersionUID=-3000897897090466540 l;
//调用模板方法acquire的锁定操作。
//为了防止大家晕头转向,请时刻记住,lock方法必须在一个线程下调用才能锁定,
//同时可能有其他线程调用此方法
最终无效锁(){
获取(1);
}
.
}
让我们来看看锁定操作做了什么。这里直接调用了AQS提供的模板方法acquire()。让我们看看它在AQS类中的实现细节:
@ReservedStackAccess
//这是JEP 270新增的注释。它将保护带注释的方法,
//通过添加一些额外空间来防止多线程运行时堆栈溢出。
public final void acquire(int arg){
如果(!tryAcquire(参数)
获得(添加服务员(节点。EXCLUSIVE)、arg)//该节点是独占模式节点。独家。
self interrupt();
}
首先调用TryAcquire()。
方法(此处由FairSync类实现)。如果添加排他锁的尝试失败(返回false),则意味着此时另一个线程可能持有排他锁,因此当前线程必须先等待,然后才会被调用。
addWaiter()方法将线程添加到等待队列中:
私有节点addWaiter(节点模式){
Node Node=new Node(thread . current thread(),mode);
//先尝试用CAS直接加入团队,
//如果此时有其他线程正在加入队列(也就是说,多个线程同时争用这个锁),请输入enq()
节点pred=tail
如果(pred!=null) {
node.prev=pred
if (compareAndSetTail(pred,node)) {
pred.next=node
返回节点;
}
}
//当CAS快速入队失败时调用此方法。
enq(节点);
返回节点;
}
专用节点查询(最终节点节点){
//以spin的形式加入队伍,可以看到这里有一个无限循环
for(;) {
节点t=尾部;
If (t==null) {//这种情况只能说明头节点和尾节点还没有初始化。
if(compareandsethead(new node())//初始化头节点和尾节点
尾巴=头;
}否则{
node . prev=t;
if (compareAndSetTail(t,node)) {
t.next=node
//只有CAS成功了,才算入队成功,
//如果CAS失败,说明其他线程也在同时加入队伍,手速比当前线程快,
//只是到了CAS操作的时候,其他线程会先加入团队,
//那么在这个时候,node.prev不是我们预期的节点,
//是另一个线程的新节点,所以下一个周期需要有另一个CAS。这种形式就是旋转。
return t;
}
}
}
}
* *在了解了addWaiter()方法会将节点添加到等待队列中之后,我们接着来看addWaiter()会返回已经加入的节点,
AcquireQueued()在得到返回的节点时也会进入自旋状态,等待唤醒(即开始进入取锁步骤):
@ReservedStackAccess
最终布尔型acquireQueued(最终节点Node,int arg) {
布尔失败=真;
尝试{
布尔中断=假;
for(;) {
最终节点p=Node . predecessor();
If (p head tryAcquire(arg)) {//可以看到,当这个节点在队列的头部(node.prev head)时,会再次调用tryAcquire方法来获取锁,如果获取成功,会返回进程是否中断的值。
setHead(节点);//将新的头节点设置为当前节点
p.next=null//原来的头节点已经没有存在的意义。
失败=假;//没有失败
返回中断;//直接返回等待过程是否中断。
}
//还是没有成功,
if(shoudparkafterfalledacquire(p,node)//将当前节点的前任节点的等待状态设置为SIGNAL。失败了就直接开始下一个循环,直到成功,成功了就继续往下。
pardcheckinterrupt()//挂起的线程进入等待状态,等待被唤醒。如果它在等待状态下被中断,它将返回true。直接设置中断标志为真,否则正常唤醒,继续旋转。
中断=真;
}
}最后{
如果(失败)
cancelAcquire(节点);
}
}
private final布尔型parkAndCheckInterrupt() {
LockSupport.park(th
is); //通过unsafe类操作底层挂起线程(会直接进入阻塞状态)
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true; //已经是SIGNAL,直接true
if (ws 0) { //不能是已经取消的节点,必须找到一个没被取消的
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus
pred.next = node; //直接抛弃被取消的节点
} else {
//不是SIGNAL,先CAS设置为SIGNAL(这里没有返回true因为CAS不一定成功,需要下一轮再判断一次)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; //返回false,马上开启下一轮循环
}
所以, acquire()
中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用
selfInterrupt() 方法:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
这里就是直接向当前线程发送中断信号了。
上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个 park 和 unpark :
public static void main(String[] args) throws InterruptedException {
Thread t = Thread.currentThread(); //先拿到主线程的Thread对象
new Thread(() - {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程可以继续运行了!");
LockSupport.unpark(t);
//t.interrupt(); 发送中断信号也可以恢复运行
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("主线程被挂起!");
LockSupport.park();
System.out.println("主线程继续运行!");
}
* *接着我们来看公平锁的 tryAcquire() 方法:
static final class FairSync extends Sync {
//可重入独占锁的公平实现
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //先获取当前线程的Thread对象
int c = getState(); //获取当前AQS对象状态(独占模式下0为未占用,大于0表示已占用)
if (c == 0) { //如果是0,那就表示没有占用,现在我们的线程就要来尝试占用它
if (!hasQueuedPredecessors() //等待队列是否不为空且当前线程没有拿到锁,其实就是看看当前线程有没有必要进行排队,如果没必要排队,就说明可以直接获取锁
compareAndSetState(0, acquires)) { //CAS设置状态,如果成功则说明成功拿到了这把锁,失败则说明可能这个时候其他线程在争抢,并且还比你先抢到
setExclusiveOwnerThread(current); //成功拿到锁,会将独占模式所有者线程设定为当前线程(这个方法是父类AbstractOwnableSynchronizer中的,就表示当前这把锁已经是这个线程的了)
return true; //占用锁成功,返回true
}
}
else if (current == getExclusiveOwnerThread()) { //如果不是0,那就表示被线程占用了,这个时候看看是不是自己占用的,如果是,由于是可重入锁,可以继续加锁
int nextc = c + acquires; //多次加锁会将状态值进行增加,状态值就是加锁次数
if (nextc 0) //加到int值溢出了?
throw new Error("Maximum lock count exceeded");
setState(nextc); //设置为新的加锁次数
return true;
}
return false; //其他任何情况都是加锁失败
}
}
在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。
加锁过程已经OK,我们接着来看,它的解锁过程, unlock() 方法是在AQS中实现的:
public void unlock() {
sync.release(1); //直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) { //和tryAcquire一样,也得子类去重写,释放锁操作
Node h = head; //释放锁成功后,获取新的头结点
if (h != null h.waitStatus != 0) //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)
unparkSuccessor(h); //唤醒头节点下一个节点中的线程
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 将等待状态waitStatus设置为初始值0
int ws = node.waitStatus;
if (ws 0)
compareAndSetWaitStatus(node, ws, 0);
//获取下一个结点
Node s = node.next;
if (s == null s.waitStatus 0) { //如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的,这时就要遍历所有节点再另外找一个符合unpark要求的节点了
s = null;
for (Node t = tail; t != null t != node; t = t.prev) //这里是从队尾向前,因为enq()方法中的t.next = node是在CAS之后进行的,而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点
if (t.waitStatus = 0)
s = t;
}
if (s != null) //要是找到了,就直接unpark,要是还是没找到,那就算了
LockSupport.unpark(s.thread);
}
那么我们来看看 tryRelease() 方法是怎么实现的,具体实现在Sync中:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //先计算本次解锁之后的状态值
if (Thread.currentThread() != getExclusiveOwnerThread()) //因为是独占锁,那肯定这把锁得是当前线程持有才行
throw new IllegalMonitorStateException(); //否则直接抛异常
boolean free = false;
if (c == 0) { //如果解锁之后的值为0,表示已经完全释放此锁
free = true;
setExclusiveOwnerThread(null); //将独占锁持有线程设置为null
}
setState(c); //状态值设定为c
return free; //如果不是0表示此锁还没完全释放,返回false,是0就返回true
}
综上,我们来画一个完整的流程图:
这里我们只讲解了公平锁。
* *公平锁一定公平吗?前面我们讲解了公平锁的实现原理,那么,我们尝试分析一下,在并发的情况下,公平锁一定公平吗?
我们再次来回顾一下 tryAcquire() 方法的实现:
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() //注意这里,公平锁的机制是,一开始会查看是否有节点处于等待
compareAndSetState(0, acquires)) { //如果前面的方法执行后发现没有等待节点,就直接进入占锁环节了
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以 hasQueuedPredecessors() 这个环节容不得半点闪失,否则会直接破坏掉公平性,假如现在出现了这样的情况:
线程1已经持有锁了,这时线程2来争抢这把锁,走到 hasQueuedPredecessors() ,判断出为 false
,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 线程2进来之后,肯定是要先走这里的,因为head和tail都是null
if (compareAndSetHead(new Node()))
tail = head; //这里就将tail直接等于head了,注意这里完了之后还没完,这里只是初始化过程
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { //由于一开始head和tail都是null,所以线程2直接就进enq()了
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //请看上面
return node;
}
而碰巧不巧,这个时候线程3也来抢锁了,按照正常流程走到了 hasQueuedPredecessors() 方法,而在此方法中:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//这里直接判断h != t,而此时线程2才刚刚执行完 tail = head,所以直接就返回false了
return h != t
((s = h.next) == null s.thread != Thread.currentThread());
}
因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在插入节点状态,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。
一张图就是:
因此公不公平全看 hasQueuedPredecessors()
,而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁,只有在等待队列存在节点时,才是真正公平的。
* *Condition实现原理通过前面的学习,我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的,
同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。
那么我们接着来看看,它又是如何实现的呢,我们先从单个Condition对象进行分析:
* *在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的头结点 */
private transient Node firstWaiter;
/** 条件队列的尾结点 */
private transient Node lastWaiter;
//...
这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:
我们知道,当一个线程调用 await() 方法时,会进入等待状态,直到其他线程调用 signal()
方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。
我们先来看看最关键的 await() 方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:
只有已经持有锁的线程才可以使用此方法当调用此方法后,会直接释放锁,无论加了多少次锁只有其他线程调用 signal() 或是被中断时才会唤醒等待中的线程被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常
Node node = addConditionWaiter(); //为当前线程创建一个新的节点,并将其加入到条件队列中
int savedState = fullyRelease(node); //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复
int interruptMode = 0; //用于保存中断状态
while (!isOnSyncQueue(node)) { //循环判断是否位于同步队列。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。