c语言实现线程池,线程池实现多线程并发
高并发线程池设计并发的基本概念所谓并发编程是指在同一台计算机上‘同时’处理多个任务。是同一实体上的多个并发事件。在事件处理过程中有一个长时间的CPU密集型处理。读取文件,但是文件没有缓存,所以从硬盘读取很慢。必须等待才能获得某个资源:硬件驱动的互斥等待同步调用的数据库响应网络上的请求,响应多线程的缺陷。单个进程或线程一次只能处理一个任务。如果有很多请求需要同时处理怎么办?解决方案——采用多进程或多线程技术解决。缺陷:花费在创建和销毁线程上的时间和消耗的系统资源甚至可能比花费在处理实际用户请求上的时间和资源多得多。活动线程需要消耗系统资源。如果启动的线程太多,系统会因为内存消耗过多或者“过度切换”而导致系统资源不足。当线程切换时,线程执行的相关信息会保存在相应的上下文中。线程越多,切换时间就越长。解决方案:3354使用线程池技术。线程池线程池由一个任务队列和一组处理任务队列的线程组成。一旦一个工作进程需要处理一个可能的“阻塞”操作,它将作为一个任务被放入线程池的队列中,然后由一个空闲线程取出并处理。注意:一个线程中的所有线程都要从任务队列中获取任务(只有一个线程可以获取相同的任务),这样会修改任务队列的链表。流程向其添加新任务也会修改任务队列的链表,不能同时修改。因此,任务队列是一个关键资源,所以这里应该实现同步和互斥。
任务——,线程的核心组件,是挂起的工作,通常由标识、上下文和处理函数组成。队列——按顺序存储要处理的任务序列,并等待线程中线程组的处理。线程池——由多个已启动的线程组成。条件变量3354一种同步机制,允许线程暂停,直到满足共享数据的某些条件。互斥锁3354保证在任何时候只有一个线程可以访问这个对象。Nginx线程池分析注:下面这段代码是我看的一个视频中的数据,是Nginx中从C中提取的。好像和最新的Nginx源代码不太一样,因为被删了,也不是最新版本,但是大致意思差不多,我觉得。)执行进程创建一个线程池并初始化它。打开空间进行初始化,做好相关的默认设置和属性。创建互斥体和条件变量。初始化任务队列。在线程池中创建线程。并开始线程。这涉及到互斥和条件变量,等待任务并取出。详情请见代码中的注释。这里是核心。分配任务存储器任务结构分配有其任务执行函数的参数存储器。指定任务的执行功能。将任务放入线程池。使用后销毁线程池。在任务队列中放几个自杀任务,等待线程来接,然后依次自杀。然后销毁互斥和条件变量。最后,解放自己。主数据结构任务结构线程任务结构线程任务结构线程任务结构
thread _ task _ t * next//下一个任务
uint _ t id//任务ID
void * ctx//上下文,任务要采用的参数
void(* handler)(void * data);//函数指针,要执行的具体任务。
};
//别名
typedef结构thread _ task _ s thread _ task _ t;分配任务内存thread _ task _ alloct thread _ task _ t *
线程任务分配
{
线程_任务_ t *任务;
//任务记忆功能参数记忆
task=calloc(1,sizeof(thread _ task _ t)size);
if (task==NULL) {
返回NULL
}
//task是thread_task_t指针。
//指针被添加到常数中
task- ctx=任务1;//task 1,指向任务函数的参数所在的内存。
返回任务;
}任务队列结构thread _ pool _ queue _ ttypedef struct {
thread _ task _ t * first//指向第一个元素
thread _ task _ t * *最后;//指向最后一个节点
}线程池队列t;//任务队列,单链表结构。补充:这个单链表和我们学习数据结构时的定义是不一样的。我觉得在这里使用二级指针还是挺有意思的。相关的插入和取出操作在下面的相关线程池代码中。来,我们提前拿出来看看。任务定义:如上图,这里我们重复一遍,这样更便于顺序阅读。typedef struct { thread _ task _ t * first;//指向第一个元素thread _ task _ t * * last//指向最后一个节点,通过last链接。}线程池队列t;//任务队列,单链表结构。
操作:thread _ task _ t * task//task是任务,thread_task_t类型,next先空。task-next=NULL;//*last其实是first,即first=task * TP-queue . last=task;//注意last=task- next,即当前任务存储的是first之后节点的地址(注意这里是二级指针,这里我指的是一级指针是节点,二级指针是节点的地址,也就是下一个指针的地址)。TP-queue . last=task-next;/*然后我们模拟第二次插入然后是前一个任务的下一个=这次要链接的任务然后得到这次要链接的任务的下一个节点的地址,下一个指针的地址。*/
操作:task=TP-queue . first;//取出第一个TP-queue . first=task-next;//如果(tp- queue.first==NULL) {//任务队列为空,则第一个节点的指针向后移动。回到初始状态,重新准备链接。TP-queue . last=TP-queue . first;}
线程结构线程池结构线程池结构
pthread _ mutex _ t mtx//互斥体
线程池队列t队列;//任务队列
int_t等待;//线程池中还有多少任务没有处理?
pthread _ cond _ t cond//线程条件变量
char * name//线程池的名称
uint_t线程;//线程池中的线程数
int _ t max _ queue//任务队列最多能容纳多少个任务?
};
//别名
typedef结构thread _ pool _ s thread _ pool _ t;初始化线程池初始化()线程池初始化()
{
int err
pthread _ t tid
uint _ t n;
pthread _ attr _ t attr//线程属性设置结构
thread _ pool _ t * tp=NULL
tp=calloc(1,sizeof(thread _ pool _ t));
if(tp==NULL){
fprintf(stderr, thread_pool_init: calloc失败!\ n’);
}
thread_pool_init_default(tp,NULL);//线程池属性的默认设置
线程池队列初始化(tp队列);//线程池任务队列初始化
if (thread_mutex_create( tp- mtx)!=OK) {//创建互斥体
免费(TP);
返回NULL
}
if (thread_cond_create( tp- cond)!=OK) {//创建条件变量
(void)thread _ mutex _ destroy(TP-MTX);
免费(TP);
返回NULL
}
//线程属性初始化
err=pthread _ attr _ init(attr);
如果(错误){
fprintf(stderr, pthread_attr_init()失败,原因:%s\n ,strerror(errno));
免费(TP);
返回NULL
}
//创建线程时,将其属性设置为分离状态。
//主线程不能使用pthread_join等待子线程。
//也就是说,无法再捕获该子线程的状态。
err=PTHREAD _ attr _ setdetachstate(attr,PTHREAD _ CREATE _ DETACHED);
如果(错误){
fprintf(stderr, pthread_attr_setdetachstate()失败,原因:%s\n ,strerror(errno));
免费(TP);
返回NULL
}
for(n=0;n个tp线程;n ) {
//线程的创建
err=pthread_create( tid,attr,thread_pool_cycle,TP);
如果(错误){
fprintf(stderr, pthread_create()失败,原因:%s\n ,strerror(errno));
免费(TP);
返回NULL
}
}
(void)pthread _ attr _ destroy(attr);
返回TP;
}线程池任务队列初始化thread _ pool _ queue _ init # define thread _ pool _ queue _ init(q)\
(q)-first=NULL;\
(q) q)- last=(q)-第一个线程池中的第一个线程thread_pool_cycle static void *
线程池周期(void *数据)
{
thread _ pool _ t * tp=data//它所在的线程池,在创建线程时传递
int err
线程_任务_ t *任务;
if(debug)fprintf(stderr,池 %s 中的线程已启动\n ,TP-name);
for(;) {
//锁定
if (thread_mutex_lock( tp- mtx)!=好){
返回NULL
}
tp-等待-;//等待线程-
While (tp- queue.first==NULL) {//无任务
//等待信号,先挂起,然后开锁。——等任务队列中有任务。
//被唤醒时,先上锁,然后正式被唤醒。
if (thread_cond_wait( tp- cond,tp- mtx
!=OK)
{
(void)thread _ mutex _ unlock(TP-MTX);//防御型编程,开锁。
返回空
}
}
//从任务队列中拿任务
任务=TP-队列。第一;
TP-队列。first=task-next;
//如果取出一个任务后,任务队列又空了,重新设置最后的指向。
if (tp- queue.first==NULL) {
TP-队列。最后=TP-队列。第一;
}
//开锁
if (thread_mutex_unlock( tp- mtx)!=好){
返回空
}
if(debug) fprintf(stderr,运行线程池" %s "中的任务# % Lu \ n ,
任务标识,任务名称);
任务处理器(任务——CTX);//当前执行任务函数,任务ctx为函数参数
if(debug) fprintf(stderr,完成线程池 %s\n 中的任务#%lu ,任务id,TP-name);
task-next=NULL;
免费(任务);
}
}往线程池中投递任务线程_任务_发布int_t
线程_任务_发布(线程_池_t *tp,线程_任务_ t *任务)
{
if (thread_mutex_lock( tp- mtx)!=好){//上锁
返回错误;
}
//任务队列尾邻接资源,进行互斥访问。
if (tp- waiting=tp- max_queue) {//线程池等待任务队列是否达到极限
(void)thread _ mutex _ unlock(TP-MTX);
fprintf(stderr,线程池“%s”队列溢出:%ld个任务正在等待\n ,
tp- name,TP-waiting);
返回错误;
}
//任务-事件。active=1;
任务标识=线程池任务标识;//任务编号
task-next=NULL;
//发送一个信号,唤醒一个线程,之后该线程就能从任务队列中获取任务,进行执行。
if (thread_cond_signal( tp- cond)!=好){
(void)thread _ mutex _ unlock(TP-MTX);
返回错误;
}
//一开始的时候最后,默认指向的值第一的地址。
//所以此时给*tp- queue.last赋值后,first=tast
//返回最后的为空,还是尾插法。
* TP-队列。last=任务;
TP-队列。last=task-next;
tp-等待;
(void)thread _ mutex _ unlock(TP-MTX);
if(debug)fprintf(stderr, task #%lu已添加到线程池%s\n ,
任务标识,任务名称);
退货OK;
}销毁线程池线程池销毁空的线程池销毁(线程池t *tp)
{
uint _ t n;
线程_任务_测试任务;
易失性uint_t锁;
memset( task, \0 ,sizeof(thread _ task _ t));
任务。handle=thread _ pool _ exit _ handle;//给一个自杀任务
任务。CTX=(void *)锁;//参数
for(n=0;n个射击练习(同Target Projectile)线程;n ) {
lock=1;
if (thread_task_post(tp,task)!=好){//投递任务
返回;
}
while (lock) {//自杀任务中,会将锁置为0,终止循环。
sched _ yield();//当前线程放弃中央处理器的优先权,让出中央处理器的执行权,让别的线程得到更多的执行机会。
}
}
(void)thread _ cond _ destroy(TP-cond);//清理条件变量
(void)thread _ mutex _ destroy(TP-MTX);//清理互斥锁
免费(TP);//释放线程池
}线程自杀任务线程池退出句柄静态无效
线程池退出处理程序(void *数据)
{
uint _ t * lock=data
* lock=0;
pthread _ exit(0);
}示例#include thread_pool.h
结构测试{
int arg1
int arg2
};
void task_handler1(void* data){
静态int index=0;
printf(您好,这是第一次测试,index=%d\r\n ,index);
}
void task_handler2(void* data){
静态int index=0;
printf(您好,这是第2次测试,index=%d\r\n ,index);
}
void task_handler3(void* data){
静态int index=0;
结构测试*t=(结构测试*)数据;
printf(您好,这是第三次测试,index=%d\r\n ,index);
printf(arg1: %d,arg2: %d\n ,t- arg1,t-arg 2);
}
(同国际组织)国际组织
main(int argc,char **argv)
{
thread _ pool _ t * tp=NULL//定义一个线程池指针
int I=0;
tp=线程池初始化();//线程池初始化
//分配任务内存
thread _ task _ t * test1=thread _ task _ alloc(0);
thread _ task _ t * test2=thread _ task _ alloc(0);
thread _ task _ t * test3=thread _ task _ alloc(sizeof(struct test));
//指定任务
test1-handler=task _ handler 1;
测试2-handler=task _ handler 2;
test3-handler=task _ handler 3;
//通过结构体指定参数
((struct test *)test3-CTX)-arg 1=666;
((struct test *)test3-CTX)-arg 2=888;
//将任务放入线程池
thread_task_post(tp,test1);
thread_task_post(tp,test2);
thread_task_post(tp,test3);
睡眠(10);
线程池销毁(TP);
}
补充volatile关键字:有些变量是用volatile关键字声明的。当两个线程都需要使用一个变量,并且该变量的值会改变时,应该用volatile声明。该关键字的作用是防止优化编译器将变量从内存加载到CPU寄存器中。如果将变量加载到寄存器中,就有可能两个线程同时使用内存中的变量和寄存器中的变量,从而导致程序的错误执行。Volatile意味着每次编译器操作变量时,都必须从内存中取出变量,而不是使用已经在寄存器中的值。源码-菜鸟教程-多线程下C/C-volatile中volatile关键字的详细讲解。
转载请联系作者取得转载授权,否则将追究法律责任。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。