深度解密 Go 語言之 sync.map

工作中,經常會碰到併發讀寫 map 而造成 panic 的情況,為什麼在併發讀寫的時候,會 panic 呢?因為在併發讀寫的情況下,map 里的數據會被寫亂,之後就是 Garbage in, garbage out,還不如直接 panic 了。

目錄

  • 是什麼
  • 有什麼用
  • 如何使用
  • 源碼分析
    • 數據結構
    • Store
    • Load
    • Delete
    • LoadOrStore
    • Range
  • 其他
  • 總結
  • 參考資料

是什麼

Go 語言原生 map 並不是線程安全的,對它進行併發讀寫操作的時候,需要加鎖。而 sync.map 則是一種併發安全的 map,在 Go 1.9 引入。

sync.map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。
sync.map 的零值是有效的,並且零值是一個空的 map。在第一次使用之後,不允許被拷貝。

有什麼用

一般情況下解決併發讀寫 map 的思路是加一把大鎖,或者把一個 map 分成若干個小 map,對 key 進行哈希,只操作相應的小 map。前者鎖的粒度比較大,影響效率;後者實現起來比較複雜,容易出錯。

而使用 sync.map 之後,對 map 的讀寫,不需要加鎖。並且它通過空間換時間的方式,使用 read 和 dirty 兩個 map 來進行讀寫分離,降低鎖時間來提高效率。

如何使用

使用非常簡單,和普通 map 相比,僅遍歷的方式略有區別:

package main

import (
	"fmt"
	"sync"
)

func main()  {
	var m sync.Map
	// 1. 寫入
	m.Store("qcrao", 18)
	m.Store("stefno", 20)

	// 2. 讀取
	age, _ := m.Load("qcrao")
	fmt.Println(age.(int))

	// 3. 遍歷
	m.Range(func(key, value interface{}) bool {
		name := key.(string)
		age := value.(int)
		fmt.Println(name, age)
		return true
	})

	// 4. 刪除
	m.Delete("qcrao")
	age, ok := m.Load("qcrao")
	fmt.Println(age, ok)

	// 5. 讀取或寫入
	m.LoadOrStore("stefno", 100)
	age, _ = m.Load("stefno")
	fmt.Println(age)
}

第 1 步,寫入兩個 k-v 對;

第 2 步,使用 Load 方法讀取其中的一個 key;

第 3 步,遍歷所有的 k-v 對,並打印出來;

第 4 步,刪除其中的一個 key,再讀這個 key,得到的就是 nil;

第 5 步,使用 LoadOrStore,嘗試讀取或寫入 “Stefno”,因為這個 key 已經存在,因此寫入不成功,並且讀出原值。

程序輸出:

18
stefno 20
qcrao 18
<nil> false
20

sync.map 適用於讀多寫少的場景。對於寫多的場景,會導致 read map 緩存失效,需要加鎖,導致衝突變多;而且由於未命中 read map 次數過多,導致 dirty map 提升為 read map,這是一個 O(N) 的操作,會進一步降低性能。

源碼分析

數據結構

先來看下 map 的數據結構。去掉大段的註釋后:

type Map struct {
	mu Mutex
	read atomic.Value // readOnly
	dirty map[interface{}]*entry
	misses int
}

互斥量 mu 保護 read 和 dirty。

read 是 atomic.Value 類型,可以併發地讀。但如果需要更新 read,則需要加鎖保護。對於 read 中存儲的 entry 字段,可能會被併發地 CAS 更新。但是如果要更新一個之前已被刪除的 entry,則需要先將其狀態從 expunged 改為 nil,再拷貝到 dirty 中,然後再更新。

dirty 是一個非線程安全的原始 map。包含新寫入的 key,並且包含 read 中的所有未被刪除的 key。這樣,可以快速地將 dirty 提升為 read 對外提供服務。如果 dirty 為 nil,那麼下一次寫入時,會新建一個新的 dirty,這個初始的 dirtyread 的一個拷貝,但除掉了其中已被刪除的 key。

每當從 read 中讀取失敗,都會將 misses 的計數值加 1,當加到一定閾值以後,需要將 dirty 提升為 read,以期減少 miss 的情形。

read mapdirty map 的存儲方式是不一致的。
前者使用 atomic.Value,後者只是單純的使用 map。
原因是 read map 使用 lock free 操作,必須保證 load/store 的原子性;而 dirty map 的 load+store 操作是由 lock(就是 mu)來保護的。

真正存儲 key/value 的是 read 和 dirty 字段。read 使用 atomic.Value,這是 lock-free 的基礎,保證 load/store 的原子性。dirty 則直接用了一個原始的 map,對於它的 load/store 操作需要加鎖。

read 字段里實際上是存儲的是:

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
	m       map[interface{}]*entry
	amended bool // true if the dirty map contains some key not in m.
}

注意到 read 和 dirty 里存儲的東西都包含 entry,來看一下:

type entry struct {
	p unsafe.Pointer // *interface{}
}

很簡單,它是一個指針,指向 value。看來,read 和 dirty 各自維護一套 key,key 指向的都是同一個 value。也就是說,只要修改了這個 entry,對 read 和 dirty 都是可見的。這個指針的狀態有三種:

p == nil 時,說明這個鍵值對已被刪除,並且 m.dirty == nil,或 m.dirty[k] 指向該 entry。

p == expunged 時,說明這條鍵值對已被刪除,並且 m.dirty != nil,且 m.dirty 中沒有這個 key。

其他情況,p 指向一個正常的值,表示實際 interface{} 的地址,並且被記錄在 m.read.m[key] 中。如果這時 m.dirty 不為 nil,那麼它也被記錄在 m.dirty[key] 中。兩者實際上指向的是同一個值。

當刪除 key 時,並不實際刪除。一個 entry 可以通過原子地(CAS 操作)設置 p 為 nil 被刪除。如果之後創建 m.dirty,nil 又會被原子地設置為 expunged,且不會拷貝到 dirty 中。

如果 p 不為 expunged,和 entry 相關聯的這個 value 可以被原子地更新;如果 p == expunged,那麼僅當它初次被設置到 m.dirty 之後,才可以被更新。

整體用一張圖來表示:

Store

先來看 expunged:

var expunged = unsafe.Pointer(new(interface{}))

它是一個指向任意類型的指針,用來標記從 dirty map 中刪除的 entry。

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
	// 如果 read map 中存在該 key  則嘗試直接更改(由於修改的是 entry 內部的 pointer,因此 dirty map 也可見)
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
			//    a. 將 p 的狀態由 expunged  更改為 nil
			//    b. dirty map 插入 key
			m.dirty[key] = e
		}
		// 更新 entry.p = value (read map 和 dirty map 指向同一個 entry)
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		// 如果 read map 中不存在該 key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有這個 key)
		e.storeLocked(&value)
	} else {
		// 如果 read map 和 dirty map 中都不存在該 key,則:
		//	  a. 如果 dirty map 為空,則需要創建 dirty map,並從 read map 中拷貝未刪除的元素到新創建的 dirty map
		//    b. 更新 amended 字段,標識 dirty map 中存在 read map 中沒有的 key
		//    c. 將 kv 寫入 dirty map 中,read 不變
		if !read.amended {
		    // 到這裏就意味着,當前的 key 是第一次被加到 dirty map 中。
			// store 之前先判斷一下 dirty map 是否為空,如果為空,就把 read map 淺拷貝一次。
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		// 寫入新 key,在 dirty 中存儲 value
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

整體流程:

  1. 如果在 read 里能夠找到待存儲的 key,並且對應的 entry 的 p 值不為 expunged,也就是沒被刪除時,直接更新對應的 entry 即可。
  2. 第一步沒有成功:要麼 read 中沒有這個 key,要麼 key 被標記為刪除。則先加鎖,再進行後續的操作。
  3. 再次在 read 中查找是否存在這個 key,也就是 double check 一下,這也是 lock-free 編程里的常見套路。如果 read 中存在該 key,但 p == expunged,說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時: a. 將 p 的狀態由 expunged 更改為 nil;b. dirty map 插入 key。然後,直接更新對應的 value。
  4. 如果 read 中沒有此 key,那就查看 dirty 中是否有此 key,如果有,則直接更新對應的 value,這時 read 中還是沒有此 key。
  5. 最後一步,如果 read 和 dirty 中都不存在該 key,則:a. 如果 dirty 為空,則需要創建 dirty,並從 read 中拷貝未被刪除的元素;b. 更新 amended 字段,標識 dirty map 中存在 read map 中沒有的 key;c. 將 k-v 寫入 dirty map 中,read.m 不變。最後,更新此 key 對應的 value。

再來看一些子函數:

// 如果 entry 沒被刪,tryStore 存儲值到 entry 中。如果 p == expunged,即 entry 被刪,那麼返回 false。
func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

tryStore 在 Store 函數最開始的時候就會調用,是比較常見的 for 循環加 CAS 操作,嘗試更新 entry,讓 p 指向新的值。

unexpungeLocked 函數確保了 entry 沒有被標記成已被清除:

// unexpungeLocked 函數確保了 entry 沒有被標記成已被清除。
// 如果 entry 先前被清除過了,那麼在 mutex 解鎖之前,它一定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 如果沒在 read 中找到,並且 amended 為 true,即 dirty 中存在 read 中沒有的 key
	if !ok && read.amended {
		m.mu.Lock() // dirty map 不是線程安全的,所以需要加上互斥鎖
		// double check。避免在上鎖的過程中 dirty map 提升為 read map。
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		// 仍然沒有在 read 中找到這個 key,並且 amended 為 true
		if !ok && read.amended {
			e, ok = m.dirty[key] // 從 dirty 中找
			// 不管 dirty 中有沒有找到,都要"記一筆",因為在 dirty 提升為 read 之前,都會進入這條路徑
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok { // 如果沒找到,返回空,false
		return nil, false
	}
	return e.load()
}

處理路徑分為 fast path 和 slow path,整體流程如下:

  1. 首先是 fast path,直接在 read 中找,如果找到了直接調用 entry 的 load 方法,取出其中的值。
  2. 如果 read 中沒有這個 key,且 amended 為 fase,說明 dirty 為空,那直接返回 空和 false。
  3. 如果 read 中沒有這個 key,且 amended 為 true,說明 dirty 中可能存在我們要找的 key。當然要先上鎖,再嘗試去 dirty 中查找。在這之前,仍然有一個 double check 的操作。若還是沒有在 read 中找到,那麼就從 dirty 中找。不管 dirty 中有沒有找到,都要”記一筆”,因為在 dirty 被提升為 read 之前,都會進入這條路徑

這裏主要看下 missLocked 的函數的實現:

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	// dirty map 晉陞
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

直接將 misses 的值加 1,表示一次未命中,如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉陞為 read,並清空 dirty,清空 misses 計數值。這樣,之前一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。

再來看下 entry 的 load 方法:

func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

對於 nil 和 expunged 狀態的 entry,直接返回 ok=false;否則,將 p 轉成 interface{} 返回。

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 如果 read 中沒有這個 key,且 dirty map 不為空
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			delete(m.dirty, key) // 直接從 dirty 中刪除這個 key
		}
		m.mu.Unlock()
	}
	if ok {
		e.delete() // 如果在 read 中找到了這個 key,將 p 置為 nil
	}
}

可以看到,基本套路還是和 Load,Store 類似,都是先從 read 里查是否有這個 key,如果有則執行 entry.delete 方法,將 p 置為 nil,這樣 read 和 dirty 都能看到這個變化。

如果沒在 read 中找到這個 key,並且 dirty 不為空,那麼就要操作 dirty 了,操作之前,還是要先上鎖。然後進行 double check,如果仍然沒有在 read 里找到此 key,則從 dirty 中刪掉這個 key。但不是真正地從 dirty 中刪除,而是更新 entry 的狀態。

來看下 entry.delete 方法:

func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

它真正做的事情是將正常狀態(指向一個 interface{})的 p 設置成 nil。沒有設置成 expunged 的原因是,當 p 為 expunged 時,表示它已經不在 dirty 中了。這是 p 的狀態機決定的,在 tryExpungeLocked 函數中,會將 nil 原子地設置成 expunged。

tryExpungeLocked 是在新創建 dirty 時調用的,會將已被刪除的 entry.p 從 nil 改成 expunged,這個 entry 就不會寫入 dirty 了。

func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		// 如果原來是 nil,說明原 key 已被刪除,則將其轉為 expunged。
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

注意到如果 key 同時存在於 read 和 dirty 中時,刪除只是做了一個標記,將 p 置為 nil;而如果僅在 dirty 中含有這個 key 時,會直接刪除這個 key。原因在於,若兩者都存在這個 key,僅做標記刪除,可以在下次查找這個 key 時,命中 read,提升效率。若只有在 dirty 中存在時,read 起不到“緩存”的作用,直接刪除。

LoadOrStore

這個函數結合了 Load 和 Store 的功能,如果 map 中存在這個 key,那麼返回這個 key 對應的 value;否則,將 key-value 存入 map。這在需要先執行 Load 查看某個 key 是否存在,之後再更新此 key 對應的 value 時很有效,因為 LoadOrStore 可以併發執行。

具體的過程不再一一分析了,可參考 Load 和 Store 的源碼分析。

Range

Range 的參數是一個函數:

f func(key, value interface{}) bool

由使用者提供實現,Range 將遍歷調用時刻 map 中的所有 k-v 對,將它們傳給 f 函數,如果 f 返回 false,將停止遍歷。

func (m *Map) Range(f func(key, value interface{}) bool) {
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

當 amended 為 true 時,說明 dirty 中含有 read 中沒有的 key,因為 Range 會遍歷所有的 key,是一個 O(n) 操作。將 dirty 提升為 read,會將開銷分攤開來,所以這裏直接就提升了。

之後,遍歷 read,取出 entry 中的值,調用 f(k, v)。

其他

關於為何 sync.map 沒有 Len 方法,參考資料里給出了 issue,bcmills 認為對於併發的數據結構和非併發的數據結構並不一定要有相同的方法。例如,map 有 Len 方法,sync.map 卻不一定要有。就像 sync.map 有 LoadOrStore 方法,map 就沒有一樣。

有些實現增加了一個計數器,並原子地增加或減少它,以此來表示 sync.map 中元素的個數。但 bcmills 提出這會引入競爭:atomic 並不是 contention-free 的,它只是把競爭下沉到了 CPU 層級。這會給其他不需要 Len 方法的場景帶來負擔。

總結

  1. sync.map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。

  2. 通過讀寫分離,降低鎖時間來提高效率,適用於讀多寫少的場景。

  3. Range 操作需要提供一個函數,參數是 k,v,返回值是一個布爾值:f func(key, value interface{}) bool

  4. 調用 Load 或 LoadOrStore 函數時,如果在 read 中沒有找到 key,則會將 misses 值原子地增加 1,當 misses 增加到和 dirty 的長度相等時,會將 dirty 提升為 read。以期減少“讀 miss”。

  5. 新寫入的 key 會保存到 dirty 中,如果這時 dirty 為 nil,就會先新創建一個 dirty,並將 read 中未被刪除的元素拷貝到 dirty。

  6. 當 dirty 為 nil 的時候,read 就代表 map 所有的數據;當 dirty 不為 nil 的時候,dirty 才代表 map 所有的數據。

參考資料

【德志大佬-設計併發安全的 map】https://halfrost.com/go_map_chapter_one/

【德志大佬-設計併發安全的 map】https://halfrost.com/go_map_chapter_two/

【關於 sync.map 為什麼沒有 len 方法的 issue】https://github.com/golang/go/issues/20680

【芮神增加了 len 方法】http://xiaorui.cc/archives/4972

【圖解 map 操作】https://wudaijun.com/2018/02/go-sync-map-implement/

【從一道面試題開始】https://segmentfault.com/a/1190000018657984

【源碼分析】https://zhuanlan.zhihu.com/p/44585993

【行文通暢,流程圖清晰】https://juejin.im/post/5d36a7cbf265da1bb47da444

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

您可能也會喜歡…