C++协程,c语言实现协程
协同学的概念很早就提出来了,但直到最近几年才在一些语言(如Lua)中广泛使用。
子程序或函数是所有语言中的层次调用。比如A调用B,B在执行过程中调用C。c执行完返回,B执行完返回,最后A执行完。
所以子程序调用是通过堆栈实现的,一个线程只是执行一个子程序。
子例程调用总是一个入口一个返回,调用顺序很明确。协程的调用不同于子程序。
协同进程看起来像一个子程序,但是在执行的过程中,可以在子程序内部中断,然后由其他子程序代替执行,然后在适当的时候返回。
注意,在一个子程序中中断执行其他子程序不是函数调用,有点类似CPU中断。例如,子程序A和B:
假设是由协进程执行的,在执行A的过程中,可以随时中断执行B,B也可能在执行的过程中中断,然后执行A,结果可能是:
但是,A中没有对B的调用,所以对协程的调用比对函数的调用更难理解一点。
看起来A和B的执行有点像多线程,但是协同线程的特点是由一个线程执行。与多线程相比,协同线程的优势在哪里?
最大的好处是协调流程的执行效率极高。因为子程序切换不是线程切换,而是由程序本身控制,所以没有线程切换的开销。与多线程相比,线程越多,协程的性能优势就越明显。
第二个好处是不需要多线程锁机制,因为只有一个线程,同时写变量不会有冲突。协调过程中只需要判断控制共享资源时的状态,所以执行效率远高于多线程。
因为协程是一个线程执行的,怎么才能利用多核CPU呢?最简单的方法就是多进程协程,既充分利用了多核,又充分发挥了协程的高效率,可以获得极高的性能。
对Python协程的支持仍然非常有限。生成器中使用的yield可以在一定程度上实现协同程序。虽然支持不全,但已经可以发挥相当大的威力了。
看看这个例子:
传统的生产者-消费者模型是一个线程写消息,另一个线程取消息,队列和等待由锁机制控制,但一不小心就可能发生死锁。
如果切换到协同学,生产者产生消息后,会直接通过yield跳转到消费者开始执行。消费者完成执行后,会切换回生产者继续生产,效率极高:
执行结果:
请注意,消费者函数是一个生成器。将消费者转化为产品后:
首先调用c.next()启动生成器;
然后一旦有东西产生,通过c.send(n)切换到消费者执行;
消费者通过yield获取消息,进行处理,然后通过yield发回结果;
生产者得到消费者处理的结果,并继续产生下一条消息;
Produce决定不生产,通过c.close()关闭了消费者,整个过程结束。
整个进程由一个线程解锁并执行。生产者和消费者合作完成任务,所以称为“协同进程”,而不是线程的抢占式多任务。
首先,需要声明C/C协同学。我不打算在这里花时间介绍什么是协同学,以及它们与线程有何不同。如果你对此有任何疑问,你可以自己谷歌一下。与Python不同,C/C语言本身无法自然支持协程。现有的C协同库基于两种方案:用汇编代码控制协同上下文切换,用操作系统提供的API实现协同上下文切换。典型的例子有:
Libco,Boost.context:基于汇编代码的上下文切换
Phxrpc:基于ucontext/Boost.context的上下文切换
Libmill:基于setjump/longjump的协调切换
一般来说,基于程序集的上下文切换比系统调用切换更高效,这也是phxrpc在使用Boost.context时比ucontext性能更好的原因。后面有空再介绍phxrpc和libmill的详细实现。
libco协同过程的创建和切换
在介绍协程的创建之前,我们先熟悉一下libco中用来表示一个协程的数据结构,也就是co_routine_inner.h中定义的stCoRoutine_t:
暂时我们只需要知道代表协程的最简单的参数,比如协程运行环境,协程上下文环境,协程运行函数和运行时栈空间。以下stack_sp、save_size和save_buffer与libco共享堆栈模式有关。我们稍后将讨论共享堆栈。
协作流程创建和操作
因为多个协同程序在一个线程中运行,所以当线程中的第一个协同程序被创建时,需要初始化协同程序所在的环境stcoroutinenv _ t。线程使用这个环境来管理协程。通过这种环境,线程可以知道当前创建了多少个协同程序,哪些协同程序当前正在运行,以及如何调度协同程序:
上面的代码显示,libco允许在一个线程中最多创建128个协同程序,其中位于堆栈顶部的pCallStack[iCallStackSize-1]协同程序表示当前正在运行的协同程序。调用函数co_create时,首先检查当前线程中的协程env结构是否创建。这里libco并没有使用线程本地的方法(比如phxrpc采用的gcc内置__thread)来管理每个线程中的stcoroutinenv _ t,而是预先定义了一个大数组,通过对应的PID来获取其协同环境。
初始化stcouroutinenv _ t时主要完成以下步骤:
要为stcoroutinenv _ t申请空间并初始化它,请设置协调调度程序pEpoll。
创建一个空的协程,初始化它的上下文(关于coctx的细节将在后面描述),将其添加到线程的协程环境中进行管理,并将其设置为主协程。这个主协程用于运行这个线程的主逻辑。
当协同环境被初始化时,调用函数co_create_env来创建具体的协同。该函数初始化一个协同结构stCoRoutine_t,并在该结构中设置各种字段,如运行函数pfn、运行时的堆栈地址等。需要注意的是,如果使用非共享栈模式,那么这个进程需要单独申请栈空间,否则需要从共享栈申请空间。堆栈空间表示如下:
用co_create创建协程后,将调用co_resume来激活协程:
函数co_swap类似于Unix提供的函数swapcontext:当前正在运行的协程的上下文和状态保存在结构lpCurrRoutine中,将co设置为要运行的协程,这样就可以切换协程。Co_swap完成了三项具体任务:
通过char c记录当前协同curr的运行栈顶指针;Curr_stack_sp=c,下次切换回Curr时,可以从栈顶指针指向的位置继续,执行curr后可以平滑释放栈。
处理与共享栈相关的操作,调用函数coctx_swap完成上下文切换。注意,在执行完coctx_swap之后,执行进程会跳转到新的协程,也就是pending_co,后续代码直到下一次切换回curr才会执行。
下次切换回curr时,处理与共享栈相关的操作。
与co_resume函数相对应的是,当协同进程主动放弃执行权时,调用co_yield函数。Co_yield函数调用co_yield_env将当前协程与当前线程中记录的其他协程进行切换:
如前所述,pCallStack栈顶指向当前运行的corouth对应的结构,所以这个函数取出curr,将当前运行的corouth上下文保存到这个结构,切换到corouth last执行。接下来我们以32位系统为例,分析libco如何实现协同运行环境的切换。
协作上下文的创建和切换
Libco使用结构struct coctx_t来表示协调流程的上下文:
readwrite_coroutine函数将所有新创建的读写进程添加到外循环的队列g_readwrite中。此时,这些读写进程都没有具体对应一个套接字连接,因此队列g_readwrite可以被视为一个协同例程池。加入队列后,调用函数co_yield_ct放弃CPU,控制权回到主线程。
主线程中的函数co_eventloop监听网络事件,来自客户端的新连接由协调的accept_co处理,我们将在后面介绍co_eventloop如何唤醒accept _ co的细节。Accept_co调用accept_routine函数来接收新的连接。该功能的流程如下:
检查队列g_readwrite是否有空闲的读写协程,如果没有,调用函数poll将协程添加到Epoll管理的定时器队列中,也就是sleep(1000)的函数;
调用co_accept来接收新连接。如果连接失败,调用co_poll将服务器的listen_fd添加到Epoll中,以触发下一个连接事件。
一个成功的连接,从g_readwrite中拿出一个读写进程来处理读写;
再次回到函数readwrite_coroutine,这个函数会调用co_poll将新建立的连接的fd加入到Epoll监控中,并将控制流返回给主coroutine;当读或写事件发生时,Epoll将唤醒相应的协程,并继续执行读功能和写功能。
上面的过程大致解释了控制过程是如何在不同的协程中切换的。接下来介绍具体的实现细节,即如何通过Epoll管理协程,以及如何对系统功能进行改造以满足libco的调用。
通过Epoll管理和唤醒协议
Epoll监控FD
上一章介绍了Epoll可以使用co_poll函数管理fd,当Epoll的相应事件触发时,可以切换回来执行读或写操作,从而实现Epoll管理协进程的功能。co_poll函数的原型如下:
StCoEpoll_t是为libco定制的Epoll相关数据结构,fds是pollfd结构的文件句柄,nfds是fds数组的长度,最后一个参数表示定时器时间,即在毫秒级超时后触发这些文件句柄的处理。这里可以看到,co_poll可以同时向Epoll管理添加多个文件句柄。我们先来看看stcopepoll _ t结构:
以stTimeout_开头的数据结构与libco的定时器管理有关,我们将在后面介绍。Co_Epoll_res是Epoll事件数据结构的封装,即每次触发Epoll事件时返回的结果。在Unix和MaxOS下,libco会使用Kqueue而不是epoll,所以这里也保留了kevent数据结构。
Co_poll实际上是函数co_poll_inner的封装。我们把co_epoll_inner函数的结构分为上下两半。前半段,调用co_poll的协协议CC,将其需要监控的句柄数组FD全部添加到Epoll管理中,放弃CPU通过函数co _ yield _ env当CC对应的监控事件在主进程的事件循环co_eventloop中被触发时,CC的执行将被恢复。此时,CC将开始执行后半部分,即前半部分添加的句柄fds将从epoll中删除,剩余的数据结构将被清理。以下流程图简要说明了控制流的转移过程:
有了以上的基本概念,我们再来看看具体的实现细节。Co_poll首先在内部将传入的文件句柄数组fds转换成一个数据结构stPoll_t,主要是为了方便后续处理。该结构记录iEpollFd、ndfs、fds数组,以及由协程执行的函数和参数。有两点需要注意:
对于每个fd,申请一个STPollLitem _ T来管理对应的Epoll事件,记录回调参数。Libco在这里做了一个小小的优化。对于长度小于2的fds数组,直接在堆栈上定义对应的STPollLitem _ T数组,否则从堆中请求内存。这也是常见的优化。毕竟从堆中申请内存是需要时间的。
函数指针OnPollProcessEvent封装了协调过程的切换过程。当传入指定的stpolllitem _ t结构时,可以唤醒该结构对应的协程,将控制权交给它执行;
co_poll的第二步也是最关键的一步是将所有fd阵列添加到Epoll中进行监控。协作CC会将每个epoll_event的data.ptr字段设置为对应的stpolllitem _ t结构。当这样的事件被触发时,可以直接从相应的ptr中取出stpolllitem _ t结构,然后可以唤醒指定的协程。
如果为该操作提供了超时参数,co_poll还会将协调CC的该操作对应的stPoll_t添加到定时器队列中。这表明在超时的定时触发之后,协调CC的执行也将被唤醒。当整个前半部分结束后,co_poll立即调用co_yield_env放弃CPU,执行进程跳回主协进程。
从上面的流程图中也可以看出,当执行过程再次跳回时,表明已经触发了协同CC添加的读写等监控事件,即可以进行相应的读写操作。此时,CC首先从Epoll中删除会话前半部分添加的监听事件,清理剩余的数据结构,然后调用读写逻辑。
定时器实现
协同进程CC可以向Epoll添加一组FD,并为其设置超时。超时后,CC将再次被唤醒执行。Libco使用计时轮实现定时器。至于计时轮算法,我们可以参考一下。它的优点是O(1)插入和删除的复杂性,缺点是只有有限的长度,在某些情况下不能满足要求。
回头看看stcopepoll _ t结构,其中*pTimeout代表时间轮,它被函数AllocateTimeout初始化为一个固定大小(60 * 1000)的数组。根据Timing-Wheel的特性,libco只支持最长60s的计时事件。实际上,在添加计时器时,libco要求计时时间不能超过40s。成员pstTimeoutList记录co_eventloop中超时的事件,而pstActiveList记录当前活动的事件,包括超时事件。这两个结构都将在co_eventloop中处理。
下面简单分析一下添加定时器的实现:
定时器的超时检查在函数co_eventloop中执行。
EPOLL事件循环
主协程通过调用函数co_eventloop来监控Epoll事件,并在触发相应事件时切换到指定的协程执行。上一节的流程图已经明确了co_eventloop和应用协同流程之间的交互过程。这里我们主要介绍co_eventloop函数的实现:
如上所述,epoll_wait返回的所有事件都存储在stcopepoll _ t结构的co_epoll_res中。因此,co_eventloop首先为co_epoll_res申请空间,然后通过无限循环监听所有协程添加的所有事件:
对于每个触发的事件,co_eventloop首先通过指针字段data.ptr取出保存的stpolllitem _ t结构,并将其添加到pstActiveList列表中;之后,将所有已经超时的事件从计时器轮中取出,并全部添加到pstActiveList中,pstActiveList中的所有事件都被视为活动事件。
对于每一个活动事件,co_eventloop都会通过调用对应的pfnProcess,即上图中的OnPollProcessEvent函数,切换到事件对应的协程,将进程跳转到协程执行。
最后,co_eventloop还为调用者提供了一个额外的参数来传入一个函数指针pfn。该函数将在每个循环完成后执行;当这个函数返回-1时,整个事件循环将被终止。用户可以使用这个函数来控制主协程的终止或完成一些统计要求。
原创作品来自勿忘你的倡议心,的博主,
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。