本文主要介绍了对C语言编程中线程池使用的深入分析,包括线程池的封装和实现。有需要的朋友可以参考一下。
为什么需要线程池?
目前大部分的网络服务器,包括Web服务器、邮件服务器、数据库服务器,都有一个共同点,就是单位时间内要处理数量巨大的连接请求,但处理时间相对较短。
我们在系统多线程方案中采用的服务器模型是一旦接收到请求就创建一个新的线程,由线程来执行任务。任务完成后线程退出,这是“动态创建,动态销毁”的策略。虽然与创建过程相比,创建一个线程的时间已经大大缩短,但是如果提交给线程的任务执行时间很短,并且执行次数极其频繁,那么服务器就会处于不断创建和销毁线程的状态。
我们传统方案中的线程执行过程分为三个进程:T1、T2和T3。
T1:线程创建时间
T2:线程执行时间,包括线程同步时间。
T3:线程销毁时间
那么我们可以看到线程本身的开销占了(T1 T3)/(T1 T2 T3)。如果线程执行时间短,这个具体开销可能占20%-50%左右。如果任务执行时间频繁,这个成本就不可忽略。
此外,线程池可以减少创建的线程数量。通常,线程池中允许的并发线程数量有一个上限。如果并发线程的数量超过上限,一些线程将会等待。在传统的方案中,如果同时请求的数量是2000,那么在最坏的情况下,系统可能需要生成2000个线程。虽然这不是一个很大的数字,但有些机器可能达不到这个要求。
所以线程池的出现是为了降低线程池本身的成本。线程池采用预创建技术。应用程序启动后,将立即创建一定数量的线程(N1 ),并放入空闲队列。这些线程都处于挂起状态,不消耗CPU,占用内存空间更少。当任务到达时,缓冲池选择一个空闲线程,并将任务传递给这个线程运行。当N1线程都在处理任务时,缓冲池会自动创建一定数量的新线程来处理更多的任务。任务完成后,线程不会退出,而是留在池中等待下一个任务。当系统空闲时,大部分线程总是处于挂起状态,线程池会自动销毁部分线程,回收系统资源。
基于这种预创建技术,线程池将线程创建和销毁造成的开销分配给每个特定的任务。执行的次数越多,每个任务的开销越小。但是,我们可能还需要考虑线程间同步带来的开销。
构建线程池框架
一般线程池必须具有以下组件:
线程池管理器:用于创建和管理线程池。
工作线程:线程池中实际执行的线程。
任务接口:虽然线程池大多是用来支持网络服务器的,但是我们把线程执行的任务抽象出来形成一个任务接口,这样我们的线程池就和具体的任务无关了。
任务:线程池的概念是特定于其实现的,但可能是队列、链表之类的数据结构,执行线程存储在其中。
我们的通用线程池框架由五个重要部分组成:CThreadManage、CThreadPool、ctthread、CJob、CWorkerThread,此外,该框架还包括线程同步使用的CThreadMutex和CCondition类。
CJob是所有任务的基类,它提供了一个接口运行。所有任务类都必须从该类继承,并同时实现Run方法。具体的任务逻辑在这个方法中实现。
CThread是Linux中线程的包装器,封装了Linux线程最常用的属性和方法。它也是一个抽象类,是所有线程类的基类,并且有一个接口Run。
CWorkerThread是实际被调度和执行的线程类,其从CThread继承而来,实现了CThread中的奔跑方法。
CThreadPool是线程池类,其负责保存线程,释放线程以及调度线程。
CThreadManage是线程池与用户的直接接口,其屏蔽了内部的具体实现。
CThreadMutex用于线程之间的互斥。
条件则是条件变量的封装,用于线程之间的同步。
CThreadManage直接跟客户端打交道,其接受需要创建的线程初始个数,并接受客户端提交的任务。这儿的任务是具体的非抽象的任务CThreadManage的内部实际上调用的都是CThreadPool的相关操作CThreadPool .创建具体的线程,并把客户端提交的任务分发给CWorkerThread,CWorkerThread实际执行具体的任务。
理解系统组件
下面我们分开来了解系统中的各个组件。
CThreadManage
CThreadManage的功能非常简单,其提供最简单的方法,其类定义如下:
类CThreadManage
{
私人:
CThreadPool * m _ Pool
int m _ NumOfThread
受保护:
公共:
CThreadManage();
CThreadManage(int num);
virtual ~ CThreadManage();
void SetParallelNum(int num);
void Run(CJob* job,void * job data);
void terminate all(作废);
};
其中m _池指向实际的线程池;m_NumOfThread是初始创建时候允许创建的并发的线程个数。另外奔跑和终结所有方法也非常简单,只是简单的调用CThreadPool的一些相关方法而已。其具体的实现如下:
CThreadManage:CThreadManage()
{
m _ NumOfThread=10
m _ Pool=new CThreadPool(m _ NumOfThread);
}
CThreadManage:CThreadManage(int num)
{
m _ NumOfThread=num
m _ Pool=new CThreadPool(m _ NumOfThread);
}
CThreadManage:~CThreadManage()
{
if(NULL!=m_Pool)
删除m _池
}
void CThreadManage:SetParallelNum(int num)
{
m _ NumOfThread=num
}
void CThreadManage:Run(CJob * job,void* jobdata)
{
m_Pool-Run(作业,作业数据);
}
void CThreadManage:terminate all(void)
{
m _ Pool-termin ate all();
}
CThread
CThread类实现了对Linux操作系统操作系统中线程操作的封装,它是所有线程的基类,也是一个抽象类,提供了一个抽象接口快跑,所有的CThread都必须实现该奔跑方法CThread .的定义如下所示:
类CThread
{
私人:
int m _ ErrCode
信号量m _ ThreadSemaphore//内部信号量,用于实现
无符号长m _ ThreadID
bool m _ Detach//线程被分离
bool m _ CreateSuspended//如果创建后挂起
char * m _ ThreadName
ThreadState m _ ThreadState//线程的状态
受保护:
void set ErrCode(int ErrCode){ m _ ErrCode=ErrCode;}
静态void *线程函数(void *);
公共:
CThread();
CThread(bool createsuspended,bool detach);
virtual ~ CThread();
虚拟空运行(void)=0;
void SetThreadState(ThreadState state){ m _ ThreadState=state;}
弯曲件终止(作废);//终止威胁
bool Start(void);//开始执行线程
空的出口(作废);
bool唤醒(void);
ThreadState GetThreadState(void){ return m _ ThreadState;}
int GetLastError(void){ return m _ ErrCode;}
void SetThreadName(char * thrname){ strcpy(m _ ThreadName,thrname);}
char * GetThreadName(void){ return m _ ThreadName;}
int GetThreadID(void){ return m _ ThreadID;}
bool设置优先级(int priority);
int get priority(void);
int get并发(void);
void set并发(int num);
弯曲件分离(作废);
bool Join(void);
布尔收益率(作废);
int Self(void);
};
线程的状态可以分为四种,空闲、忙碌、挂起、终止(包括正常退出和非正常退出)。由于目前Linux操作系统操作系统线程库不支持挂起操作,因此,我们的此处的挂起操作类似于暂停。如果线程创建后不想立即执行任务,那么我们可以将其"暂停",如果需要运行,则唤醒。有一点必须注意的是,一旦线程开始执行任务,将不能被挂起,其将一直执行任务至完毕。
线程类的相关操作均十分简单。线程的执行入口是从开始()函数开始,其将调用函数线程函数,线程函数再调用实际的奔跑函数,执行实际的任务。
CThreadPool
CThreadPool是线程的承载容器,一般可以将其实现为堆栈、单向队列或者双向队列。在我们的系统中我们使用标准模板库矢量对线程进行保存CThreadPool .的实现代码如下:
类CThreadPool
{
友元类CWorkerThread
私人:
无符号整数m _ MaxNum//可以同时创建的最大线程数
无符号整数m _ AvailLow//应该保留的空闲线程的最小数量
无符号整数m _ AvailHigh//同时保持的空闲线程的最大数量
无符号整数m _ AvailNum//空闲数字的正常线程数;
无符号整数m _ InitNum//正常线程数;
受保护:
CWorkerThread * GetIdleThread(void);
void AppendToIdleList(CWorkerThread *作业线程);
void MoveToBusyList(CWorkerThread * idle thread);
void MoveToIdleList(CWorkerThread * busy thread);
void DeleteIdleThread(int num);
void CreateIdleThread(int num);
公共:
CThreadMutex m _ BusyMutex//访问忙列表时,使用忙碌互斥锁定和解锁
CThreadMutex m _ IdleMutex//访问空闲列表时,使用m _ idle互斥锁定和解锁
CThreadMutex m _ JobMutex//访问作业列表时,使用m _作业互斥锁定和解锁
CThreadMutex m _ VarMutex
条件m _ BusyCond//m_BusyCond用于同步忙线程列表
条件m _ IdleCond//m_IdleCond用于同步空闲线程列表
条件m _ IdleJobCond//m_JobCond用于同步作业列表
条件m _ MaxNumCond
vectorworkerthread * m _ ThreadList;
vectorworkerthread * m _ busy list//线程列表
vectorworkerthread * m _ idle list//空闲列表
CThreadPool();
CThreadPool(int initnum);
virtual ~ CThreadPool();
void set MaxNum(int MaxNum){ m _ MaxNum=MaxNum;}
int get MaxNum(void){ return m _ MaxNum;}
void set avail low num(int minnum){ m _ avail low=minnum;}
int GetAvailLowNum(void){ return m _ avail low;}
void set avail high num(int high num){ m _ avail high=high num;}
int GetAvailHighNum(void){ return m _ avail high;}
int GetActualAvailNum(void){ return m _ avail num;}
int GetAllNum(void){ return m _ thread list。size();}
int GetBusyNum(void){ return m _ busylist。size();}
void SetInitNum(int InitNum){ m _ InitNum=InitNum;}
int GetInitNum(void){ return m _ InitNum;}
void terminate all(作废);
void Run(CJob* job,void * job data);
};
CWorkerThread * CThreadPool:GetIdleThread(void)
{
while(m_IdleList.size()==0)
m_IdleCond .wait();
m_IdleMutex .lock();
if(m_IdleList.size() 0)
{
CWorkerThread * thr=(CWorkerThread *)m _ idle列表。front();
printf('Get Idle thread %d\n ',thr-GetThreadID());
m_IdleMutex .unlock();
返回thr
}
m_IdleMutex .unlock();
返回空
}
//创建数字个空闲线程并将它们放入idlelist
void CThreadPool:CreateIdleThread(int num)
{
for(int I=0;inumi ){
CWorkerThread * thr=new CWorkerThread();
thr-SetThreadPool(this);
AppendToIdleList(thr);
m_VarMutex .lock();
m _有效数量
m_VarMutex .unlock();
thr-Start();//开始线程,线程等待作业
}
}
void CThreadPool:Run(CJob* job,void* jobdata)
{
断言(作业!=NULL);
//如果忙线程数加到m_MaxNum,那么我们应该等待
if(GetBusyNum()==m_MaxNum)
m_MaxNumCond .wait();
if(m_IdleList.size()m_AvailLow)
{
if(GetAllNum()m _ InitNum-m _ idle列表。size()m _ MaxNum)
CreateIdleThread(m _ init num-m _ idle list。size());
其他
CreateIdleThread(m _ MaxNum-GetAllNum());
}
CWorkerThread * idle thr=getidle thread();
if(idlethr!=空)
{
idlethr-m_WorkMutex .lock();
MoveToBusyList(空闲thr);
idle thr-SetThreadPool(this);
job-SetWorkThread(空闲线程);
printf('作业设置为线程%d \n ',idle thr-GetThreadID());
idlethr-SetJob(作业,作业数据);
}
}
CThreadPool中有两个链表,一个是空闲列表,一个是忙碌列表。所有空闲进程都存储在空闲链表中。当一个线程执行一个任务时,它的状态变成忙碌,它从空闲链表中被删除并被移到忙碌链表中。在CThreadPool的构造函数中,我们将执行以下代码:
for(int I=0;im _ InitNum我)
{
CWorkerThread * thr=new CWorkerThread();
AppendToIdleList(thr);
thr-SetThreadPool(this);
thr-Start();//开始线程,线程等待作业
}
在这段代码中,我们将创建m_InitNum个线程,然后调用AppendToIdleList将它们放入空闲链表中。由于目前没有任务分配给这些线程,线程将在启动后自行挂起。
实际上,线程池中的线程数量并不是一成不变的,它会根据执行负载自动伸缩。为此,在CThreadPool中设置四个变量:
M_InitNum:创建生命时线程池中的线程数。
M_MaxNum:当前线程池中允许的最大并发线程数。
M_AvailLow:当前线程池中允许的最小空闲线程数。如果空闲数低于该值,则表明负载可能过重。这时候就需要增加空闲线程池的数量。在实现中,我们总是将线程数量调整为m_InitNum。
M_AvailHigh:当前线程池中允许的最大空闲线程数。如果空闲数高于该值,则表明当前负载可能较轻。此时冗余的空闲线程会被删除,删除后调整后的数也是m_InitNum。
M_AvailNum:当前线程池中的实际线程数,其值介于m_AvailHigh和m_AvailLow之间。如果线程数始终保持在m_AvailLow和m_AvailHigh之间,则不需要创建或删除线程,以保持平衡状态。因此,如何设置m_AvailLow和m_AvailHigh的值,使线程池尽可能保持平衡状态,是线程池设计中必须考虑的问题。
在线程池接收到一个新任务后,线程池首先检查是否有足够的空闲池可用。检查分为三个步骤:
(1)检查当前忙碌的线程是否已经达到设定的最大值m_MaxNum。如果有,说明当前没有空闲线程可用,不可能创建新的线程,所以必须等到一个线程执行完毕,返回空闲队列。
(2)如果当前的空闲线程数小于我们设置的最小空闲线程数m_AvailLow,我们必须创建新的线程。默认情况下,创建的线程数应该是m_InitNum,所以创建的线程数应该是(当前空闲线程数和m _ InitNum);但是有一种特殊情况必须考虑,那就是现有线程总数加上创建的线程数可能会超过m_MaxNum,所以我们必须区别对待线程的创建。
if(GetAllNum()m _ InitNum-m _ idle list . size()m _ MaxNum)
CreateIdleThread(m _ init num-m _ idle list . size());
其他
CreateIdleThread(m _ MaxNum-GetAllNum());
如果创建后的总数不超过m_MaxNum,则创建的线程为m _ initnum如果是这样,将只创建(m_MaxNum-当前线程的总数)。
(3)调用GetIdleThread方法找到空闲线程。如果当前没有空闲线程,则挂起;否则,将任务分配给线程,并将其移动到忙队列。
当线程完成执行时,它将调用MoveToIdleList方法移入空闲列表,并调用m_IdleCond。Signal()方法来唤醒GetIdleThread()中可能被阻塞的线程。
CJob
CJob类比较简单,封装了任务的基本属性和方法,其中最重要的是Run方法,代码如下:
CJob类
{
私人:
int m _ JobNonum被分配给作业
char * m _ JobName//作业名称
CThread * m _ pWorkThread//与作业相关联的线程
公共:
CJob(void);
virtual ~ CJob();
int GetJobNo(void)const { return m _ JobNo;}
void SetJobNo(int JobNo){ m _ JobNo=JobNo;}
char * GetJobName(void)const { return m _ job name;}
void SetJobName(char * job name);
CThread * getwork thread(void){ return m _ pWorkThread;}
void setwork thread(CThread * pWorkThread){
m _ pWorkThread=pWorkThread
}
虚拟空运行(void * ptr)=0;
};
线程池使用示例
到目前为止,我们给出了一个简单的线程池框架,与具体任务无关。使用这个框架非常简单。我们需要做的就是派生CJob类,实现Run方法中要完成的任务。那么该作业将由CThreadManage执行。让我们举一个简单的例子程序。
CXJob类:公共CJob
{
公共:
CX job(){ I=0;}
~CXJob(){}
void Run(void* jobdata) {
printf('作业来自CX Job \ n ');
睡眠(2);
}
};
类CYJob:公共CJob
{
公共:
cy job(){ I=0;}
~CYJob(){}
void Run(void* jobdata) {
printf('作业来自cy Job \ n ');
}
};
主()
{
CThreadManage * manage=new CThreadManage(10);
for(int I=0;i40我)
{
CXJob * job=new CXJob();
管理-运行(作业,空);
}
睡眠(2);
cy job * job=new cy job();
管理-运行(作业,空);
manage-terminate all();
}
CXJob和CYJob是从Job类继承而来的,两者都实现了Run接口。Cx只是简单的打印“作业来自CXJob”,CYJob只打印“作业来自CYJob”,然后两者睡眠2秒。在主程序中,我们最初创建了10个工作线程。然后分别执行CXJob 40次和CYJob一次。
C线程池的封装与实现
为了充分发挥多核的优势,我们使用多线程进行任务处理,但是线程也不能被滥用,这样会带来以下问题:
1)线程本身有开销,系统必须分配堆栈、TLS(线程本地存储)、寄存器等。到每个线程。
2)线程管理会给系统带来开销,上下文切换也会给系统带来成本。
3)线程本身就是一个可重用的资源,不需要每次都初始化。
所以在使用中,我们不需要线程和任务task一一对应,只需要提前初始化有限数量的线程来处理无限的task task,线程池就应运而生了,就是这个原理。
主要包含三个队列。
工作队列
工作线程队列
繁忙线程队列
工作队列是阻塞队列,task(模仿函数)任务不推入(notify阻塞获取的工作线程)。工作线程队列(保持不变)从这个队列中获取任务执行(等待获取,当任务队列为空时,阻塞通知)。如果任务被获取,线程将进入忙线程队列,执行任务的模仿功能,当工作完成后,再次移出工作线程队列。
定义特定于线程池的异常:
结构TC_ThreadPool_Exception:公共TC_Exception
{
TC_ThreadPool_Exception(常量字符串缓冲区):TC_Exception(缓冲区){ };
TC _ thread pool _ Exception(const string buffer,int err) : TC_Exception(buffer,err){}
~ TC _ thread pool _ Exception()throw(){ }
};
/**
* @brief使用线程池类与TC _ functor和TC _ functor包装器一起工作。
*
*使用说明:
* 1用tc_functorwrapper封装调用。
* 2用tc_threadpool执行调用
*具体示例代码见:test/test_tc_thread_pool.cpp。
*/
/* *线程池本身继承了自锁,可以帮助锁定* */
类TC_ThreadPool:公共TC_ThreadLock
{
公共:
/**
* @简短构造函数
*
*/
TC _ thread pool();
/**
* @短暂毁灭,将停止所有线程。
*/
~ TC _ ThreadPool();
/**
* @简短初始化。
*
* @param num工作线程数
*/
void init(size _ t num);
/**
* @brief获取线程数。
*
* @return size_t线程数
*/
size _ t getThreadNum(){ Lock sync(* this);return _jobthread。size();}
/**
* @brief获取线程池中的任务数(由exec添加)。
*
* @return size_t线程池中的任务数
*/
size _ t getJobNum(){ return _ job queue。size();}
/**
* @短暂停止所有线程
*/
void stop();
/**
* @短暂启动所有线程
*/
void start();
/**
* @brief启动所有线程并执行初始化对象。
*
* @param ParentFunctor
* @param tf
*/
templateclass ParentFunctor
void start(const TC _ functor wrapper parent functor TF)
{
for(size _ t I=0;我_作业线程.size();我)
{
_startqueue .push _ back(new TC _ FunctorWrapperParentFunctor(TF));
}
start();
}
/**
* @简短添加对象到线程池执行,该函数马上返回,
* 线程池的线程执行对象
*/
templateclass ParentFunctor
void exec(const TC _ functor wrapper parent functor TF)
{
_作业队列。push _ back(new TC _ FunctorWrapperParentFunctor(TF));
}
/**
* @简短等待所有工作全部结束(队列无任务,无空闲线程).
*
* @param millsecond等待的时间(毫秒),-1:永远等待
* @返回没错,所有工作都处理完毕
*错误,超时退出
*/
bool waitForAllDone(int millsecond=-1);
公共:
/**
* @简短线程数据基类,所有线程的私有数据继承于该类
*/
类线程数据
{
公共:
/**
* @简短构造
*/
ThreadData(){ };
/**
* @简短析够
*/
virtual ~ ThreadData(){ };
/**
* @简短生成数据。
*
* @ param T
* @返回ThreadData*
*/
模板类型名称T
静态T* makeThreadData()
{
返回新t;
}
};
/**
* @简短设置线程数据。
*
* @param p线程数据
*/
静态void setThreadData(ThreadData * p);
/**
* @简短获取线程数据。
*
* @返回ThreadData*线程数据
*/
静态ThreadData * getThreadData();
/**
* @简短设置线程数据,键需要自己维护。
*
* @param pkey线程私有数据键
* @param p线程指针
*/
静态void setThreadData(pthread _ key _ t pkey,ThreadData * p);
/**
* @简短获取线程数据,键需要自己维护。
*
* @param pkey线程私有数据键
* @返回指向线程的ThreadData*指针
*/
静态ThreadData * getThreadData(pthread _ key _ t pkey);
受保护:
/**
* @简短释放资源。
*
* @param p
*/
静态空的析构函数(void * p);
/**
* @简短初始化键
*/
类别密钥初始化
{
公共:
/**
* @简短初始化键
*/
KeyInitialize()
{
int ret=pthread _ key _ create(TC _ thread pool:g _ key,TC _ thread pool:destructor);
如果(ret!=0)
{
抛出TC _ thread pool _ Exception('[TC _ thread pool:key initialize]pthread _ key _ create error ',ret);
}
}
/**
* @简短释放键
*/
~KeyInitialize()
{
pthread _ key _ delete(TC _ thread pool:g _ key);
}
};
/**
* @简短初始化键的控制
*/
静态key初始化g _ key _ initialize
/**
* @简短数据键
*/
静态pthread _ key _ t g _ key
受保护:
/**
* @简短线程池中的工作线程
*/
类线程工作者:公共TC _线程
{
公共:
/**
* @简短工作线程构造函数。
*
* @ param tpool
*/
线程工作器(TC _ thread pool * t pool);
/**
* @简短通知工作线程结束
*/
void terminate();
受保护:
/**
* @简短运行
*/
虚拟空运行();
受保护:
/**
* 线程池指针
*/
TC _线程池* _ tpool
/**
* 是否结束线程
*/
bool _终止
};
受保护:
/**
* @简短清除
*/
void clear();
/**
* @简短获取任务,如果没有任务,则为空。
*
* @ return TC _ FunctorWrapperInterface *
*/
TC _ FunctorWrapperInterface * get(线程工作者* PTW);
/**
* @简短获取启动任务。
*
* @ return TC _ FunctorWrapperInterface *
*/
TC _ FunctorWrapperInterface * get();
/**
* @简短空闲了一个线程。
*
* @param ptw
*/
void idle(线程工作者* PTW);
/**
* @简短通知等待在任务队列上的工作线程醒来
*/
void notifyT();
/**
* @简短是否处理结束。
*
* @返回布尔值
*/
bool finish();
/**
* @简短线程退出时调用
*/
void exit();
友元类线程工人
受保护:
/**
* 任务队列
*/
TC _ thread queue TC _ FunctorWrapperInterface * _ job queue;
/**
* 启动任务
*/
TC _ thread queue TC _ FunctorWrapperInterface * _ start queue;
/**
* 工作线程
*/
STD:vectorThreadWorker * _作业线程;
/**
* 繁忙线程
*/
STD:setThreadWorker * _ busthread;
/**
* 任务队列的锁
*/
TC _ ThreadLock _ tmutex
/**
* 是否所有任务都执行完毕
*/
波尔_鲍尔东
};
工作线程设计如下:
TC _ thread pool:thread worker:thread worker(TC _ thread pool * tpool)
:_tpool (tpool)
,_bTerminate ( false)
{
}
void TC _ thread pool:thread worker:terminate()
{
_ bTerminate=true
_ tpool-notifyT();
}
void TC _ thread pool:thread worker:run()
{
//调用初始化部分
TC _ FunctorWrapperInterface * PST=_ t pool-get();
中频(太平洋标准时间)
{
尝试
{
(* PST)();
}
接住(.)
{
}
删除聚苯乙烯纤维
pst=空
}
//调用处理部分
而(!_ b终止)
{
TC _ functorwrapperiinterface * pfw=_ t pool-get(this);
如果(pfw!=空)
{
auto _ ptr TC _ FunctorWrapperInterface apfw(pfw);
尝试
{
(* pfw)();
}
接住(.)
{
}
_ tpool-idle(this);
}
}
//结束
_ tpool-exit();
}
每个工作线程在刚开始时都会执行一下初始化操作,并进入一个无限循环的部分//调用处理部分
而(!_ b终止)
{
TC _ functorwrapperiinterface * pfw=_ t pool-get(this);
如果(pfw!=空)
{
auto _ ptr TC _ FunctorWrapperInterface apfw(pfw);
尝试
{
(* pfw)();
}
接住(.)
{
}
_ tpool-idle(this);
}
}
该工作主要是无限的从线程池的工作队列中获取任务并执行,如果成功获取任务,则会将线程移进忙碌队列:
TC _ FunctorWrapperInterface * TC _ thread pool:get(线程工作器* PTW)
{
TC _ FunctorWrapperInterface * pFunctorWrapper=NULL;
如果(!_作业队列pop_front(pFunctorWrapper,1000))
{
返回空
}
{
锁同步(_ tmutex);
_busthread .插入(PTW);
}
返回pFunctorWrapper
}
执行完,移回工作线程队列:_ tpool-idle(this);
void TC _ thread pool:idle(线程工作者* PTW)
{
锁同步(_ tmutex);
_busthread .擦除(PTW);
//无繁忙线程,通知等待在线程池结束的线程醒过来
if( _busthread .empty())
{
_ bAllDone=true
_ tmutex。notify all();
}
}
此处作业线程队列初始化后不会改变(因为没有实现自增长功能),所以非线程安全的矢量队列即可,总线线程的忙碌线程队列会被移进移出,但是操作会自带锁定同步(_tmutex),该互斥量是线程池本身继承的,所以是共有的,也无需另外使用线程安全的TC _线程队列使用矢量即可。
TC _线程池:空闲中的
if( _busthread .empty())
{
_ bAllDone=true
_ tmutex。notify all();
}
主要用于当线程池工作起来后的waitForAllDone方法:
bool TC _ thread pool:waitForAllDone(int millsecond)
{
锁同步(_ tmutex);
开始1:
//任务队列和繁忙线程都是空的
if (finish())
{
返回真实的
}
//永远等待
如果(毫秒0)
{
_ tmutex。定时等待(1000);
转到开始1
}
int 64 _ t iNow=TC _ Common:现在2 ms();
int m=毫秒
开始2:
bool b=_ tmutex。定时等待(密尔秒);
//完成处理了
if(finish())
{
返回真实的
}
如果(!b)
{
返回错误的
}
millsecond=max((int64_t )0,m-(TC _ Common:now2ms()-iNow));
转到开始2
返回错误的
}
_tmutex.time
dWait(millsecond)方法唤醒。反复判断是否所有的工作是否完成: bool TC_ThreadPool:: finish() { return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; }整体cpp实现如下:
TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize; pthread_key_t TC_ThreadPool::g_key ; void TC_ThreadPool::destructor( void *p) { ThreadData *ttd = ( ThreadData*)p; if(ttd) { delete ttd; } } void TC_ThreadPool::exit() { TC_ThreadPool:: ThreadData *p = getThreadData(); if(p) { delete p; int ret = pthread_setspecific( g_key, NULL ); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } } _jobqueue. clear(); } void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p) { TC_ThreadPool:: ThreadData *pOld = getThreadData(); if(pOld != NULL && pOld != p) { delete pOld; } int ret = pthread_setspecific( g_key, ( void *)p); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } } TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData () { return ( ThreadData *) pthread_getspecific( g_key); } void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p) { TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey); if(pOld != NULL && pOld != p) { delete pOld; } int ret = pthread_setspecific(pkey, ( void *)p); if(ret != 0) { throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret); } } TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey) { return ( ThreadData *) pthread_getspecific(pkey); } TC_ThreadPool::TC_ThreadPool() : _bAllDone ( true) { } TC_ThreadPool::~TC_ThreadPool() { stop(); clear(); } void TC_ThreadPool::clear() { std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { delete (*it); ++it; } _jobthread. clear(); _busthread. clear(); } void TC_ThreadPool::init( size_t num) { stop(); Lock sync(* this); clear(); for( size_t i = 0; i < num; i++) { _jobthread. push_back( new ThreadWorker( this)); } } void TC_ThreadPool::stop() { Lock sync(* this); std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { if ((*it)-> isAlive()) { (*it)-> terminate(); (*it)-> getThreadControl().join (); } ++it; } _bAllDone = true; } void TC_ThreadPool::start() { Lock sync(* this); std::vector< ThreadWorker *>::iterator it = _jobthread. begin(); while(it != _jobthread. end()) { (*it)-> start(); ++it; } _bAllDone = false; } bool TC_ThreadPool:: finish() { return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone; } bool TC_ThreadPool::waitForAllDone( int millsecond) { Lock sync( _tmutex); start1: //任务队列和繁忙线程都是空的 if (finish ()) { return true; } //永远等待 if(millsecond < 0) { _tmutex.timedWait(1000); goto start1; } int64_t iNow = TC_Common:: now2ms(); int m = millsecond; start2: bool b = _tmutex.timedWait(millsecond); //完成处理了 if(finish ()) { return true; } if(!b) { return false; } millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow)); goto start2; return false; } TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw) { TC_FunctorWrapperInterface *pFunctorWrapper = NULL; if(! _jobqueue. pop_front(pFunctorWrapper, 1000)) { return NULL; } { Lock sync( _tmutex); _busthread. insert(ptw); } return pFunctorWrapper; } TC_FunctorWrapperInterface *TC_ThreadPool::get() { TC_FunctorWrapperInterface *pFunctorWrapper = NULL; if(! _startqueue. pop_front(pFunctorWrapper)) { return NULL; } return pFunctorWrapper; } void TC_ThreadPool::idle( ThreadWorker *ptw) { Lock sync( _tmutex); _busthread. erase(ptw); //无繁忙线程, 通知等待在线程池结束的线程醒过来 if( _busthread. empty()) { _bAllDone = true; _tmutex.notifyAll(); } } void TC_ThreadPool::notifyT() { _jobqueue. notifyT(); }线程池使用后记 线程池适合场合 事 实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任 务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。 ?总之线程池通常适合下面的几个场合: ? (1)? 单位时间内处理任务频繁而且任务处理时间短 ? (2)? 对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。 ? (3)? 必须经常面对高突发性事件,比如Web服务器,如果有足球转播,则服务器将产生巨大的冲击。此时如果采取传统方法,则必须不停的大量产生线程,销毁线程。此时采用动态线程池可以避免这种情况的发生。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。