##内容回顾
# JoinableQueue 可以被join的队列 join是等待任务结束 队列怎么叫结束? 调用task_done一次则表示有一个数据被处理完成了 当task_done次数等于put的次数就意味着任务处理完成了 这也是join的执行时机 该队列可以明确告知数据的使用方,所有数据都已经处理完成 在生产者消费者模型中,解决了消费者不知道何时算是任务结束的问题 具体过程:主进程先等待所有的生产者进程生成完毕,再等队列中的数据被全部处理,这就意味着,任务全部结束 # 多线程 使用多线程 多进程的目的 是一致 ,都是为了并发执行任务,从而提高效率 什么是线程: 线程是操作系统运算调度的最小单位 (CPU最小执行单位),线程被包含在进程中,一个线程就是一个固定的执行流程 (控制流) 线程的特点: 进程是不能被执行,进程是一个资源单位,其中包含了程序运行所需要的所有资源, 线程才是真正的执行单位,光有进程程序是无法运行的,必须先创建进程将资源加载到进程中,在启动线程来执行任务 一个进程至少包含一个线程,称之为主线程,主线程是由操作系统来开启, 一个进程可以包含多个线程,来提高程序的效率 线程与进程的区别: 线程创建的开销远小于进程 统一进程中的所有线程共享进程内的资源 线程之间没有父子关系,都是平等的,PID相同 如何选择: 要根据具体的任务类型,IO密集 计算密集 ## 线程的使用方法与进程一模一样 开启线程的位置可以是任何位置 # 守护线程 守护线程会在所有非守护线程结束时一起结束 ,当然守护可以提前结束 # 线程安全问题 并发操作同一个资源 可能导致数据错乱的问题 解决方案: 加互斥锁 互斥锁 mutex 递归锁 Rlock 同一线程可以多次执行acquire() 信号量 semaphore 死锁问题: 不止一个锁,分别被不同线程持有,相互等待对方释放,就会导致锁死问题
##基于多线程实现并发的套接字案例
---------------------------服务器.py---------------------------------------------- import socket from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor(5) def working(client): while True: try: data = client.recv(1024) if not data: client.close() break client.send(data.upper()) except ConnectionResetError: client.close() break def server(): server = socket.socket() server.bind(("127.0.0.1", 1688)) server.listen() while True: client, addr = server.accept() # 把任务提交到线程池 pool.submit(working,client) # working(client) server() ---------------------------客户端.py---------------------------------------------- import socket client = socket.socket() client.connect(("127.0.0.1",1688)) while True: msg = input(":").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024).decode("utf-8") print(data)
##GIL 全局解释器锁
#1、什么是GIL? “”“ 在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除。 需要知道的是,解释器并不只有CPython,还有PyPy,JPython等等。GIL也仅存在与CPython中,这并不是Python这门语言的问题,而是CPython解释器的问题! ”“” #2、GIL与GC的孽缘 “”“ 在使用Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。那么GC与GIL有什么关联? 要搞清楚这个问题,需先了解GC的工作原理,Python中内存管理使用的是引用计数,每个数会被加上一个整型的计数器,表示这个数据被引用的次数,当这个整数变为0时则表示该数据已经没有人使用,成了垃圾数据。 当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,垃圾清理也是一串代码,也就需要一条线程来执行。 GC与其他线程都在竞争解释器的执行权,而CPU何时切换,以及切换到哪个线程都是无法预支的,这样一来就造成了竞争问题,假设线程1正在定义变量a=10,而定义变量第一步会先到到内存中申请空间把10存进去,第二步将10的内存地址与变量名a进行绑定,如果在执行完第一步后,CPU切换到了GC线程,GC线程发现10的地址引用计数为0则将其当成垃圾进行了清理,等CPU再次切换到线程1时,刚刚保存的数据10已经被清理掉了,导致无法正常定义变量。 当然其他一些涉及到内存的操作同样可能产生问题问题,为了避免GC与其他线程竞争解释器带来的问题,CPython简单粗暴的给解释器加了互斥锁 ”“” #3、GIL的加锁与解锁时机 “”“ 加锁: 只有有一个线程要使用解释器就立马枷锁 释放: 该线程任务结束 该线程遇到IO 该线程使用解释器过长 默认100纳秒 ”“” #4、总结: “”“ 在CPython中,有了GIL后 优点:多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全 缺点:GIL的特性使得多线程无法并行,会把线程的并行变成串行,导致效率降低 ”“” #5、解决方案:区分任务类型 “”“ 1.单核下无论是IO密集还是计算密集GIL都不会产生任何影响 2.多核下对于IO密集任务,GIL会有细微的影响,基本可以忽略 3.Cpython中IO密集任务应该采用多线程,计算密集型应该采用多进程
4.现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。 ”“” #计算密集型的效率测试 “”“ from multiprocessing import Process from threading import Thread import time def task(): for i in range(10000000): i += 1 if __name__ == '__main__': start_time = time.time() # 多进程 # p1 = Process(target=task) # p2 = Process(target=task) # p3 = Process(target=task) # p4 = Process(target=task) # 多线程 p1 = Thread(target=task) p2 = Thread(target=task) p3 = Thread(target=task) p4 = Thread(target=task) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(time.time()-start_time) ”“” #IO密集型的效率测试 “”“ from multiprocessing import Process from threading import Thread import time def task(): with open("test.txt",encoding="utf-8") as f: f.read() if __name__ == '__main__': start_time = time.time() # 多进程 # p1 = Process(target=task) # p2 = Process(target=task) # p3 = Process(target=task) # p4 = Process(target=task) # 多线程 p1 = Thread(target=task) p2 = Thread(target=task) p3 = Thread(target=task) p4 = Thread(target=task) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(time.time()-start_time) ”“” #6、自定义的线程锁与GIL的区别 “”“ GIL保护的是解释器级别的数据安全,比如对象的引用计数,垃圾分代数据等等,具体参考垃圾回收机制详解。 对于程序中自己定义的数据则没有任何的保护效果,这一点在没有介绍GIL前我们就已经知道了,所以当程序中出现了共享自定义的数据时就要自己加锁 ”“” #自定义锁示例 “”“ from threading import Thread,Lock import time lock = Lock() a = 0 def task(): global a lock.acquire() temp = a time.sleep(0.01) a = temp + 1 lock.release() t1 = Thread(target=task) t2 = Thread(target=task) t1.start() t2.start() t1.join() t2.join() print(a) 过程分析: 1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock 2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL 3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1 4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock 5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2 ”“”
##线程池与进程池
#线程池和进程池理论基础 #1、什么是进程/线程池? 池表示一个容器,本质上就是一个存储进程或线程的列表 #2、池子中存储线程还是进程? 如果是IO密集型任务使用线程池,如果是计算密集任务则使用进程池 #3、为什么需要进程/线程池? “”“ 1.自动管理线程的开启和销毁 2.自动分配任务给空闲的线程 3.可以线程开启线程的数量 保证系统稳定 信号量中是限制同时并发多少,但是线程已经全都建完了 ”“” #4、如何使用 “”“ 1.创建池子 2.submit 提交任务 3.pool.shutdown() # 等待所有任务全部完毕 销毁所有线程 后关闭线程池 关闭后就不能提交新任务了 ”“” #示例1:多线程对比多进程 """ io任务 """ # from multiprocessing import Process # from threading import Thread # import time # def task(): # time.sleep(2) # # if __name__ == '__main__': # start = time.time() # # ps = [] # for i in range(100): # p = Process(target=task) # p.start() # ps.append(p) # # for p in ps: # p.join() # print(time.time() - start) # "对于io'密集型 使用多进程反而可能效率比不上多线程 " """ 计算密集型 """ # # # from multiprocessing import Process # from threading import Thread # import time # # def task(): # for i in range(100000000): # 1 * 10 # # # if __name__ == '__main__': # start = time.time() # ps = [] # for i in range(5): # p = Thread(target=task) # p.start() # ps.append(p) # # for p in ps: # p.join() # print(time.time() - start) """ 对于计算密集型任务而言 应该使用多进程 注意:并行的任务不能太多 开启进程非常消耗资源 """ # from threading import Thread,Lock # import time # # a = 0 # def task(): # global a # temp = a # time.sleep(0.01) # a = temp + 1 # # t1 = Thread(target=task) # t2 = Thread(target=task) # t1.start() # t2.start() # # # t1.join() # t2.join() # print(a) #示例2:线程的常用方法 “”“ from threading import Thread,current_thread,enumerate,active_count # 获取当前线程对象 # print(current_thread()) # print(current_thread().__class__) # # def task(): # print(current_thread()) # print(current_thread().__class__) # # Thread(target=task).start() # 获取正常运行的所有线程对象 # print(enumerate()) import time Thread(target=lambda : time.sleep(100)).start() # 获取存活的线程数量 print(active_count()) # 获取线程的名称 其他属性进程对象差不多 print(current_thread().getName()) ”“” #示例3:线程池和进程池 “”“ import os,time # 获取CPU核心数 # print(os.cpu_count()) """ 池 Pool 指得是一个容器 线程池就是用来存储线程对象的 容器 """ from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import enumerate,current_thread # 1.创建池子 可以指定池子里有多少线程 如果不指定默认为CPU个数 * 5 # 不会立即开启线程 会等到有任务提交后在开启线程 # pool = ThreadPoolExecutor(10) # 线程池最大值,应该机器所能承受的最大值 当然需要考虑你的机器有几个任务要做 # # # print(enumerate()) # def task(name,age): # print(name) # print(current_thread().name,"run") # time.sleep(2) # # # # # 该函数提交任务到线程池中 # pool.submit(task,"jerry",10) #任务的参数 直接写到后面不需要定义参数名称 因为是可变位置参数 # pool.submit(task,"owen",20) # pool.submit(task) # # time.sleep(5) # # # # print(enumerate()) """ 线程池,不仅帮我们管理了线程的开启和销毁,还帮我们管理任务的分配 特点: 线程池中的线程只要开启之后 即使任务结束也不会立即结束 因为后续可能会有新任务 避免了频繁开启和销毁线程造成的资源浪费 1.创建一个线程池 2.使用submit提交任务到池子中 ,线程池会自己为任务分配线程 """ # 进程池的使用 同样可以设置最大进程数量 默认为cpu的个数 pool = ProcessPoolExecutor() # 具体的值也要参考机器性能 def task(name): print(os.getpid()) print(name) if __name__ == '__main__': pool.submit(task,"jerry") pool.shutdown() pool.submit(task, "jerry")#RuntimeError: cannot schedule new futures after shutdown ”““ #示例4:线程池的shutdown方法 ”“” from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread,enumerate import time pool = ThreadPoolExecutor(3) def task(): print(current_thread().name) print(current_thread().isDaemon()) time.sleep(1) for i in range(5): pool.submit(task) st = time.time() pool.shutdown() # 等待所有任务全部完毕 销毁所有线程 后关闭线程池 # pool.submit(task) print(time.time() - st) print("over") “”“
##同步 异步
#同步异步理论基础 同步异步-阻塞非阻塞,经常会被程序员提及,并且概念非常容易混淆! #1、阻塞非阻塞指的是程序的运行状态 “”“ 阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞! 非阻塞:程序在正常运行没有遇到IO操作,或者通过某种方式使程序即时遇到了也不会停在原地,还可以执行其他操作,以提高CPU的占用率 ”“” #2、并发 - 并行 指的是多任务处理方式 “”“ 并发:多个任务看起来像是同时运行 ,本质是切换+保存状态 并行:真正的同时进行中, 必须具备多核处理器 ”“” #3、同步-异步 指的是提交任务的方式 """ 同步指调用:发起任务后必须在原地等待任务执行完成,才能继续执行 异步指调用:发起任务后必须不用等待任务执行,可以立即开启执行其他操作 """ #4、 同步 != 阻塞 异步 != 非阻塞 """ 指的是发起任务后,必须在原地等待,直到任务完成拿到结果 ,称之为同步 阻塞时程序会被剥夺CPU执行权 .程序调用默认情况就是同步的 """ #5、在python中,异步的效率明显高于同步 ,但是任务的执行结果如何获取呢?通过多线程或多进程 #6、异步回调 “”“ ### 什么是异步回调 异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数 ### 为什么需要异步回调 之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调 ###总结 异步回调指的就是一个函数,通过add_done_callback(回调函数对象)方法,该函数会在任务后自动被调用,并且会传入Future对象 , 通过Future对象的result()获取执行结果 , 有了回调函数 就可以在任务完成时 及时处理它 通常异步任务都会绑定一个回调函数,用来处理任务结果 ###注意 1. 使用进程池时,回调函数都是主进程中执行执行 2.如果你的任务结果需要交给父进程来处理,那建议回调函数,回调函数会自动将数据返回给父进程,不需要自己处理IPC 3. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程,原因是 线程之间数据本来是共享的 4. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果 ”“” #示例1:异步同步 “”“ 一般情况当不加回调函数情况下,开启的异步调用pool.submit(task)需要给主线程返回值,办法就是可以pool.shutdown() # 阻塞直到线程池所有任务全部完成
会导致主线卡在原地,或者print("执行的结果:",res.result()) # 会导致主线卡在原地 由异步调用又变成了同步了 要等待子线程结束才能打印
主线程over,所以方法就是加回调函数
”“” from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time from concurrent.futures._base import Future pool = ThreadPoolExecutor() # my cpu_count = 6 so 6 * 5 = 30 def task(): time.sleep(2) print("执行完成!") return "一瓶黑牛!" print("start") # task()# 同步调用 # 1.发起一个异步任务 res = pool.submit(task) # 异步调用 res就是一个表示异步任务的对象 # 2.定义一个回调函数 传的参数就是一个完成后任务对象 def finished(arg): print(arg.result()) print("黑牛买回来了! ") pass # 3.给这个异步任务添加了一个回调函数 res.add_done_callback(finished) # pool.shutdown() # 阻塞直到线程池所有任务全部完成 会导致主线卡在原地 # print(res) # print("执行的结果:",res.result()) # 会导致主线卡在原地 print("over") #示例2:程序中的异步调用并获取结果方式1 from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == '__main__': objs = [] for i in range(3): res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果 objs.append(res_obj) # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行 pool.shutdown(wait=True) # 从结果对象中取出执行结果 for res_obj in objs: print(res_obj.result()) print("over") #示例3:程序中的异步调用并获取结果方式2 “”“ from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == '__main__': objs = [] for i in range(3): res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果 print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果 print("over") ”“” #示例4:异步直接调用 """ 爬虫是干啥的 从网络中下载某个页面数据 在从页面中提取有价值的数据 """ import requests from concurrent.futures import ThreadPoolExecutor from threading import current_thread pool = ThreadPoolExecutor() # 获取数据 def get_data(url): response = requests.get(url) # return url,response.text print("%s下载完成!" % current_thread().name) parser(url,response.text) # 解析数据 def parser(url,data): # url,data = obj.result() print("%s 长度%s" % (url, len(data))) print("%s解析完成!" % current_thread().name) urls = [ "http://www.baidu.com", "http://www.qq.com", "http://www.python.org", "http://www.sina.com", "http://www.oldboyedu.com", ] """ 为嘛使用异步回调 1.生产者和消费者解开了耦合 2.消费者可以及时处理生产完成的数据 """ for u in urls: obj = pool.submit(get_data,u) # obj.add_done_callback(parser) print("提交完成!")
#示例5:异步回调
"""
爬虫是干啥的
从网络中下载某个页面数据
在从页面中提取有价值的数据
爬虫是干啥的
从网络中下载某个页面数据
在从页面中提取有价值的数据
"""
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
pool = ThreadPoolExecutor(2)
# 获取数据
def get_data(url):
response = requests.get(url)
print("%s下载完成!" % current_thread().name)
return url,response.text
def get_data(url):
response = requests.get(url)
print("%s下载完成!" % current_thread().name)
return url,response.text
# 解析数据
def parser(obj):
url,data = obj.result()
print("%s 长度%s" % (url, len(data)))
print("%s解析完成!" % current_thread().name)
urls = [
"http://www.baidu.com",
"http://www.qq.com",
"http://www.python.org",
"http://www.sina.com",
"http://www.oldboyedu.com",
]
"http://www.baidu.com",
"http://www.qq.com",
"http://www.python.org",
"http://www.sina.com",
"http://www.oldboyedu.com",
]
"""
为嘛使用异步回调
1.生产者和消费者解开了耦合
2.消费者可以及时处理生产完成的数据
"""
for u in urls:
obj = pool.submit(get_data,u)
obj.add_done_callback(parser)
print("提交完成!")
#示例6: “”“ 先来看一个案例: 在编写爬虫程序时,通常都是两个步骤: 1.从服务器下载一个网页文件 2.读取并且解析文件内容,提取有用的数据 按照以上流程可以编写一个简单的爬虫程序 要请求网页数据则需要使用到第三方的请求库requests可以通过pip或是pycharm来安装,在pycharm中点击settings->解释器->点击+号->搜索requests->安装 ”“” import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutor def get_data(url): print("%s 正在请求%s" % (os.getpid(),url)) time.sleep(random.randint(1,2)) response = requests.get(url) print(os.getpid(),"请求成功 数据长度",len(response.content)) #parser(response) # 3.直接调用解析方法 哪个进程请求完成就那个进程解析数据 强行使两个操作耦合到一起了 return response def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com",htm) print(os.getpid(),"解析成功",len(ls),"个链接") if __name__ == '__main__': pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发 # parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合 # objs.append(obj) # pool.shutdown() # 2.等待所有任务执行结束在统一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析 # 2.解析任务变成了串行, #示例6:进程池异步回调 import requests from concurrent.futures import ProcessPoolExecutor from threading import current_thread import os pool = ProcessPoolExecutor(2) # 获取数据 def get_data(url): response = requests.get(url) print("%s下载完成!" % os.getpid()) return url, response.text # 解析数据 def parser(obj): url, data = obj.result() print("%s 长度%s" % (url, len(data))) print("%s解析完成!" % os.getpid()) urls = [ "http://www.baidu.com", "http://www.qq.com", "http://www.python.org", "http://www.sina.com", "http://www.oldboyedu.com", ] """ 为嘛使用异步回调 1.生产者和消费者解开了耦合 2.消费者可以及时处理生产完成的数据 """ if __name__ == '__main__': for u in urls: obj = pool.submit(get_data, u) obj.add_done_callback(parser) print("提交完成!") print("父进程pid:",os.getpid())