用 Python 实现读写锁

论坛 期权论坛 期权     
Python中文社区   2019-6-16 04:36   3822   0


作者:weapon,闲来笑浮生悬笔一卷入毫端,朱绂临身可与言者不过二三。
博客:zhihu.com/people/hong-wei-peng
起步
Python 提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁。
通俗点说就是当没有写锁时,就可以加读锁且任意线程可以同时加;而写锁只能有一个线程,且必须在没有读锁时才能加上。
简单的实现
  1. import threading
  2. class RWlock(object):
  3.     def __init__(self):
  4.         self._lock = threading.Lock()
  5.         self._extra = threading.Lock()
  6.         self.read_num = 0
  7.     def read_acquire(self):
  8.         with self._extra:
  9.             self.read_num += 1
  10.             if self.read_num == 1:
  11.                 self._lock.acquire()
  12.     def read_release(self):
  13.         with self._extra:
  14.             self.read_num -= 1
  15.             if self.read_num == 0:
  16.                 self._lock.release()
  17.     def write_acquire(self):
  18.         self._lock.acquire()
  19.     def write_release(self):
  20.         self._lock.release()
复制代码
这是读写锁的一个简单的实现,
  1. self.read_num
复制代码
用来保存获得读锁的线程数,这个属性属于临界区,对其操作也要加锁,所以这里需要一个保护内部数据的额外的锁
  1. self._extra
复制代码

但是这个锁是不公平的。理想情况下,线程获得所的机会应该是一样的,不管线程是读操作还是写操作。而从上述代码可以看到,读请求都会立即设置
  1. self.read_num += 1
复制代码
,不管有没有获得锁,而写请求想要获得锁还得等待
  1. read_num
复制代码
为 0 。
所以这个就造成了只有锁没有被占用或者没有读请求时,可以获得写权限。我们应该想办法避免读模式锁长期占用。
读写锁的优先级
读写锁也有分 读优先 和 写优先。上面的代码就属于读优先。
如果要改成写优先,那就换成去记录写线程的引用计数,读和写在同时竞争时,可以让写线程增加写的计数,这样可使读线程的读锁一直获取不到, 因为读线程要先判断写的引用计数,若不为0,则等待其为 0,然后进行读。这部分代码不罗列了。
但这样显然不够灵活。我们不需要两个相似的读写锁类。我们希望重构我们代码,使它更强大。
改进
为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件
  1. threading.Condition
复制代码
用来处理哪方优先的通知。计数引用可以扩大语义:正数:表示正在读操作的线程数,负数:表示正在写操作的线程数(最多-1)
在获取读操作时,先然后判断时候有等待的写线程,没有,进行读操作,有,则等待读的计数加 1 后等待
  1. Condition
复制代码
通知;等待读的计数减 1,计数引用加 1,继续读操作,若条件不成立,循环等待;
在获取写操作时,若锁没有被占用,引用计数减 1,若被占用,等待写线程数加 1,等待写条件
  1. Condition
复制代码
的通知。
读模式和写模式的释放都是一样,需要根据判断去通知对应的
  1. Condition
复制代码
  1. class RWLock(object):
  2.     def __init__(self):
  3.         self.lock = threading.Lock()
  4.         self.rcond = threading.Condition(self.lock)
  5.         self.wcond = threading.Condition(self.lock)
  6.         self.read_waiter = 0    # 等待获取读锁的线程数
  7.         self.write_waiter = 0   # 等待获取写锁的线程数
  8.         self.state = 0          # 正数:表示正在读操作的线程数   负数:表示正在写操作的线程数(最多-1)
  9.         self.owners = []        # 正在操作的线程id集合
  10.         self.write_first = True # 默认写优先,False表示读优先
  11.     def write_acquire(self, blocking=True):
  12.         # 获取写锁只有当
  13.         me = threading.get_ident()
  14.         with self.lock:
  15.             while not self._write_acquire(me):
  16.                 if not blocking:
  17.                     return False
  18.                 self.write_waiter += 1
  19.                 self.wcond.wait()
  20.                 self.write_waiter -= 1
  21.         return True
  22.     def _write_acquire(self, me):
  23.         # 获取写锁只有当锁没人占用,或者当前线程已经占用
  24.         if self.state == 0 or (self.state < 0 and me in self.owners):
  25.             self.state -= 1
  26.             self.owners.append(me)
  27.             return True
  28.         if self.state > 0 and me in self.owners:
  29.             raise RuntimeError('cannot recursively wrlock a rdlocked lock')
  30.         return False
  31.     def read_acquire(self, blocking=True):
  32.         me = threading.get_ident()
  33.         with self.lock:
  34.             while not self._read_acquire(me):
  35.                 if not blocking:
  36.                     return False
  37.                 self.read_waiter += 1
  38.                 self.rcond.wait()
  39.                 self.read_waiter -= 1
  40.         return True
  41.     def _read_acquire(self, me):
  42.         if self.state < 0:
  43.             # 如果锁被写锁占用
  44.             return False
  45.         if not self.write_waiter:
  46.             ok = True
  47.         else:
  48.             ok = me in self.owners
  49.         if ok or not self.write_first:
  50.             self.state += 1
  51.             self.owners.append(me)
  52.             return True
  53.         return False
  54.     def unlock(self):
  55.         me = threading.get_ident()
  56.         with self.lock:
  57.             try:
  58.                 self.owners.remove(me)
  59.             except ValueError:
  60.                 raise RuntimeError('cannot release un-acquired lock')
  61.             if self.state > 0:
  62.                 self.state -= 1
  63.             else:
  64.                 self.state += 1
  65.             if not self.state:
  66.                 if self.write_waiter and self.write_first:   # 如果有写操作在等待(默认写优先)
  67.                     self.wcond.notify()
  68.                 elif self.read_waiter:
  69.                     self.rcond.notify_all()
  70.                 elif self.write_waiter:
  71.                     self.wcond.notify()
  72.     read_release = unlock
  73.     write_release = unlock
复制代码
热 门 推 荐
用Python创建微信机器人

用Python机器人监听微信群聊

用Python获取摄像头并实时控制人脸

开源项目 | 用Python美化LeetCode仓库

推荐Python中文社区旗下的几个服务类公众号
征稿启事 | Python中文社区有奖征文


▼ 点击成为社区注册会员          「在看」一下,一起PY!
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:200
帖子:40
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP