python线程池threadpool实现篇

论坛 期权论坛 脚本     
niminba   2021-5-23 05:09   1609   0

本文为大家分享了threadpool线程池中所有的操作,供大家参考,具体内容如下

首先介绍一下自己使用到的名词:

工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;

任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过          makeRequests来创建

任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;

任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体     的     处理任务,并返回处理结果;

任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);

任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;

任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;

上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:

(1)线程池的创建
(2)工作线程的启动
(3)任务的创建
(4)任务的推送到线程池
(5)线程处理任务
(6)任务结束处理
(7)工作线程的退出

下面是threadpool的定义:

class ThreadPool: 
 """A thread pool, distributing work requests and collecting results. 
 
 See the module docstring for more information. 
 
 """ 
 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
  pass 
 def createWorkers(self, num_workers, poll_timeout=5): 
  pass 
 def dismissWorkers(self, num_workers, do_join=False): 
  pass 
 def joinAllDismissedWorkers(self): 
  pass 
 def putRequest(self, request, block=True, timeout=None): 
  pass 
 def poll(self, block=False): 
  pass 
 def wait(self): 
  pass 

1、线程池的创建(ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

task_pool=threadpool.ThreadPool(num_works) 
 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
  """Set up the thread pool and start num_workers worker threads. 
 
  ``num_workers`` is the number of worker threads to start initially. 
 
  If ``q_size > 0`` the size of the work *request queue* is limited and 
  the thread pool blocks when the queue is full and it tries to put 
  more work requests in it (see ``putRequest`` method), unless you also 
  use a positive ``timeout`` value for ``putRequest``. 
 
  If ``resq_size > 0`` the size of the *results queue* is limited and the 
  worker threads will block when the queue is full and they try to put 
  new results in it. 
 
  .. warning: 
   If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is 
   the possibilty of a deadlock, when the results queue is not pulled 
   regularly and too many jobs are put in the work requests queue. 
   To prevent this, always set ``timeout > 0`` when calling 
   ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 
 
  """ 
  self._requests_queue = Queue.Queue(q_size)#任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中 
  self._results_queue = Queue.Queue(resq_size)#字典,任务对应的任务执行结果</span> 
  self.workers = []#工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中 
  self.dismissedWorkers = []#被设置线程事件并且没有被join的工作线程 
  self.workRequests = {}#字典,记录任务被分配到哪个工作线程中</span> 
  self.createWorkers(num_workers, poll_timeout) 

其中,初始化参数为:

num_works:   线程池中线程个数
q_size :   任务队列的长度限制,如果限制了队列的长度,那么当调用putRequest()添加任务时,到达限制长度后,那么putRequest将会不断尝试添加任务,除非在putRequest()设置了超时或者阻塞; 
esq_size:  任务结果队列的长度;
pool_timeout:  工作线程如果从request队列中,读取不到request,则会阻塞pool_timeout,如果仍没request则直接返回;

其中,成员变量:

self._requests_queue:  任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中;
self._results_queue:  字典,任务对应的任务执行 
self.workers:  工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中;
self.dismisssedWorkers:  被设置线程事件,并且没有被join的工作线程
self.workRequests:  字典,列为空,那么break,返回,否则进入第4步;

(4)如果任务处理过程中出现异常,即设置了request.exception,并且设置了异常处理回调即request.exc_callback则执行异常回调,再回调中处理异常,返回后将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。否则进入第5步;

(5)如果设置了任务结果回调即request.callback不为空,则执行任务结果回调即request.callbacl(request,result),并
将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。

(6)重复进行上面的步骤直到抛出异常,或者任务队列为空,则poll返会;

至此抛出NoResultPending wait操作接受此异常后,至此wait()返回。

7、工作线程的退出

threadpool提供的工作线程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:

def dismissWorkers(self, num_workers, do_join=False): 
 """Tell num_workers worker threads to quit after their current task.""" 
 dismiss_list = [] 
 for i in range(min(num_workers, len(self.workers))): 
  worker = self.workers.pop() 
  worker.dismiss() 
  dismiss_list.append(worker) 
 
 if do_join: 
  for worker in dismiss_list: 
   worker.join() 
 else: 
  self.dismissedWorkers.extend(dismiss_list) 
 
def joinAllDismissedWorkers(self): 
 """Perform Thread.join() on all worker threads that have been dismissed. 
 """ 
 for worker in self.dismissedWorkers: 
  worker.join() 
 self.dismissedWorkers = [] 

从dismissWorkers可看出,主要工作是从self.workers 工作线程中pop出指定的线程数量,并且设置此线程的线程事件,设置线程事件后,此线程self.run()函数,则会检测到此设置,并结束线程。

如果设置了在do_join,即设置了在此函数中join退出的线程,那么对退出的线程执行join操作。否则将pop出的线程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去处理join线程。

8、总结

到此为止,threadpool线程池中所有的操作介绍完毕,其实现也做了具体的介绍。从上面可看出,线程池并没有那么复杂,只有几个简单的操作,主要是了解整个处理流程即可。

希望大家多多提出建议和意见。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持社区。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1060120
帖子:212021
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP