python中多线程与多进程中的数据共享问题

论坛 期权论坛 期权     
序语程言   2019-7-28 23:17   2140   0
之前在写多线程与多进程的时候,因为一般情况下都是各自完成各自的任务,各个子线程或者各个子进程之前并没有太多的联系,如果需要通信的话我会使用队列或者数据库来完成,但是最近我在写一些多线程与多进程的代码时,发现如果它们需要用到共享变量的话,需要有一些注意的地方
多线程之间的共享数据[h1]标准数据类型在线程间共享[/h1]看以下代码
  1. #coding:utf-8import threadingdef test(name,data):    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))if __name__ == '__main__':    d = 5    name = "杨彦星"    for i in range(5):        th = threading.Thread(target=test,args=(name,d))        th.start()
复制代码
这里我创建一个全局的int变量d,它的值是5,当我在5个线程中调用test函数时,将d作为参数传进去,那么这5个线程所拥有的是同一个d吗?我在test函数中通过
  1. id(data)
复制代码
来打印一下它们的ID,得到了如下的结果
  1. in thread  name is 杨彦星data is 5 id(data) is 1763791776in thread  name is 杨彦星data is 5 id(data) is 1763791776in thread  name is 杨彦星data is 5 id(data) is 1763791776in thread  name is 杨彦星data is 5 id(data) is 1763791776in thread  name is 杨彦星data is 5 id(data) is 1763791776
复制代码
从结果中可以看到,在5个子线程中,data的id都是1763791776,说明在主线程中创建了变量d,在子线程中是可以共享的,在子线程中对共享元素的改变是会影响到其它线程的,所以如果要对共享变量进行修改时,也就是线程不安全的,需要加锁。
[h1]自定义类型对象在线程间共享[/h1]如果我们要自定义一个类呢,将一个对象作为变量在子线程中传递呢?会是什么效果呢?
  1. #coding:utf-8import threadingclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = datadef test(name,data):    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data.get(),id(data)))if __name__ == '__main__':    d = Data(10)    name = "杨彦星"    print("in main thread id(data) is {}".format(id(d)))    for i in range(5):        th = threading.Thread(target=test,args=(name,d))        th.start()
复制代码
这里我定义一个简单的类,在主线程初始化了一个该类型的对象d,然后将它作为参数传给子线程,主线程和子线程分别打印了这个对象的id,我们来看一下结果
  1. in main thread id(data) is 2849240813864in thread  name is 杨彦星data is 10 id(data) is 2849240813864in thread  name is 杨彦星data is 10 id(data) is 2849240813864in thread  name is 杨彦星data is 10 id(data) is 2849240813864in thread  name is 杨彦星data is 10 id(data) is 2849240813864in thread  name is 杨彦星data is 10 id(data) is 2849240813864
复制代码
我们看到,在主线程和子线程中,这个对象的id是一样的,说明它们用的是同一个对象。
无论是标准数据类型还是复杂的自定义数据类型,它们在多线程之间是共享同一个的,但是在多进程中是这样的吗?

多进程之间的共享数据[h1]标准数据类型在进程间共享[/h1]还是上面的代码,我们先来看一下int类型的变量的子进程间的共享
  1. #coding:utf-8import threadingimport multiprocessingdef test(name,data):    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))if __name__ == '__main__':    d = 10    name = "杨彦星"    print("in main thread id(data) is {}".format(id(d)))    for i in range(5):        pro = multiprocessing.Process(target=test,args=(name,d))        pro.start()
复制代码
得到的结果是
  1. in main thread id(data) is 1763791936in thread  name is 杨彦星data is 10 id(data) is 1763791936in thread  name is 杨彦星data is 10 id(data) is 1763791936in thread  name is 杨彦星data is 10 id(data) is 1763791936in thread  name is 杨彦星data is 10 id(data) is 1763791936in thread  name is 杨彦星data is 10 id(data) is 1763791936
复制代码
可以看到它们的id是一样的,说明用的是同一个变量,但是当我尝试把d由int变为了string时,发现它们又不一样了……
  1. if __name__ == '__main__':    d = 'yangyanxing'    name = "杨彦星"    print("in main thread id(data) is {}".format(id(d)))    for i in range(5):        pro = multiprocessing.Process(target=test,args=(name,d))        pro.start()
复制代码
此时得到的结果是
  1. in main thread id(data) is 2629633397040in thread  name is 杨彦星data is yangyanxing id(data) is 1390942032880in thread  name is 杨彦星data is yangyanxing id(data) is 2198251377648in thread  name is 杨彦星data is yangyanxing id(data) is 2708672287728in thread  name is 杨彦星data is yangyanxing id(data) is 2376058999792in thread  name is 杨彦星data is yangyanxing id(data) is 2261044040688
复制代码
于是我又尝试了list、Tuple、dict,结果它们都是不一样的,我又回过头来试着在多线程中使用列表元组和字典,结果在多线程中它们的id还是一样的。
这里有一个有趣的问题,如果是int类型,当值小于等于256时,它们在多进程间的id是相同的,如果大于256,则它们的id就会不同了,这个我没有查看原因。
[h1]自定义类型对象在进程间共享[/h1]
  1. #coding:utf-8import threadingimport multiprocessingclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = datadef test(name,data):    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data.get(),id(data)))if __name__ == '__main__':    d = Data(10)    name = "杨彦星"    print("in main thread id(data) is {}".format(id(d)))    for i in range(5):        pro = multiprocessing.Process(target=test,args=(name,d))        pro.start()
复制代码
得到的结果是
  1. in main thread id(data) is 1927286591728in thread  name is 杨彦星data is 10 id(data) is 1561177927752in thread  name is 杨彦星data is 10 id(data) is 2235260514376in thread  name is 杨彦星data is 10 id(data) is 2350586073040in thread  name is 杨彦星data is 10 id(data) is 2125002248088in thread  name is 杨彦星data is 10 id(data) is 1512231669656
复制代码
可以看到它们的id是不同的,也就是不同的对象。
[h1]在多进程间如何共享数据[/h1]我们看到,数据在多进程间是不共享的(小于256的int类型除外),但是我们又想在主进程和子进程间共享一个数据对象时该如何操作呢?
在看这个问题之前,我们先将之前的多线程代码做下修改
  1. #coding:utf-8import threadingimport multiprocessingclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = datadef test(name,data,lock):    lock.acquire()    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))    data.set(data.get()+1)    lock.release()if __name__ == '__main__':    d = Data(0)    thlist = []    name = "yang"    lock = threading.Lock()    for i in range(5):        th = threading.Thread(target=test,args=(name,d,lock))        th.start()        thlist.append(th)    for i in thlist:        i.join()    print(d.get())
复制代码
我们这个代码的目的是这样,使用自定义的Data类型对象,当经过5个子线程操作以后,每个子线程对其data值进行加1操作,最后在主线程打印对象的data值。该输出结果如下
  1. in thread  name is yangdata is  id(data) is 1805246501272in thread  name is yangdata is  id(data) is 1805246501272in thread  name is yangdata is  id(data) is 1805246501272in thread  name is yangdata is  id(data) is 1805246501272in thread  name is yangdata is  id(data) is 18052465012725
复制代码
可以看到在主线程最后打印出来了5,符合我们的预期,但是如果放到多进程中呢?因为多进程下,每个子进程所持有的对象是不同的,所以每个子进程操作的是各自的Data对象,对于主进程的Data对象应该是没有影响的,我们来看下它的结果
  1. #coding:utf-8import threadingimport multiprocessingclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = datadef test(name,data,lock):    lock.acquire()    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))    data.set(data.get()+1)    lock.release()if __name__ == '__main__':    d = Data(0)    thlist = []    name = "yang"    lock = multiprocessing.Lock()    for i in range(5):        th = multiprocessing.Process(target=test,args=(name,d,lock))        th.start()        thlist.append(th)    for i in thlist:        i.join()    print(d.get())
复制代码
它的输出结果是:
  1. in thread  name is yangdata is  id(data) is 1997429477048in thread  name is yangdata is  id(data) is 3044738469504in thread  name is yangdata is  id(data) is 2715076202224in thread  name is yangdata is  id(data) is 2482736991872in thread  name is yangdata is  id(data) is 18611887837440
复制代码
最后的输出是0,说明了子进程对于主进程传入的Data对象操作其实对于主进程的对象是不起作用的,我们需要怎样的操作才能实现子进程可以操作主进程的对象呢?我们可以使用
  1. multiprocessing.managers 下的 BaseManager
复制代码
来实现
  1. #coding:utf-8import threadingimport multiprocessingfrom multiprocessing.managers import BaseManagerclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = data        BaseManager.register("mydata",Data)def test(name,data,lock):    lock.acquire()    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))    data.set(data.get()+1)    lock.release()def getManager():    m = BaseManager()    m.start()    return mif __name__ == '__main__':    manager = getManager()    d = manager.mydata(0)    thlist = []    name = "yang"    lock = multiprocessing.Lock()    for i in range(5):        th = multiprocessing.Process(target=test,args=(name,d,lock))        th.start()        thlist.append(th)    for i in thlist:        i.join()    print(d.get())
复制代码
使用
  1. from multiprocessing.managers import BaseManager
复制代码
引入 BaseManager以后,在定义完Data类型之后,使用
  1. BaseManager.register("mydata",Data)
复制代码
将Data类型注册到BaseManager中,并且给了它一个名字叫
  1. mydata
复制代码
,之后就可以使用
  1. BaseManager
复制代码
对象的这个名字来初始化对象,我们来看一下输出
  1. C:\Python35\python.exe F:/python/python3Test/multask.pyin thread  name is yangdata is  id(data) is 2222932504080in thread  name is yangdata is  id(data) is 1897574510096in thread  name is yangdata is  id(data) is 2053415775760in thread  name is yangdata is  id(data) is 2766155820560in thread  name is yangdata is  id(data) is 25011598904485
复制代码
我们看到,虽然在每个子进程中使用的是不同的对象,但是它们的值却是可以“共享”的。
标准的数据类型也可以通过multiprocessing库中的Value对象,举一个简单的例子
  1. #coding:utf-8import threadingimport multiprocessingfrom multiprocessing.managers import BaseManagerclass Data:    def __init__(self,data=None):        self.data = data    def get(self):        return self.data    def set(self,data):        self.data = dataBaseManager.register("mydata",Data)def test(name,data,lock):    lock.acquire()    print("in thread {} name is {}".format(threading.current_thread(),name))    print("data is {} id(data) is {}".format(data,id(data)))    data.value +=1    lock.release()if __name__ == '__main__':    d = multiprocessing.Value("l",10) #    print(d)    thlist = []    name = "yang"    lock = multiprocessing.Lock()    for i in range(5):        th = multiprocessing.Process(target=test,args=(name,d,lock))        th.start()        thlist.append(th)    for i in thlist:        i.join()    print(d.value)
复制代码
这里使用
  1. d = multiprocessing.Value("l",10)
复制代码
初始化了一个数字类型的对象,这个类型是
  1. Synchronized wrapper for c_long
复制代码
,
  1. multiprocessing.Value
复制代码
在初始化时,第一个参数是类型,第二个参数是值,具体支持的类型如下

还可以使用ctypes库里和类初始化字符串
  1. >>> from ctypes import c_char_p>>> s = multiprocessing.Value(c_char_p, b'\xd1\xee\xd1\xe5\xd0\xc7')>>> print(s.value.decode('gbk'))杨彦星
复制代码
还可以使用Manager对象初始list,dict等
  1. #coding:utf-8import multiprocessingdef func(mydict, mylist):    # 子进程改变dict,主进程跟着改变    mydict["index1"] = "aaaaaa"     # 子进程改变List,主进程跟着改变    mydict["index2"] = "bbbbbb"    mylist.append(11)    mylist.append(22)    mylist.append(33)if __name__ == "__main__":    # 主进程与子进程共享这个字典    mydict = multiprocessing.Manager().dict()    # 主进程与子进程共享这个List    mylist = multiprocessing.Manager().list(range(5))    p = multiprocessing.Process(target=func, args=(mydict, mylist))    p.start()    p.join()    print(mylist)    print(mydict)
复制代码
其实我们这里所说的共享只是数据值上的共享,因为在多进程中,各自持有的对象都不相同,所以如果想要同步状态需要曲线救国。不过这种在自己写的小项目中可以简单的使用,如果做一些大一点的项目,还是建议不要使用这种共享数据的方式,这种大大的增加了程序间的耦合性,使用逻辑变得复杂难懂,所以建议还是使用队列或者数据库作为进程间通信的渠道。
参考文章
Python 进程之间共享数据(全局变量)
Python多进程编程-进程间共享数据

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:
帖子:
精华:
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP