多线程读取IP摄像头(Python)

论坛 期权论坛 期权     
深度学习与python   2019-7-20 09:57   3522   0
算法工程师之路
更多精彩内容
点我查看关注哦

在深度学习时代(这么说也不为过)的今天,我们做各种视觉任务时候都会想到使用深度学习,但是大家也都知道深度学习的模型如果想要使用的话,设备必须得有,虽然各种各样的量化策略和剪枝策略大大加速了模型的推理能力,但是实时的话在低配电脑还是不可用!但是实际中有些视觉任务不怎么依赖实时性,我们只需要保证1s处理一帧图片就可以了,或者几十秒处理一帧也可以。那么这种处理策略怎么处理呢?特别对于IP摄像头,它是以数据流的形式传输,因此当其帧率较高时,本地处理程序会处理不过来,导致卡帧(延时)和程序卡死!我们一起来看看吧!
threading模块(线程)
[h2]在Python中多线程模块有两个,分别是thread(Python3中改名为_thread)和threading模块,其中_thread模块只是提供了基本的线程和线程锁的功能,而threading模块提供的是更加高级和安全的线程管理!那什么是线程呢?[/h2][h2]线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。就类似于操作系统的多任务工作模式,我们可以形象的认为,CPU将一秒的执行时间分配给各个任务,在一个任务执行后,保存信息并切换到下一个任务,然后循环往复,这样就会增加CPU的执行效率,即使一个任务需要一直等待,那么CPU也会在等待的这段时间切换到其他任务执行![/h2]
当我们使用Threading模块创建线程时或者自定义线程任务时,最好的方法就是建立一个线程类,并继承于threading.Thead,然后重写run方法,这是最推荐大家使用的方法(最优雅)。当然也可以使用其他方式创建!主要有以下几个常用函数:
  • threading.currentThread(): 返回当前的线程变量
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
  • run(): 线程活动的函数,自定义时需要重写
  • start():启动线程活动,必须使用
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止,正常退出或者抛出未处理的异常,或者是可选的超时发生,只有子线程都运行完了,主线程(main)才会退出!
  • isAlive(): 返回线程是否活动的
  • getName(): 返回线程名
  • setName(): 设置线程名
代码示例
  1. import threadingimport timeclass MyThread(threading.Thread):    def __init__(self, n):        super(MyThread, self).__init__()  # 重构run函数必须要写        self.n = n    def run(self):        print("task", self.n)        time.sleep(1)        print('2s')        time.sleep(1)        print('1s')        time.sleep(1)        print('0s')        time.sleep(1)if __name__ == "__main__":    t1 = MyThread("t1")    t2 = MyThread("t2")    t1.start()    t2.start()    t1.join()    t2.join()
复制代码
[h2]队列模块(queue)[/h2][h2]在Python3中,队列模块为queue, 其包括三种队列类,分别为FIFO(先进先出)、LIFO(后入先出)和PriorityQueue。其常用的方法为:[/h2]
  • queue.qsize() 返回队列的大小
  • queue.empty() 如果队列为空,返回True,反之False
  • queue.full() 如果队列满了,返回True,反之False
  • queue.full 与 maxsize 大小对应
  • queue.get([block[, timeout]])获取队列,timeout等待时间
  • queue.get_nowait() 相当Queue.get(False)
  • queue.put(item) 写入队列,timeout等待时间
  • queue.put_nowait(item) 相当Queue.put(item, False)
代码示例:
  1. import queueq=queue.Queue(5)    #如果不设置长度,默认为无限长# q = queue.LifoQueue(5)  # 后进先出,类似于栈结构print(q.maxsize)    #注意没有括号q.put(123)q.put(456)q.put(789)q.put(100)q.put(111)q.put(233)print(q.get())print(q.get())
复制代码
很遗憾,上面的结构不是我想要的,因为在处理视频流中,如果我们的队列满了,我们需要从队头删除旧数据,在队尾插入新数据,并且每次获取队尾的数据,那么我们就需要一个双端队列了,而上面的queue模块显然不满足!!!
笔者在collections模块中找到了一个deque的子模块,其使用很简单,读者可以自行查阅!

deque的方法列表[h2]多线程处理摄像头读取[/h2]如果我们碰到了一个实时性要求不是那么高的,或者自己设备太差处理不过来图像时,我们可以考虑使用多线程读取摄像头画面!比如我们现在需要两个线程,一个用于实时读取视频流,另外一个每隔一秒钟处理一个最新的摄像头画面!
核心思路:我们使用双端队列来缓存数据,当缓存数据满时,我们从队头剔除数据,然后在队尾加入新数据,在获取时只读取队尾数据,这样就会一直处理当前帧!一定要注意线程退出时,需要在关闭摄像头的同时清空队列,而另一个线程进行队列是否为空的判断!

注意:由于这个队列是可读可写,当两个线程同时工作,会出现冲突,因此我们需要对公共变量加锁进行保护,使用threading.Lock()方法创建锁,加锁解锁的方法为acquire()和release()两种!
  1. import cv2import threadingfrom collections import dequelock = threading.Lock()class RealReadThread(threading.Thread):    def __init__(self, input):        super(RealReadThread).__init__()        self._jobq = input        # ip_camera_url = 'rtsp://admin:admin@192.168.1.64/'   # rtsp数据流        # 创建一个窗口        self.cap = cv2.VideoCapture(0)        threading.Thread.__init__(self)    def run(self):        cv2.namedWindow('ip_camera', flags=cv2.WINDOW_NORMAL | cv2.WINDOW_FREERATIO)        if not self.cap.isOpened():            print('请检查IP地址还有端口号,或者查看IP摄像头是否开启,另外记得使用sudo权限运行脚本')        while self.cap.isOpened():            ret, frame = self.cap.read()            lock.acquire()            if len(self._jobq) == 10:                self._jobq.popleft()            else:                self._jobq.append(frame)            lock.release()            cv2.imshow('ip_camera', frame)            if cv2.waitKey(1) == ord('q'):                # 退出程序                break        print("实时读取线程退出!!!!")        cv2.destroyWindow('ip_camera')        self._jobq.clear()    # 读取进程结束时清空队列        self.cap.release()class GetThread(threading.Thread):    def __init__(self, input):        super(GetThread).__init__()        self._jobq = input        threading.Thread.__init__(self)    def run(self):        cv2.namedWindow('get', flags=cv2.WINDOW_NORMAL | cv2.WINDOW_FREERATIO)        flag = False        while(True):            if len(self._jobq) != 0:                lock.acquire()                im_new = self._jobq.pop()                lock.release()                cv2.imshow("get", im_new)                cv2.waitKey(1000)                flag = True            elif flag == True and len(self._jobq) == 0:                break        print("间隔1s获取图像线程退出!!!!")if __name__ == "__main__":    q = deque([], 10)    th1 = RealReadThread(q)    th2 = GetThread(q)    th1.start()    th2.start()   # 开启两个线程    th1.join()    th2.join()
复制代码
[h2]演示效果[/h2][h2]一个画面进行实时读取,另一个画面隔一秒显示一次当前帧(队列尾部数据)!按 q 键退出并结束两个线程[/h2]显示效果图
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:
帖子:
精华:
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP