commondef.h
//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁 const int CHECK_IDLE_TASK_INTERVAL = 300; //单位秒,任务自动销毁时间间隔 const int TASK_DESTROY_INTERVAL = 60;
//监控线程池是否为空时间间隔,微秒 const int IDLE_CHECK_POLL_EMPTY = 500;
//线程池线程空闲自动退出时间间隔 ,5分钟 const int THREAD_WAIT_TIME_OUT = 300;
taskpool.cpp
#include "taskpool.h"
#include <string.h>
#include <stdio.h> #include <pthread.h>
TaskPool::TaskPool(const int & poolMaxSize) : m_poolSize(poolMaxSize) , m_taskListSize(0) , m_bStop(false) { pthread_mutex_init(&m_lock, NULL); pthread_mutex_init(&m_idleMutex, NULL); pthread_cond_init(&m_idleCond, NULL);
pthread_attr_t attr; pthread_attr_init( &attr ); pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行 pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程 pthread_attr_destroy(&attr); }
TaskPool::~TaskPool() { if(!m_bStop) { StopPool(); } if(!m_taskList.empty()) { std::list<Task*>::iterator it = m_taskList.begin(); for(; it != m_taskList.end(); ++it) { if(*it != NULL) { delete *it; *it = NULL; } } m_taskList.clear(); m_taskListSize = 0; } if(!m_idleList.empty()) { std::list<Task*>::iterator it = m_idleList.begin(); for(; it != m_idleList.end(); ++it) { if(*it != NULL) { delete *it; *it = NULL; } } m_idleList.clear(); }
pthread_mutex_destroy(&m_lock); pthread_mutex_destroy(&m_idleMutex); pthread_cond_destroy(&m_idleCond); }
void * TaskPool::CheckIdleTask(void * arg) { TaskPool * pool = (TaskPool*)arg; while(1) { pool->LockIdle(); pool->RemoveIdleTask(); if(pool->GetStop()) { pool->UnlockIdle(); break; } pool->CheckIdleWait(); pool->UnlockIdle(); } }
void TaskPool::StopPool() { m_bStop = true; LockIdle(); pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题 UnlockIdle(); pthread_join(m_idleId, NULL); }
bool TaskPool::GetStop() { return m_bStop; }
void TaskPool::CheckIdleWait() { struct timespec timeout; memset(&timeout, 0, sizeof(timeout)); timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL; timeout.tv_nsec = 0; pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout); }
int TaskPool::RemoveIdleTask() { int iRet = 0; std::list<Task*>::iterator it, next; std::list<Task*>::reverse_iterator rit = m_idleList.rbegin(); time_t curTime = time(0); for(; rit != m_idleList.rend(); ) { it = --rit.base(); if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL) { iRet++; delete *it; *it = NULL; next = m_idleList.erase(it); rit = std::list<Task*>::reverse_iterator(next); } else { break; } } }
int TaskPool::AddTask(task_fun fun, void *arg) { int iRet = 0; if(0 != fun) { pthread_mutex_lock(&m_lock); if(m_taskListSize >= m_poolSize) { pthread_mutex_unlock(&m_lock); iRet = -1; //task pool is full; } else { pthread_mutex_unlock(&m_lock); Task * task = GetIdleTask(); if(NULL == task) { task = new Task; } if(NULL == task) { iRet = -2; // new failed } else { task->fun = fun; task->data = arg; pthread_mutex_lock(&m_lock); m_taskList.push_back(task); ++m_taskListSize; pthread_mutex_unlock(&m_lock); } } } return iRet; }
Task* TaskPool::GetTask() { Task *task = NULL; pthread_mutex_lock(&m_lock); if(!m_taskList.empty()) { task = m_taskList.front(); m_taskList.pop_front(); --m_taskListSize; } pthread_mutex_unlock(&m_lock); return task; }
void TaskPool::LockIdle() { pthread_mutex_lock(&m_idleMutex); }
void TaskPool::UnlockIdle() { pthread_mutex_unlock(&m_idleMutex); }
Task * TaskPool::GetIdleTask() { LockIdle(); Task * task = NULL; if(!m_idleList.empty()) { task = m_idleList.front(); m_idleList.pop_front(); } UnlockIdle(); return task; }
void TaskPool::SaveIdleTask(Task*task) { if(NULL != task) { task->fun = 0; task->data = NULL; task->last_time = time(0); LockIdle(); m_idleList.push_front(task); UnlockIdle(); } }
taskpool.h
#ifndef TASKPOOL_H #define TASKPOOL_H /* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务 * 任务池可自动销毁长时间空闲的Task对象 * 可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间 * TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁 * date @ 2013.12.23 * author @ haibin.wang */
#include <list> #include <pthread.h> #include "commondef.h"
//所有的用户操作为一个task, typedef void (*task_fun)(void *); struct Task { task_fun fun; //任务处理函数 void* data; //任务处理数据 time_t last_time; //加入空闲队列的时间,用于自动销毁 };
//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池 class TaskPool { public: /* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程 * para @ maxSize 最大任务数,大于0 */ TaskPool(const int & poolMaxSize); ~TaskPool();
/* pur @ 添加任务到任务队列的尾部 * para @ task, 具体任务 * return @ 0 添加成功,负数 添加失败 */ int AddTask(task_fun fun, void* arg);
/* pur @ 从任务列表的头获取一个任务 * return @ 如果列表中有任务则返回一个Task指针,否则返回一个NULL */ Task* GetTask();
/* pur @ 保存空闲任务到空闲队列中 * para @ task 已被调用执行的任务 * return @ */ void SaveIdleTask(Task*task);
void StopPool(); public: void LockIdle(); void UnlockIdle(); void CheckIdleWait(); int RemoveIdleTask(); bool GetStop(); private: static void * CheckIdleTask(void *); /* pur @ 获取空闲的task * para @ * para @ * return @ NULL说明没有空闲的,否则从m_idleList中获取一个 */ Task* GetIdleTask(); int GetTaskSize(); private: int m_poolSize; //任务池大小 int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加 bool m_bStop; //是否停止 std::list<Task*> m_taskList;//所有待处理任务列表 std::list<Task*> m_idleList;//所有空闲任务列表 pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务 pthread_mutex_t m_idleMutex; //空闲任务队列锁 pthread_cond_t m_idleCond; //空闲队列等待条件 pthread_t m_idleId;; }; #endif
threadpool.cpp
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留) * date @ 2014.01.03 * author @ haibin.wang */
#include "threadpool.h" #include <errno.h> #include <string.h>
/* #include <iostream> #include <stdio.h> */
Thread::Thread(bool detach, ThreadPool * pool) : m_pool(pool) { pthread_attr_init(&m_attr); if(detach) { pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行 } else { pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE ); }
pthread_mutex_init(&m_mutex, NULL); //初始化互斥量 pthread_cond_init(&m_cond, NULL); //初始化条件变量 task.fun = 0; task.data = NULL; }
Thread::~Thread() { pthread_cond_destroy(&m_cond); pthread_mutex_destroy(&m_mutex); pthread_attr_destroy(&m_attr); }
ThreadPool::ThreadPool() : m_poolMax(0) , m_idleNum(0) , m_totalNum(0) , m_bStop(false) { pthread_mutex_init(&m_mutex, NULL); pthread_mutex_init(&m_runMutex,NULL); pthread_mutex_init(&m_terminalMutex, NULL); pthread_cond_init(&m_terminalCond, NULL); pthread_cond_init(&m_emptyCond, NULL); }
ThreadPool::~ThreadPool() { /*if(!m_threads.empty()) { std::list<Thread*>::iterator it = m_threads.begin(); for(; it != m_threads.end(); ++it) { if(*it != NULL) { pthread_cond_destroy( &((*it)->m_cond) ); pthread_mutex_destroy( &((*it)->m_mutex) ); delete *it; *it = NULL; } } m_threads.clear(); }*/ pthread_mutex_destroy(&m_runMutex); pthread_mutex_destroy(&m_terminalMutex); pthread_mutex_destroy(&m_mutex); pthread_cond_destroy(&m_terminalCond); pthread_cond_destroy(&m_emptyCond); }
int ThreadPool::InitPool(const int & poolMax, const int & poolPre) { if(poolMax < poolPre || poolPre < 0 || poolMax <= 0) { return -1; } m_poolMax = poolMax;
int iRet = 0; for(int i=0; i<poolPre; ++i) { Thread * thread = CreateThread(); if(NULL == thread) { iRet = -2; } }
if(iRet < 0) { std::list<Thread*>::iterator it = m_threads.begin(); for(; it!= m_threads.end(); ++it) { if(NULL != (*it) ) { delete *it; *it = NULL; } } m_threads.clear(); m_totalNum = 0; } return iRet; }
void ThreadPool::GetThreadRun(task_fun fun, void* arg) { //从线程池中获取一个线程 pthread_mutex_lock( &m_mutex); if(m_threads.empty()) { pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程 }
Thread * thread = m_threads.front(); m_threads.pop_front(); pthread_mutex_unlock( &m_mutex);
pthread_mutex_lock( &thread->m_mutex ); thread->task.fun = fun; thread->task.data = arg; pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行 pthread_mutex_unlock( &thread->m_mutex ); }
int ThreadPool::Run(task_fun fun, void * arg) { pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行 int iRet = 0; if(m_totalNum <m_poolMax) // { if(m_threads.empty() && (NULL == CreateThread()) ) { iRet = -1;//can not create new thread! } else { GetThreadRun(fun, arg); } } else { GetThreadRun(fun, arg); } pthread_mutex_unlock(&m_runMutex); return iRet; }
void ThreadPool::StopPool(bool bStop) { m_bStop = bStop; if(bStop) { //启动监控所有空闲线程是否退出的线程 Thread thread(false, this); pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程 //阻塞等待所有空闲线程退出 pthread_join(thread.m_threadId, NULL); } /*if(bStop) { pthread_mutex_lock(&m_terminalMutex); //启动监控所有空闲线程是否退出的线程 Thread thread(true, this); pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程 //阻塞等待所有空闲线程退出 pthread_cond_wait(&m_terminalCond, & m_terminalMutex); pthread_mutex_unlock(&m_terminalMutex); }*/ }
bool ThreadPool::GetStop() { return m_bStop; }
Thread * ThreadPool::CreateThread() { Thread * thread = NULL; thread = new Thread(true, this); if(NULL != thread) { int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中 if(0 != iret) { delete thread; thread = NULL; } } return thread; }
void * ThreadPool::WapperFun(void*arg) { Thread * thread = (Thread*)arg; if(NULL == thread || NULL == thread->m_pool) { return NULL; } ThreadPool * pool = thread->m_pool; pool->IncreaseTotalNum(); struct timespec abstime; memset(&abstime, 0, sizeof(abstime)); while(1) { if(0 != thread->task.fun) { thread->task.fun(thread->task.data); }
if( true == pool->GetStop() ) { break; //确定当前任务执行完毕后再判定是否退出线程 } pthread_mutex_lock( &thread->m_mutex ); pool->SaveIdleThread(thread); //将线程加入到空闲队列中 abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT; abstime.tv_nsec = 0; if(ETIMEDOUT == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出 { pthread_mutex_unlock( &thread->m_mutex ); break; } pthread_mutex_unlock( &thread->m_mutex ); }
pool->LockMutex(); pool->DecreaseTotalNum(); if(thread != NULL) { pool->RemoveThread(thread); delete thread; thread = NULL; } pool->UnlockMutex(); return 0; }
void ThreadPool::SaveIdleThread(Thread * thread ) { if(thread) { thread->task.fun = 0; thread->task.data = NULL; LockMutex(); if(m_threads.empty()) { pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了 } m_threads.push_front(thread); UnlockMutex(); } }
int ThreadPool::TotalThreads() { return m_totalNum; }
void ThreadPool::SendSignal() { LockMutex(); std::list<Thread*>::iterator it = m_threads.begin(); for(; it!= m_threads.end(); ++it) { pthread_mutex_lock( &(*it)->m_mutex ); pthread_cond_signal(&((*it)->m_cond)); pthread_mutex_unlock( &(*it)->m_mutex ); } UnlockMutex(); }
void * ThreadPool::TerminalCheck(void* arg) { Thread * thread = (Thread*)arg; if(NULL == thread || NULL == thread->m_pool) { return NULL; } ThreadPool * pool = thread->m_pool; while((false == pool->GetStop()) || pool->TotalThreads() >0 ) { pool->SendSignal();
usleep(IDLE_CHECK_POLL_EMPTY); } //pool->TerminalCondSignal(); return 0; }
void ThreadPool::TerminalCondSignal() { pthread_cond_signal(&m_terminalCond); }
void ThreadPool::RemoveThread(Thread* thread) { m_threads.remove(thread); }
void ThreadPool::LockMutex() { pthread_mutex_lock( &m_mutex); }
void ThreadPool::UnlockMutex() { pthread_mutex_unlock( &m_mutex ); }
void ThreadPool::IncreaseTotalNum() { LockMutex(); m_totalNum++; UnlockMutex(); } void ThreadPool::DecreaseTotalNum() { m_totalNum--; }
threadpool.h
#ifndef THREADPOOL_H #define THREADPOOL_H /* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a * 当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出 * date @ 2013.12.23 * author @ haibin.wang */
#include <list> #include <string> #include "taskpool.h" //通过threadmanager来控制任务调度进程 //threadpool的TerminalCheck线程负责监测线程池所有线程退出
class ThreadPool; class Thread { public: Thread(bool detach, ThreadPool * pool); ~Thread(); pthread_t m_threadId; //线程id pthread_mutex_t m_mutex; //互斥锁 pthread_cond_t m_cond; //条件变量 pthread_attr_t m_attr; //线程属性 Task task; // ThreadPool * m_pool; //所属线程池 };
//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中 class ThreadPool { public: ThreadPool(); ~ThreadPool();
/* pur @ 初始化线程池 * para @ poolMax 线程池最大线程数 * para @ poolPre 预创建线程数 * return @ 0:成功 * -1: parameter error, must poolMax > poolPre >=0 * -2: 创建线程失败 */ int InitPool(const int & poolMax, const int & poolPre);
/* pur @ 执行一个任务 * para @ task 任务指针 * return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败 */ int Run(task_fun fun, void* arg);
/* pur @ 设置是否停止线程池工作 * para @ bStop true停止,false不停止 */ void StopPool(bool bStop);
public: //此公有函数主要用于静态函数调用 /* pur @ 获取进程池的启停状态 * return @ */ bool GetStop(); void SaveIdleThread(Thread * thread ); void LockMutex(); void UnlockMutex(); void DecreaseTotalNum(); void IncreaseTotalNum(); void RemoveThread(Thread* thread); void TerminalCondSignal(); int TotalThreads(); void SendSignal(); private: /* pur @ 创建线程 * return @ 非空 成功,NULL失败, */ Thread * CreateThread();
/* pur @ 从线程池中获取一个一个线程运行任务 * para @ fun 函数指针 * para @ arg 函数参数 * return @ */ void GetThreadRun(task_fun fun, void* arg);
static void * WapperFun(void*); static void * TerminalCheck(void*);//循环监测是否所有线程终止线程
private: int m_poolMax;//线程池最大线程数 int m_idleNum; //空闲线程数 int m_totalNum; //当前线程总数 小于最大线程数 bool m_bStop; //是否停止线程池 pthread_mutex_t m_mutex; //线程列表锁 pthread_mutex_t m_runMutex; //run函数锁
pthread_mutex_t m_terminalMutex; //终止所有线程互斥量 pthread_cond_t m_terminalCond; //终止所有线程条件变量 pthread_cond_t m_emptyCond; //空闲线程不空条件变量
std::list<Thread*> m_threads; // 线程列表 }; #endif
threadpoolmanager.cpp
#include "threadpoolmanager.h" #include "threadpool.h" #include "taskpool.h"
#include <errno.h> #include <string.h>
/*#include <string.h> #include <sys/time.h> #include <stdio.h>*/ // struct timeval time_beg, time_end; ThreadPoolManager::ThreadPoolManager() : m_threadPool(NULL) , m_taskPool(NULL) , m_bStop(false) { pthread_mutex_init(&m_mutex_task,NULL); pthread_cond_init(&m_cond_task, NULL);
/* memset(&time_beg, 0, sizeof(struct timeval)); memset(&time_end, 0, sizeof(struct timeval)); gettimeofday(&time_beg, NULL);*/ }
ThreadPoolManager::~ThreadPoolManager() { StopAll(); if(NULL != m_threadPool) { delete m_threadPool; m_threadPool = NULL; } if(NULL != m_taskPool) { delete m_taskPool; m_taskPool = NULL; }
pthread_cond_destroy( &m_cond_task); pthread_mutex_destroy( &m_mutex_task );
/*gettimeofday(&time_end, NULL); long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec); printf("manager total time = %d\n", total); gettimeofday(&time_beg, NULL);*/ }
int ThreadPoolManager::Init( const int &tastPoolSize, const int &threadPoolMax, const int &threadPoolPre) { m_threadPool = new ThreadPool(); if(NULL == m_threadPool) { return -1; } m_taskPool = new TaskPool(tastPoolSize); if(NULL == m_taskPool) { return -2; }
if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre)) { return -3; } //启动线程池 //启动任务池 //启动任务获取线程,从任务池中不断拿任务到线程池中 pthread_attr_t attr; pthread_attr_init( &attr ); pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程 pthread_attr_destroy(&attr); return 0; }
void ThreadPoolManager::StopAll() { m_bStop = true; LockTask(); pthread_cond_signal(&m_cond_task); UnlockTask(); pthread_join(m_taskThreadId, NULL); //等待当前所有任务执行完毕 m_taskPool->StopPool(); m_threadPool->StopPool(true); // 停止线程池工作 }
void ThreadPoolManager::LockTask() { pthread_mutex_lock(&m_mutex_task); }
void ThreadPoolManager::UnlockTask() { pthread_mutex_unlock(&m_mutex_task); }
void* ThreadPoolManager::TaskThread(void* arg) { ThreadPoolManager * manager = (ThreadPoolManager*)arg; while(1) { manager->LockTask(); //防止任务没有执行完毕发送了停止信号 while(1) //将任务队列中的任务执行完再退出 { Task * task = manager->GetTaskPool()->GetTask(); if(NULL == task) { break; } else { manager->GetThreadPool()->Run(task->fun, task->data); manager->GetTaskPool()->SaveIdleTask(task); } }
if(manager->GetStop()) { manager->UnlockTask(); break; } manager->TaskCondWait(); //等待有任务的时候执行 manager->UnlockTask(); } return 0; }
ThreadPool * ThreadPoolManager::GetThreadPool() { return m_threadPool; }
TaskPool * ThreadPoolManager::GetTaskPool() { return m_taskPool; }
int ThreadPoolManager::Run(task_fun fun,void* arg) { if(0 == fun) { return 0; } if(!m_bStop) { int iRet = m_taskPool->AddTask(fun, arg);
if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) ) { pthread_cond_signal(&m_cond_task); UnlockTask(); } return iRet; } else { return -3; } }
bool ThreadPoolManager::GetStop() { return m_bStop; }
void ThreadPoolManager::TaskCondWait() { struct timespec to; memset(&to, 0, sizeof to); to.tv_sec = time(0) + 60; to.tv_nsec = 0;
pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时 }
threadpoolmanager.h
#ifndef THREADPOOLMANAGER_H #define THREADPOOLMANAGER_H /* purpose @ * 基本流程: * 管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中 * 基本功能: * 1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程 * 2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改) * 3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出) * 线程资源: * 如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定 * 当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁 * 线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程 * 线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程 * 使用方法: * ThreadPoolManager manager; * manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器 * manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL * * date @ 2013.12.23 * author @ haibin.wang * * 详细参数控制可以修改commondef.h中的相关变量值 */
#include <pthread.h> typedef void (*task_fun)(void *);
class ThreadPool; class TaskPool;
class ThreadPoolManager { public: ThreadPoolManager(); ~ThreadPoolManager();
/* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0 * para @ tastPoolSize 任务池大小 * para @ threadPoolMax 线程池最大线程数 * para @ threadPoolPre 预创建线程数 * return @ 0:初始化成功,负数 初始化失败 * -1:创建线程池失败 * -2:创建任务池失败 * -3:线程池初始化失败 */ int Init(const int &tastPoolSize, const int &threadPoolMax, const int &threadPoolPre);
/* pur @ 执行一个任务 * para @ fun 需要执行的函数指针 * para @ arg fun需要的参数,默认为NULL * return @ 0 任务分配成功,负数 任务分配失败 * -1:任务池满 * -2:任务池new失败 * -3:manager已经发送停止信号,不再接收新任务 */ int Run(task_fun fun,void* arg=NULL);
public: //以下public函数主要用于静态函数调用 bool GetStop(); void TaskCondWait(); TaskPool * GetTaskPool(); ThreadPool * GetThreadPool(); void LockTask(); void UnlockTask(); void LockFull();
private: static void * TaskThread(void*); //任务处理线程 void StopAll();
private: ThreadPool *m_threadPool; //线程池 TaskPool * m_taskPool; //任务池 bool m_bStop; // 是否终止管理器
pthread_t m_taskThreadId; // TaskThread线程id pthread_mutex_t m_mutex_task; pthread_cond_t m_cond_task; }; #endif
main.cpp
#include <iostream> #include <string> #include "threadpoolmanager.h" #include <sys/time.h> #include <string.h> #include <stdlib.h> #include <pthread.h>
using namespace std; int seq = 0; int billNum =0; int inter = 1; pthread_mutex_t m_mutex; void myFunc(void*arg) { pthread_mutex_lock(&m_mutex); seq++; if(seq%inter == 0 ) { cout << "fun 1=" << seq << endl; } if(seq>=1000000000) { cout << "billion" << endl; seq = 0; billNum++; } pthread_mutex_unlock(&m_mutex); //sleep(); }
int main(int argc, char** argv) { if(argc != 6) { cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl; cout << "eg: ./test 999999 10000 100 10 20" << endl; cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl; return 0; } double loopSize = atof(argv[1]); int taskSize = atoi(argv[2]); int threadPoolSize = atoi(argv[3]); int preSize = atoi(argv[4]); inter = atoi(argv[5]);
pthread_mutex_init(&m_mutex,NULL); ThreadPoolManager manager; if(0>manager.Init(taskSize, threadPoolSize, preSize)) { cout << "初始化失败" << endl; return 0; } cout << "*******************初始化完成*********************" << endl; struct timeval time_beg, time_end; memset(&time_beg, 0, sizeof(struct timeval)); memset(&time_end, 0, sizeof(struct timeval)); gettimeofday(&time_beg, NULL); double i=0; for(; i<loopSize; ++i) { while(0>manager.Run(myFunc,NULL)) { usleep(100); } } gettimeofday(&time_end, NULL); long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec); cout << "total time =" << total << endl; cout << "total num =" << i << " billion num=" << billNum<< endl; cout << __FILE__ << "将关闭所有线程" << endl; //pthread_mutex_destroy(&m_mutex); return 0; }
|