多线程面试题,多线程是什么意思
为什么在序言中需要一个无锁队列?众所周知,多核优化是游戏开发中的重要课题,将工作并行化到多线程是非常常见的场景,无论是工程实践还是算法研究。对于这种场景,我们通常使用线程池命令队列,其中的命令队列会使用互斥或无锁队列。而且由于命令队列的读写是一个较轻的操作,所以使用无锁队列的性能要高于锁操作。因此,无锁队列等无锁数据结构的实现可以视为多线程编程的基石。
推荐视频:https://www.bilibili.com/video/BV1354y1b7nz/
问题无锁队列的典型应用场景是同时存在单(多)线程写和单(多)线程读。那么,为什么原来的队列会有问题呢?我们来看一下原始队列的入队和出队伪代码(省略了边界判断)。
结构节点{
void *数据;
节点*下一个;
};
void排队(节点*节点){
m _ Tail-next=node;
m_Tail=节点;
}
节点*出列(){
Node * res=m _ Head
m _ Head=m _ Head-next;
返回res
}入队和出队有两步。如果有两个以上相同的线程同时写入或读取,可能会发生在第一步之后,其他线程修改头或尾指针,导致完全不可预测的结果。比如下面这种情况,两个线程同时写,导致尾指针失去与队列的链接,添加的节点无法从头访问。
//线程a和线程b的实际情况
nodeC tail=nodeC
head-next=nodeA nodeC-nodeA tail=nodeC
tail-next=nodeB nodeC-nodeB tail=nodeC
tail=nodeB nodeC-nodeB tail=nodeB
Tail=nodeanotec-nodetail=nodea解决方法是在读写之前先加锁,保证只有一个线程在同时读写,或者利用CPU提供的原子操作一次性完成头或尾指针的读写,实现无锁同步。
原子的操作在质子和中子发现之前,人们认为原子是世界上最基本的粒子,原子一词有“不可分”的含义。顾名思义,原子操作是指不可分的操作。当CPU的一个线程执行原子操作时,它不会被其他线程中断或抢占。
典型的原子操作和示意代码如下:
加载/存储:读取和保存。测试和设置:对于bool变量,如果为真,则返回真;如果是false,它会将变量设置为true并返回false。bool TestAndSet(bool * flag){
bool res=* flag
* flag=true
返回res
}Clear:将bool变量设置为false。Exchange:将指定位置的值设置为传入值,并返回其旧值。模板类型名T
T Exchange(T* addr,const T newVal){
T oldVal=* addr
* addr=newVal
返回oldVal
}比较和交换(CAS):将指定位置的值与预期值进行比较,如果相等,则分配一个新值,如果不相等,则将预期值设置为自身。返回设置是否成功。
模板类型名T
bool CompareAndSwap(T* addr,T expected,const T desired){
if(*addr==expected){
*addr=所需;
返回true
}
应为=* addr
返回false
}请注意,有两种类型的ca:弱ca和强ca。弱版本在某些情况下即使满足条件也会返回false,所以一般只用于循环判断,强版本用于其他情况。取和加减乘除数列:用传入的参数对指定位置的值进行加减乘除运算,返回旧值。这里用最简单的I操作来比较和解释为什么需要原子操作。C中的一条I语句将生成三条汇编指令。
int I=0;
我;
Mov eax,dword ptr [i] //将I载入eax寄存器
加eax,1//eax中的值加1。
MOV ORD PTR [I],eax//将eax中的值赋给I的地址,如果两个线程同时对一个变量i=0进行I运算,最后的结果很可能是1而不是2,因为当多个线程并行时,I的值会被加载到不同的寄存器中,然后寄存器中的值会被分别相加和取出,导致后向线程覆盖了前导线程的结果。这种现象被称为竞争条件。如果我们使用windows提供的来自increasing function _ interlocked increment的atom,生成的汇编代码如下
int I=0;
_ interlocked increment((volatile long *)
Mov eax,1 //eax负载1
lockdddword ptr [I],eax//xadd的作用是交换两个操作数的值,并将加法结果保存到前者。
//对I所在的地址数据完成自增操作。一条指令完成I的操作,lock指令也会锁定该操作的内存地址,从而避免可能出现的竞争情况。(同时也可以看出,原子操作是在CPU中实现的,或者是锁定的,只是硬件层面的锁定,成本更少。)
用CAS实现无锁队列。现在让我们开始实现无锁队列。首先定义数据结构。
#杂注一次
#包含windows.h
#包含windef.h
#包含intrin.h
#包含emmintrin.h
使用atomic word=int ptr _ t;
结构原子代码
{
volatile AtomicWord _ next
void*数据;
};
类原子队列
{
volatile AtomicWord _ tail
volatile AtomicWord _ head
公共:
atomic queue();
~ AtomcQueue();
void入队(atomic node * node);
atomic node * Dequeue();
}使用intptr_t保存我们的指针变量,每个节点使用指针保存数据,队列包含一个头指针和一个尾指针。会用到两种原子操作(PS:本文基于64位环境,指针长度为64位。如果需要在32位环境下使用,请更换为32位版本功能):
静态内联atomic word atomic exchange explicit(可变AtomicWord* p,AtomicWord val)
{
return(atomic word)_ interlockedexchange 64((volatile LONGLONG *)p,(LONGLONG)val);
}
静态内联bool AtomicCompareExchangeStrongExplicit(可变AtomicWord* p,AtomicWord* oldval,AtomicWord newval)
{
return _ interlockedcompareexchange 64((volatile long long *)p,(LONGLONG)newval,(LONGLONG)*oldval)!=0;
}我们用一个虚拟节点,可以省去很多边界判断。
AtomicQueue() {
atomic node * dummy=new atomic node();
dummy-_ next=0;
_ tail=(atomic word)dummy;
_ head=(atomic word)dummy;
}
~AtomicQueue() {
atomic node * dummy=(atomic node *)_ head;
删除哑元;
}使用Exchange进行入队,使用CAS spin进行出队。
void排队(原子节点*节点){
AtomicNode * prev
node-_ next=0;
prev=(atomic node *)atomic exchange explicit(_ tail,(atomic word)node);
prev- _next=(AtomicWord)节点;
}
AtomicNode* Dequeue() {
AtomicNode* res,* next
void*数据;
AtomicWord head=_ head
AtomicWord newHead
做
{
//离队时,最后剩下的节点就是最后一个加入队伍的节点。
RES=(atomic node *)head;
next=(atomic node *)RES-_ next;
if (next==nullptr)
返回nullptr
数据=下一个数据;
newHead=(atomic word)next;
//比较_head指针是不是我们之前获取的。如果成功,设置newHead,如果失败,旋转它。
} while(!AtomicCompareExchangeStrongExplicit(_ head,head,new head));
res- data=数据;
返回res
}原则上并不难理解。入队是将新节点和原来的尾节点交换,出队是用CAS判断我们缓存的头节点和队列的头节点是否相同(如果不同,说明已经被其他线程修改了)。
然而,我们的头和尾指针有volatile关键字。这是什么意思?
易变和常量优化C编译器会为我们做很多优化,但是有时候这些优化会造成意想不到的结果。我们离队的时候用了循环判断。
做{
res=head
newHead=RES-next;
}
而(!CAS(_head,head,new head));也就是说,我们一直在判断_head是否等于head,head本来就等于_head。编译器不知道另一个线程可能正在修改_head的值,所以可能优化了_head和head的比较,只从内存中读取一次_head的值并存储在寄存器中,然后一直使用寄存器中的数据,使得我们的spin wait无效。这就是不断优化。
优化的原因是寄存器的读写速度远高于内存,编译器会减少从内存中读取数据的次数。volatile关键字告诉编译器不要不断地优化这个变量,而是每次都在内存中读取它。
ABA问题虽然上面我们已经实现了可用的无锁队列,但是还有一个潜在的问题没有解决,那就是ABA问题。ABA问题是什么?我们用一个例子来说明一下。
//线程A线程B队列
A(0x114)- B(0x514) Head=B
head=B,newhead=C
b队,删除b。
A组,删除A
新D(0x514),加入D队。
E (0x810),责成E (0x810)-D (0x514) Head=D
CAS(Head,Head,newHead)传递E (0x810)-D (0x514) head=C .因为我们用CAS来比较地址,用New和delete来管理内存,所以内存地址删除后可能会被操作系统回收重新分配,这样就先获得了Head地址。在CAS中,因为新的头是回收后重新分配的同一个地址,所以比较通过,但指向该地址的内容是。
有两种方法可以解决ABA问题。一种是使用环形缓冲区,实际上是一个预先分配内存的数组。头尾指针被替换为下标来移动,以避免内存的重复分配。另一种是使用双重铸造。
双CASDouble CAS的思想是增加地址的引用计数,使用双倍大小的头指针,将计数器附加到原始指针,并且每当计数器出队时递增1。即使这样出现ABA问题,CAS也不会通过,因为计数器不匹配。
//线程A线程B队列
A(0x114)- B(0x514) Head=(B,0)
head=(B,0),newhead=(C,1)
b队,删除b。
A组,删除A
新D(0x514),加入D队。
E (0x810),加入队伍E (0x810)-D (0x514) Head=(D,2)
CAS(Head,Head,newhead)失败双CAS无锁队列PS:以下实现基于64位环境。如果需要在32位环境下使用,请更换为64位版本功能。
//必须16字节对齐,否则_InterlockedCompareExchange128报错。
struct alignas(16) AtomicWord2
{
AtomicWord lo,嗨;
};
静态内联bool AtomicCompareExchangeStrongExplicit(可变AtomicWord2* p,AtomicWord2* oldval,AtomicWord2 newval)
{
return _ interlockedcompareexchange 128((volatile LONGLONG *)p,(LONGLONG)newval.hi,(LONGLONG)newval.lo,(LONGLONG*)oldval)!=0;
}
静态内联atomic word 2 atomic exchange explicit(可变AtomicWord2* p,AtomicWord2 newval)
{
AtomicWord2 oldval
old val . lo=0;
old val . hi=new val . hi-1;
//没有128位交换函数,只能用CAS封装。
而(!AtomicCompareExchangeStrongExplicit(p,oldval,new val));
返回oldval
}修改的队列操作
类原子队列
{
volatile AtomicWord _ tail
volatile AtomicWord2 _ head
.
}
AtomicQueue() {
atomic node * dummy=new atomic node();
atomic word 2 w;
w . lo=(atomic word)dummy;
w . hi=0;
dummy-_ next=0;
_ head=w;
_ tail=(atomic word)dummy;
}
void排队(原子节点*节点){
AtomicNode * prev
node-_ next=0;
prev=(atomic node *)atomic exchange explicit(_ tail,(atomic word)node);
prev- _next=(AtomicWord)节点;
}
AtomicNode* Dequeue() {
AtomicNode* res,* next
void*数据;
AtomicWord2 head=_ head
AtomicWord2 newHead
做
{
RES=(atomic node *)head . lo;
next=(atomic node *)RES-_ next;
if (next==nullptr)
返回nullptr
数据=下一个数据;
new head . lo=(atomic word)next;
newhead . hi=head . hi 1;
} while(!AtomicCompareExchangeStrongExplicit(_ head,head,new head));
res- data=数据;
返回res
}你完了!
还是有一些问题。我们使用虚拟节点,可以省去一些边界条件的判断。但是,代价是每个出队节点不是加入队列的节点,而是数据指针安装在前一个出队节点中。所以不能太快回收节点使用的内存,否则会造成访问冲突。更好的方法是使用对象池来缓存节点。不够用的时候,申请新的节点。每次出队后,将旧节点放回池中供下次使用。
结论实现了无锁队列,这只是向多线程发展的第一步。学了之后,最大的感受就是计算机基础知识真的很重要,尤其是计算机架构和操作系统的知识。
整理一些最新的LinuxC/C服务器开发者/架构师的面试问题、学习资料、教学视频、学习路线图(资料包括C/C、Linux、golang技术、Nginx、ZeroMQ、MySQL、Redis、fastdfs、MongoDB、ZK、流媒体、CDN、P2P、K8S、Docker、TCP/IP、protocol、DPDK、ffmpeg等),免费分享。有需要的可以加自己的学习交流群96094558或者VX:lingsheng_1314领取!~
希望这篇文章能帮到你。如有不妥,也请在评论区指出并交流。
勿忘你的倡议心灵雅原创作品,博主,
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。