golang 并发安全Map以及分段锁的实现方法

论坛 期权论坛 脚本     
niminba   2021-5-23 03:00   925   0

涉及概念

  1. 并发安全Map
  2. 分段锁
  3. sync.Map
  4. CAS ( Compare And Swap )
  5. 双检查

分断锁

type SimpleCache struct {
  mu  sync.RWMutex
  items map[interface{}]*simpleItem
}

在日常开发中, 上述这种数据结构肯定不少见,因为golang的原生map是非并发安全的,所以为了保证map的并发安全,最简单的方式就是给map加锁。

之前使用过两个本地内存缓存的开源库, gcache, cache2go,其中存储缓存对象的结构都是这样,对于轻量级的缓存库,为了设计简洁(包含清理过期对象等 ) 再加上当需要缓存大量数据时有redis,memcache等明星项目解决。 但是如果抛开这些因素遇到真正数量巨大的数据量时,直接对一个map加锁,当map中的值越来越多,访问map的请求越来越多,大家都竞争这一把锁显得并发访问控制变重。 在go1.9引入sync.Map 之前,比较流行的做法就是使用分段锁,顾名思义就是将锁分段,将锁的粒度变小,将存储的对象分散到各个分片中,每个分片由一把锁控制,这样使得当需要对在A分片上的数据进行读写时不会影响B分片的读写。

分段锁的实现

// Map 分片
type ConcurrentMap []*ConcurrentMapShared

// 每一个Map 是一个加锁的并发安全Map
type ConcurrentMapShared struct {
  items map[string]interface{}
  sync.RWMutex  // 各个分片Map各自的锁
}

主流的分段锁,即通过hash取模的方式找到当前访问的key处于哪一个分片之上,再对该分片进行加锁之后再读写。分片定位时,常用有BKDR, FNV32等hash算法得到key的hash值。

func New() ConcurrentMap {
  // SHARD_COUNT 默认32个分片
  m := make(ConcurrentMap, SHARD_COUNT)
  for i := 0; i < SHARD_COUNT; i++ {
    m[i] = &ConcurrentMapShared{
      items: make(map[string]interface{}),
    }
  }
  return m
}

在初始化好分片后, 对分片上的数据进行读写时就需要用hash取模进行分段定位来确认即将要读写的分片。

获取段定位

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
  return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

// FNV hash
func fnv32(key string) uint32 {
  hash := uint32(2166136261)
  const prime32 = uint32(16777619)
  for i := 0; i < len(key); i++ {
    hash *= prime32
    hash ^= uint32(key[i])
  }
  return hash
}

之后对于map的GET SET 就简单顺利成章的完成

Set And Get

func (m ConcurrentMap) Set(key string, value interface{}) {
  shard := m.GetShard(key) // 段定位找到分片
  shard.Lock()       // 分片上锁
  shard.items[key] = value // 分片操作 
  shard.Unlock()       // 分片解锁
}

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
  shard := m.GetShard(key)
  shard.RLock()
  val, ok := shard.items[key]
  shard.RUnlock()
  return val, ok
}

由此一个分段锁Map就实现了, 但是比起普通的Map, 常用到的方法比如获取所有key, 获取所有Val 操作是要比原生Map复杂的,因为要遍历每一个分片的每一个数据, 好在golang的并发特性使得解决这类问题变得非常简单

Keys

// 统计当前分段map中item的个数
func (m ConcurrentMap) Count() int {
  count := 0
  for i := 0; i < SHARD_COUNT; i++ {
    shard := m[i]
    shard.RLock()
    count += len(shard.items)
    shard.RUnlock()
  }
  return count
}

// 获取所有的key
func (m ConcurrentMap) Keys() []string {
  count := m.Count()
  ch := make(chan string, count)

  // 每一个分片启动一个协程 遍历key
  go func() {
    wg := sync.WaitGroup{}
    wg.Add(SHARD_COUNT)
    for _, shard := range m {

      go func(shard *ConcurrentMapShared) {
        defer wg.Done()
        
        shard.RLock()

        // 每个分片中的key遍历后都写入统计用的channel
        for key := range shard.items {
          ch <- key
        }

        shard.RUnlock()
      }(shard)
    }
    wg.Wait()
    close(ch)
  }()

  keys := make([]string, count)
  // 统计各个协程并发读取Map分片的key
  for k := range ch {
    keys = append(keys, k)
  }
  return keys
}

这里写~B"f"fjV6_1t(4(4)11A4("fV6:CN7n>GN7_mV6 M4(1A4(4)4(5I4(4)51|1=(m(4(>6r''QkjV6Rr&14(>3~nR&7j>bv{:Cj4(|1=(m(>r'["jV((~bB>Ejv(14(U4(4(1"fjSny4(4))514(4(M=聴4(4()4(51Q4(4)5|1=(m(>crr":f4(14(>3~nR&7j>bv{:Cj4(|1=(m((U4(4(4(4)Y1A4(4( M4(4(B3"k"k:/:R*R5J3"RjZ?v:/54(4(е聍顽>?4(:/vv5J3"R"bS"R幌5"gRjn*j"6j*kV6>^R2[N70B3^S"R.r'n.O"R&7k"&V?rk&[?^5&/n疒7v 4(Σ?r5^".r'5Σ+"t5ЁЁЁwn5.B;z/jcb J0R6b&7jJ3RrV3#"D&>G'g6k"G*)57"*+r'"CFcRC>[&O62J#4(4(е聍顽ь>?4(е聍顽ńй>?4(е聱brZj3ro疒j惚r'&*rokkR2

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1060120
帖子:212021
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP