sync.Map

Posted by Useless Programmer on September 22, 2018

    golang 线程安全的 Map

章节目录

  1. Map 基本类型定义
  2. Store
  3. Load
  4. Delete
  5. Range

Map 基本类型定义##

Map

这里我们先看一下 Map 都涉及到那些类型.

type Map struct {
    // 互斥锁. 用于互斥的读写 dirty.
    mu Mutex

    // 值得声明的一点, read 和 dirty 中的 entry 是同一个.
    read atomic.Value   // 该属性的实际类型为 readOnly 类型.
    dirty map[interface{}]*entry
    misses int
}

readOnly

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // 表示 dirty 中是否包含一些不存在于 read 中的 key.
}

entry

// 用来表示 Map 中的一个 Value 值.
type entry struct {
    p unsafe.Pointer // *interface{}
}

vars

// 当某个 entry 的值为 expunged, 表示该 entry 已经被标记为删除状态, 等待被删除.
var expunged = unsafe.Pointer(new(interface{}))

Map : Store()##

Store 流程:

  1. 先在 read 中查找相应的 key, 如果该 key 已存在且该 entry 没有被标记为已删除, 则更新该 entry 的值并返回.
  2. 然后在 dirty 中寻找该 key, 如果该 key 存在于 dirty 中, 则更新该 entry 并返回.
  3. 如果 read 和 dirty 中都不包含该 key:
    a) 如果当前 read 中包含的 key 和 dirty 中包含的 key 相同且 dirty 不为空, 则将 read 中不为空的 entry 复制到 dirty 中. 并将
    b) 将新的 key-value 存在 dirty 中.
    这里对于 read 的操作是不需要加锁的.而对 dirty 的操作是需要加锁的.
    对于 read 的操作不会涉及到添加新的 key, 只是读取或者更新 entry 的值. 而 entry 是 atomic.Value 类型的, 通过相应的原子操作方法进行,因此不需要加锁. 而对 dirty 的操作会涉及到往一个普通 map 中添加新的 key-value, 因此需要加锁.
// 创建一个 entry
func newEntry(i interface{}) *entry {
    return &entry{p: unsafe.Pointer(&i)}
}

// Map 中的 entry 有两种合法的状态 : expunged(被标记为删除状态), 正常状态.
// 如果 entry 被标记为删除状态, tryStore 返回 false
// 否则, 会将 i 保存到 map 中, tryStore 返回 true

func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        // 使用 CAS 设置 entry 的值. 如果设置成功, 返回 true
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        // 这里的理解有待进一步验证.
        // 在刚刚进入方法 tryStore 时我们取出 entry 的值, 存储于 p.
        // 但是有可能在从第一句代码到使用 CAS 设置 entry 的值期间, 
        // entry.p 被其他线程修改(删除), 导致 CAS 操作失败.
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

// 判断该 entry 的值是否已经被标记为删除状态.
// 如果该 entry 的值为 expunged, 说明该 entry 的值已经被标记为删除, 此时将该 entry 置为 nil.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (e *entry) storeLocked(i *interface{}) {
    atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

// 判断该 entry 的值是否已经被标记为删除状态
// 如果该 entry 的值为 nil, 则将该 entry 修改为 expunged.
func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

// 如果 dirty map为空,则把 read 中数据拷贝到 dirty 中.
func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 将所有的 nil 的 entry 设置为 expunged
        // 拷贝不为 expunged 的 entry
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)

    // 如果 Map 中相应的 key 已经存在, 且没有被标记为删除状态, 
    // 将 value 写入, 并返回.
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    // 如果 read 中没有对应的 key, 此时则需要往 dirty 中写入 key-value.
    // 注意, 对于 dirty 的操作需要加锁.
    m.mu.Lock()

    // 在获取锁期间, 有可能 read 被其他线程修改, 
    // 因此在此检查 key 是否已经存在于 read 中.
    read, _ = m.read.Load().(readOnly)

    //在获取锁期间, read 中被存入 key 相关的 entry.
    if e, ok := read.m[key]; ok {
        // 如果 entry 被标记为删除状态, 删除原对象, 并写入新对象 e.
        if e.unexpungeLocked() { 
            m.dirty[key] = e
        }
        // 如果 entry 未被标记为删除状态, 说明原先 entry 也是一个合法数据, 
        // 则直接更新该 entry 的值.
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok { // read 中不存在 key, 而 dirty 中存在该 key. 
        // 直接更新该 key 所对应的 entry
        e.storeLocked(&value)
    } else { // read 和 dirty 中都不存在该 key.
        if !read.amended {
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true}) // 将 amended 值为true, 标记 dirty 中存在 read 没有的 key.
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

Map 的方法: Load()##

Load 流程:

  1. 先在 read 中查找该 key, 如果找到直接返回.
  2. 否则返回在 dirty 中查找该 key 的结果(可能为空), 并增加 misses 的值. 当misses 的值大于 dirty 的长度时, 将 dirty 中数据转储于 read 中.(之所以转存, 是因为访问 read 不需要锁).
// 增加 misses 计数. 
// 在 misses 大于 dirty 长度时, 将 dirty 复制到 read 中. 并清空 dirty, 重置 misses
func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty}) // 注意, 这里隐式的将 amended 置为 false.
    m.dirty = nil
    m.misses = 0
}

// 该方法用来在 Map 中查找与 key 关联的 value. 未找到, 返回 nil.
// ok 返回值表示是否找到相关的 key.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 查找 key 时, 总是先在 Map.read 字段中查找. 找到则返回该 value.
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]

    // 如果 read 中不存在该 key, 而 dirty 中存在该 key
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

Map 的方法: Delete()##

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        // 如果该元素已经被标记为删除状态, 直接返回.
        if p == nil || p == expunged {
            return false
        }
        // 如果该元素为正常状态, 则将该 entry 置为 nil
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]

    // read 中不存在该 key
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }

    // read 中存在该 key
    if ok {
        e.delete()
    }
}

Map 的方法: Range()##


func (m *Map) Range(f func(key, value interface{}) bool) {
    read, _ := m.read.Load().(readOnly)
    // 如果 Map 元素存储于 dirty 中, 先将数据转存到 read 中.
    if read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

感谢 https://juejin.im/post/5b1b3d785188257d45297d0a 文章作者的分享.