多线程面试题,多线程是什么意思

  多线程面试题,多线程是什么意思

  为什么在序言中需要一个无锁队列?众所周知,多核优化是游戏开发中的重要课题,将工作并行化到多线程是非常常见的场景,无论是工程实践还是算法研究。对于这种场景,我们通常使用线程池命令队列,其中的命令队列会使用互斥或无锁队列。而且由于命令队列的读写是一个较轻的操作,所以使用无锁队列的性能要高于锁操作。因此,无锁队列等无锁数据结构的实现可以视为多线程编程的基石。

  推荐视频:https://www.bilibili.com/video/BV1354y1b7nz/

  问题无锁队列的典型应用场景是同时存在单(多)线程写和单(多)线程读。那么,为什么原来的队列会有问题呢?我们来看一下原始队列的入队和出队伪代码(省略了边界判断)。

  结构节点{

  void *数据;

  节点*下一个;

  };

  void排队(节点*节点){

  m _ Tail-next=node;

  m_Tail=节点;

  }

  节点*出列(){

  Node * res=m _ Head

  m _ Head=m _ Head-next;

  返回res

  }入队和出队有两步。如果有两个以上相同的线程同时写入或读取,可能会发生在第一步之后,其他线程修改头或尾指针,导致完全不可预测的结果。比如下面这种情况,两个线程同时写,导致尾指针失去与队列的链接,添加的节点无法从头访问。

  //线程a和线程b的实际情况

  nodeC tail=nodeC

  head-next=nodeA nodeC-nodeA tail=nodeC

  tail-next=nodeB nodeC-nodeB tail=nodeC

  tail=nodeB nodeC-nodeB tail=nodeB

  Tail=nodeanotec-nodetail=nodea解决方法是在读写之前先加锁,保证只有一个线程在同时读写,或者利用CPU提供的原子操作一次性完成头或尾指针的读写,实现无锁同步。

  原子的操作在质子和中子发现之前,人们认为原子是世界上最基本的粒子,原子一词有“不可分”的含义。顾名思义,原子操作是指不可分的操作。当CPU的一个线程执行原子操作时,它不会被其他线程中断或抢占。

  典型的原子操作和示意代码如下:

  加载/存储:读取和保存。测试和设置:对于bool变量,如果为真,则返回真;如果是false,它会将变量设置为true并返回false。bool TestAndSet(bool * flag){

  bool res=* flag

  * flag=true

  返回res

  }Clear:将bool变量设置为false。Exchange:将指定位置的值设置为传入值,并返回其旧值。模板类型名T

  T Exchange(T* addr,const T newVal){

  T oldVal=* addr

  * addr=newVal

  返回oldVal

  }比较和交换(CAS):将指定位置的值与预期值进行比较,如果相等,则分配一个新值,如果不相等,则将预期值设置为自身。返回设置是否成功。

  模板类型名T

  bool CompareAndSwap(T* addr,T expected,const T desired){

  if(*addr==expected){

  *addr=所需;

  返回true

  }

  应为=* addr

  返回false

  }请注意,有两种类型的ca:弱ca和强ca。弱版本在某些情况下即使满足条件也会返回false,所以一般只用于循环判断,强版本用于其他情况。取和加减乘除数列:用传入的参数对指定位置的值进行加减乘除运算,返回旧值。这里用最简单的I操作来比较和解释为什么需要原子操作。C中的一条I语句将生成三条汇编指令。

  int I=0;

  我;

  Mov eax,dword ptr [i] //将I载入eax寄存器

  加eax,1//eax中的值加1。

  MOV ORD PTR [I],eax//将eax中的值赋给I的地址,如果两个线程同时对一个变量i=0进行I运算,最后的结果很可能是1而不是2,因为当多个线程并行时,I的值会被加载到不同的寄存器中,然后寄存器中的值会被分别相加和取出,导致后向线程覆盖了前导线程的结果。这种现象被称为竞争条件。如果我们使用windows提供的来自increasing function _ interlocked increment的atom,生成的汇编代码如下

  int I=0;

  _ interlocked increment((volatile long *)

  Mov eax,1 //eax负载1

  lockdddword ptr [I],eax//xadd的作用是交换两个操作数的值,并将加法结果保存到前者。

  //对I所在的地址数据完成自增操作。一条指令完成I的操作,lock指令也会锁定该操作的内存地址,从而避免可能出现的竞争情况。(同时也可以看出,原子操作是在CPU中实现的,或者是锁定的,只是硬件层面的锁定,成本更少。)

  用CAS实现无锁队列。现在让我们开始实现无锁队列。首先定义数据结构。

  #杂注一次

  #包含windows.h

  #包含windef.h

  #包含intrin.h

  #包含emmintrin.h

  使用atomic word=int ptr _ t;

  结构原子代码

  {

  volatile AtomicWord _ next

  void*数据;

  };

  类原子队列

  {

  volatile AtomicWord _ tail

  volatile AtomicWord _ head

  公共:

  atomic queue();

  ~ AtomcQueue();

  void入队(atomic node * node);

  atomic node * Dequeue();

  }使用intptr_t保存我们的指针变量,每个节点使用指针保存数据,队列包含一个头指针和一个尾指针。会用到两种原子操作(PS:本文基于64位环境,指针长度为64位。如果需要在32位环境下使用,请更换为32位版本功能):

  静态内联atomic word atomic exchange explicit(可变AtomicWord* p,AtomicWord val)

  {

  return(atomic word)_ interlockedexchange 64((volatile LONGLONG *)p,(LONGLONG)val);

  }

  静态内联bool AtomicCompareExchangeStrongExplicit(可变AtomicWord* p,AtomicWord* oldval,AtomicWord newval)

  {

  return _ interlockedcompareexchange 64((volatile long long *)p,(LONGLONG)newval,(LONGLONG)*oldval)!=0;

  }我们用一个虚拟节点,可以省去很多边界判断。

  AtomicQueue() {

  atomic node * dummy=new atomic node();

  dummy-_ next=0;

  _ tail=(atomic word)dummy;

  _ head=(atomic word)dummy;

  }

  ~AtomicQueue() {

  atomic node * dummy=(atomic node *)_ head;

  删除哑元;

  }使用Exchange进行入队,使用CAS spin进行出队。

  void排队(原子节点*节点){

  AtomicNode * prev

  node-_ next=0;

  prev=(atomic node *)atomic exchange explicit(_ tail,(atomic word)node);

  prev- _next=(AtomicWord)节点;

  }

  AtomicNode* Dequeue() {

  AtomicNode* res,* next

  void*数据;

  AtomicWord head=_ head

  AtomicWord newHead

  做

  {

  //离队时,最后剩下的节点就是最后一个加入队伍的节点。

  RES=(atomic node *)head;

  next=(atomic node *)RES-_ next;

  if (next==nullptr)

  返回nullptr

  数据=下一个数据;

  newHead=(atomic word)next;

  //比较_head指针是不是我们之前获取的。如果成功,设置newHead,如果失败,旋转它。

  } while(!AtomicCompareExchangeStrongExplicit(_ head,head,new head));

  res- data=数据;

  返回res

  }原则上并不难理解。入队是将新节点和原来的尾节点交换,出队是用CAS判断我们缓存的头节点和队列的头节点是否相同(如果不同,说明已经被其他线程修改了)。

  然而,我们的头和尾指针有volatile关键字。这是什么意思?

  易变和常量优化C编译器会为我们做很多优化,但是有时候这些优化会造成意想不到的结果。我们离队的时候用了循环判断。

  做{

  res=head

  newHead=RES-next;

  }

  而(!CAS(_head,head,new head));也就是说,我们一直在判断_head是否等于head,head本来就等于_head。编译器不知道另一个线程可能正在修改_head的值,所以可能优化了_head和head的比较,只从内存中读取一次_head的值并存储在寄存器中,然后一直使用寄存器中的数据,使得我们的spin wait无效。这就是不断优化。

  优化的原因是寄存器的读写速度远高于内存,编译器会减少从内存中读取数据的次数。volatile关键字告诉编译器不要不断地优化这个变量,而是每次都在内存中读取它。

  ABA问题虽然上面我们已经实现了可用的无锁队列,但是还有一个潜在的问题没有解决,那就是ABA问题。ABA问题是什么?我们用一个例子来说明一下。

  //线程A线程B队列

  A(0x114)- B(0x514) Head=B

  head=B,newhead=C

  b队,删除b。

  A组,删除A

  新D(0x514),加入D队。

  E (0x810),责成E (0x810)-D (0x514) Head=D

  CAS(Head,Head,newHead)传递E (0x810)-D (0x514) head=C .因为我们用CAS来比较地址,用New和delete来管理内存,所以内存地址删除后可能会被操作系统回收重新分配,这样就先获得了Head地址。在CAS中,因为新的头是回收后重新分配的同一个地址,所以比较通过,但指向该地址的内容是。

  有两种方法可以解决ABA问题。一种是使用环形缓冲区,实际上是一个预先分配内存的数组。头尾指针被替换为下标来移动,以避免内存的重复分配。另一种是使用双重铸造。

  双CASDouble CAS的思想是增加地址的引用计数,使用双倍大小的头指针,将计数器附加到原始指针,并且每当计数器出队时递增1。即使这样出现ABA问题,CAS也不会通过,因为计数器不匹配。

  //线程A线程B队列

  A(0x114)- B(0x514) Head=(B,0)

  head=(B,0),newhead=(C,1)

  b队,删除b。

  A组,删除A

  新D(0x514),加入D队。

  E (0x810),加入队伍E (0x810)-D (0x514) Head=(D,2)

  CAS(Head,Head,newhead)失败双CAS无锁队列PS:以下实现基于64位环境。如果需要在32位环境下使用,请更换为64位版本功能。

  //必须16字节对齐,否则_InterlockedCompareExchange128报错。

  struct alignas(16) AtomicWord2

  {

  AtomicWord lo,嗨;

  };

  静态内联bool AtomicCompareExchangeStrongExplicit(可变AtomicWord2* p,AtomicWord2* oldval,AtomicWord2 newval)

  {

  return _ interlockedcompareexchange 128((volatile LONGLONG *)p,(LONGLONG)newval.hi,(LONGLONG)newval.lo,(LONGLONG*)oldval)!=0;

  }

  静态内联atomic word 2 atomic exchange explicit(可变AtomicWord2* p,AtomicWord2 newval)

  {

  AtomicWord2 oldval

  old val . lo=0;

  old val . hi=new val . hi-1;

  //没有128位交换函数,只能用CAS封装。

  而(!AtomicCompareExchangeStrongExplicit(p,oldval,new val));

  返回oldval

  }修改的队列操作

  类原子队列

  {

  volatile AtomicWord _ tail

  volatile AtomicWord2 _ head

  .

  }

  AtomicQueue() {

  atomic node * dummy=new atomic node();

  atomic word 2 w;

  w . lo=(atomic word)dummy;

  w . hi=0;

  dummy-_ next=0;

  _ head=w;

  _ tail=(atomic word)dummy;

  }

  void排队(原子节点*节点){

  AtomicNode * prev

  node-_ next=0;

  prev=(atomic node *)atomic exchange explicit(_ tail,(atomic word)node);

  prev- _next=(AtomicWord)节点;

  }

  AtomicNode* Dequeue() {

  AtomicNode* res,* next

  void*数据;

  AtomicWord2 head=_ head

  AtomicWord2 newHead

  做

  {

  RES=(atomic node *)head . lo;

  next=(atomic node *)RES-_ next;

  if (next==nullptr)

  返回nullptr

  数据=下一个数据;

  new head . lo=(atomic word)next;

  newhead . hi=head . hi 1;

  } while(!AtomicCompareExchangeStrongExplicit(_ head,head,new head));

  res- data=数据;

  返回res

  }你完了!

  还是有一些问题。我们使用虚拟节点,可以省去一些边界条件的判断。但是,代价是每个出队节点不是加入队列的节点,而是数据指针安装在前一个出队节点中。所以不能太快回收节点使用的内存,否则会造成访问冲突。更好的方法是使用对象池来缓存节点。不够用的时候,申请新的节点。每次出队后,将旧节点放回池中供下次使用。

  结论实现了无锁队列,这只是向多线程发展的第一步。学了之后,最大的感受就是计算机基础知识真的很重要,尤其是计算机架构和操作系统的知识。

  整理一些最新的LinuxC/C服务器开发者/架构师的面试问题、学习资料、教学视频、学习路线图(资料包括C/C、Linux、golang技术、Nginx、ZeroMQ、MySQL、Redis、fastdfs、MongoDB、ZK、流媒体、CDN、P2P、K8S、Docker、TCP/IP、protocol、DPDK、ffmpeg等),免费分享。有需要的可以加自己的学习交流群96094558或者VX:lingsheng_1314领取!~

  希望这篇文章能帮到你。如有不妥,也请在评论区指出并交流。

  勿忘你的倡议心灵雅原创作品,博主,

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

相关文章阅读

  • pthread线程包常用函数,pthread多线程编程
  • c 多线程编程,c多线程编程实例
  • c 多线程编程,c多线程编程实例,VC多线程编程详解
  • cas 并发,java多线程cas
  • cas 并发,java多线程cas,java并发编程之cas详解
  • android中实现多线程操作的几种方式是什么,android 的多线程 实现方法
  • android中实现多线程操作的几种方式是什么,android 的多线程 实现方法,Android中实现多线程操作的几种方式
  • ,,详解Java多线程tryLock()方法使用
  • python多线程详解,python中的线程和进程
  • python多线程详解,python中的线程和进程,Python线程详解
  • ,,python多线程高级锁condition简单用法示例
  • ,,.NET Windows 多线程thread编程
  • python多线程是不是真正的并发,python的多进程是并发还是并行
  • python 多线程并行处理,python多线程运行
  • python 多线程编程,python多线程有几种实现方法,都是什么
  • 留言与评论(共有 条评论)
       
    验证码: