python进程池效率_python进程池剖析(一)

论坛 期权论坛 脚本     
已经匿名di用户   2022-5-29 19:31   1136   0

python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。

在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。

实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。

要创建进程池对象,需要调用Pool函数,函数的声明如下:

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

Returns a process pool object

processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()

initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)

maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。

函数返回一个进程池(Pool)对象

Pool函数返回的进程池对象中有下面一些数据结构:

self._inqueue 接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程

self._outqueue 发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程

self._taskqueue 同步的任务队列,保存线程池分配给主进程的任务

self._cache = {} 任务缓存

self._processes worker进程个数

self._pool = [] woker进程队列

进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

_work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。

self._worker_handler = threading.Thread(

target=Pool._handle_workers,

args=(self, )

)

Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法:

def _maintain_pool(self):

if self._join_exited_workers():

self._repopulate_pool()

_join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程:

w = self.Process(target=worker,

args=(self._inqueue, self._outqueue,

self._initializer, self._initargs,

self._maxtasksperchild)

)

self._pool.append(w)

w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数:

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):

assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)

put = outqueue.put

get = inqueue.get

if hasattr(inqueue, '_writer'):

inqueue._writer.close()

outqueue._reader.close()

if initializer is not None:

initializer(*initargs)

completed = 0

while maxtasks is None or (maxtasks and completed < maxtasks):

try:

task = get()

except (EOFError, IOError):

debug('worker got EOFError or IOError -- exiting')

break

if task is None:

debug('worker got sentinel -- exiting')

break

job, i, func, args, kwds = task

try:

result = (True, func(*args, **kwds))

except Exception, e:

result = (False, e)

try:

put((job, i, result))

except Exception as e:

wrapped = MaybeEncodingError(e, result[1])

debug("Possible encoding error while sending result: %s" % (

wrapped))

put((job, i, (False, wrapped)))

completed += 1

debug('worker exiting after %d tasks' % completed)

所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出:

它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds),

再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。

_task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe),

self._task_handler = threading.Thread(

target=Pool._handle_tasks,

args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)

)

Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,

表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。

_handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,

self._result_handler = threading.Thread(

target=Pool._handle_results,

args=(self._outqueue, self._quick_get, self._cache)

)

_terminate,这里的_terminate并不是一个线程,而是一个Finalize对象

self._terminate = Finalize(

self, self._terminate_pool,

args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,

self._worker_handler, self._task_handler,

self._result_handler, self._cache),

exitpriority=15

)

Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。

_terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。

_terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数:

def terminate(self):

debug('terminating pool')

self._state = TERMINATE

self._worker_handler._state = TERMINATE

self._terminate()

函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法:

def __call__(self, wr=None):

try:

del _finalizer_registry[self._key]

except KeyError:

sub_debug('finalizer no longer registered')

else:

if self._pid != os.getpid():

res = None

else:

res = self._callback(*self._args, **self._kwargs)

self._weakref = self._callback = self._args = \

self._kwargs = self._key = None

return res

而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。

进程池中的数据结构、各个线程之间的合作关系如下图所示:

【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP