2019-03-11 15:26:19 2893瀏覽
今天扣丁學(xué)堂區(qū)塊鏈培訓(xùn)老師給大家介紹一篇關(guān)于golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法詳解,首先涉及概念:并發(fā)安全Map、分段鎖、sync.Map、CAS(CompareAndSwap)、雙檢查等下內(nèi)容,下面我們一起來(lái)看一下吧。
type SimpleCache struct { mu sync.RWMutex items map[interface{}]*simpleItem }
之前使用過(guò)兩個(gè)本地內(nèi)存緩存的開(kāi)源庫(kù),gcache,cache2go,其中存儲(chǔ)緩存對(duì)象的結(jié)構(gòu)都是這樣,對(duì)于輕量級(jí)的緩存庫(kù),為了設(shè)計(jì)簡(jiǎn)潔(包含清理過(guò)期對(duì)象等)再加上當(dāng)需要緩存大量數(shù)據(jù)時(shí)有redis,memcache等明星項(xiàng)目解決。但是如果拋開(kāi)這些因素遇到真正數(shù)量巨大的數(shù)據(jù)量時(shí),直接對(duì)一個(gè)map加鎖,當(dāng)map中的值越來(lái)越多,訪(fǎng)問(wèn)map的請(qǐng)求越來(lái)越多,大家都競(jìng)爭(zhēng)這一把鎖顯得并發(fā)訪(fǎng)問(wèn)控制變重。在go1.9引入sync.Map之前,比較流行的做法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲(chǔ)的對(duì)象分散到各個(gè)分片中,每個(gè)分片由一把鎖控制,這樣使得當(dāng)需要對(duì)在A分片上的數(shù)據(jù)進(jìn)行讀寫(xiě)時(shí)不會(huì)影響B(tài)分片的讀寫(xiě)。
// Map 分片 type ConcurrentMap []*ConcurrentMapShared // 每一個(gè)Map 是一個(gè)加鎖的并發(fā)安全Map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // 各個(gè)分片Map各自的鎖 }
func New() ConcurrentMap { // SHARD_COUNT 默認(rèn)32個(gè)分片 m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{ items: make(map[string]interface{}), } } return m }
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } // FNV hash func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash }
func (m ConcurrentMap) Set(key string, value interface{}) { shard := m.GetShard(key) // 段定位找到分片 shard.Lock() // 分片上鎖 shard.items[key] = value // 分片操作 shard.Unlock() // 分片解鎖 } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok }
// 統(tǒng)計(jì)當(dāng)前分段map中item的個(gè)數(shù) func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } // 獲取所有的key func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) // 每一個(gè)分片啟動(dòng)一個(gè)協(xié)程 遍歷key go func() { wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { defer wg.Done() shard.RLock() // 每個(gè)分片中的key遍歷后都寫(xiě)入統(tǒng)計(jì)用的channel for key := range shard.items { ch <- key } shard.RUnlock() }(shard) } wg.Wait() close(ch) }() keys := make([]string, count) // 統(tǒng)計(jì)各個(gè)協(xié)程并發(fā)讀取Map分片的key for k := range ch { keys = append(keys, k) } return keys }
func BenchmarkMapShared(b *testing.B) { num := 10000 testCase := genNoRepetTestCase(num) // 10000個(gè)不重復(fù)的鍵值對(duì) m := New() for _, v := range testCase { m.Set(v.Key, v.Val) } b.ResetTimer() for i := 0; i < 5; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) { b.N = 1000000 wg := sync.WaitGroup{} wg.Add(b.N * 2) for i := 0; i < b.N; i++ { e := testCase[rand.Intn(num)] go func(key string, val interface{}) { m.Set(key, val) wg.Done() }(e.Key, e.Val) go func(key string) { _, _ = m.Get(key) wg.Done() }(e.Key) } wg.Wait() }) } }
原生Map加鎖壓測(cè)結(jié)果
分段鎖壓測(cè)結(jié)果
type Map struct { // 保護(hù)dirty的鎖 mu Mutex // 只讀數(shù)據(jù)(修改采用原子操作) read atomic.Value // 包含只讀中所有數(shù)據(jù)(冗余),寫(xiě)入新數(shù)據(jù)時(shí)也在dirty中操作 dirty map[interface{}]*entry // 當(dāng)原子操作訪(fǎng)問(wèn)只讀read時(shí)找不到數(shù)據(jù)時(shí)會(huì)去dirty中尋找,此時(shí)misses+1,dirty及作為存儲(chǔ)新寫(xiě)入的數(shù)據(jù),又冗余了只讀結(jié)構(gòu)中的數(shù)據(jù),所以當(dāng)misses > dirty 的長(zhǎng)度時(shí), 會(huì)將dirty升級(jí)為read,同時(shí)將老的dirty置nil misses int } // Map struct 中的 read 就是readOnly 的指針 type readOnly struct { // 基礎(chǔ)Map m map[interface{}]*entry // 用于表示當(dāng)前dirty中是否有read中不存在的數(shù)據(jù), 在寫(xiě)入數(shù)據(jù)時(shí), 如果發(fā)現(xiàn)dirty中沒(méi)有新數(shù)據(jù)且dirty為nil時(shí),會(huì)將read中未被刪除的數(shù)據(jù)拷貝一份冗余到dirty中, 過(guò)程與Map struct中的 misses相呼應(yīng) amended bool } // 數(shù)據(jù)項(xiàng) type entry struct { p unsafe.Pointer } // 用于標(biāo)記數(shù)據(jù)項(xiàng)已被刪除(主要保證數(shù)據(jù)冗余時(shí)的并發(fā)安全) // 上述Map結(jié)構(gòu)中說(shuō)到有一個(gè)將read數(shù)據(jù)拷貝冗余至dirty的過(guò)程, 因?yàn)閯h除數(shù)據(jù)項(xiàng)是將*entry置nil, 為了避免冗余過(guò)程中因并發(fā)問(wèn)題導(dǎo)致*entry改變而影響到拷貝后的dirty正確性,所以sync.Map使用expunged來(lái)標(biāo)記entry是否被刪除 var expunged = unsafe.Pointer(new(interface{}))
func (m *Map) Store(key, value interface{}) { // 先不上鎖,而是從只讀數(shù)據(jù)中按key讀取, 如果已存在以compareAndSwap操作進(jìn)行覆蓋(update) read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() // 雙檢查獲取read read, _ = m.read.Load().(readOnly) // 如果data在read中,更新entry if e, ok := read.m[key]; ok { // 如果原子操作讀到的數(shù)據(jù)是被標(biāo)記刪除的, 則視為新數(shù)據(jù)寫(xiě)入dirty if e.unexpungeLocked() { m.dirty[key] = e } // 原子操作寫(xiě)新數(shù)據(jù) e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 原子操作寫(xiě)新數(shù)據(jù) e.storeLocked(&value) } else { // 新數(shù)據(jù) // 當(dāng)dirty中沒(méi)有新數(shù)據(jù)時(shí),將read中數(shù)據(jù)冗余到dirty if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() } func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p) if p == expunged { return false } for { if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } } // 在dirty中沒(méi)有比read多出的新數(shù)據(jù)時(shí)觸發(fā)冗余 func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { // 檢查entry是否被刪除, 被刪除的數(shù)據(jù)不冗余 if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 將被刪除(置nil)的數(shù)據(jù)以cas原子操作標(biāo)記為expunged(防止因并發(fā)情況下其他操作導(dǎo)致冗余進(jìn)dirty的數(shù)據(jù)不正確) if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀數(shù)據(jù)中沒(méi)有,并且dirty有比read多的數(shù)據(jù),加鎖在dirty中找 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 只讀中沒(méi)有讀取到的次數(shù)+1 e, ok = m.dirty[key] // 檢查是否達(dá)到觸發(fā)dirty升級(jí)read的條件 m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } // atomic.Load 但被標(biāo)記為刪除的會(huì)返回nil return e.load() } func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀中不存在需要到dirty中去刪除 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() } } func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
同樣以剛剛壓測(cè)原生加鎖Map和分段鎖的方式來(lái)壓測(cè)sync.Map
還有注意在使用sync.Map時(shí)切忌不要將其拷貝,go源碼中有對(duì)sync.Map注釋到”AMapmustnotbecopiedafterfirstuse.”因?yàn)楫?dāng)sync.Map被拷貝之后,Map類(lèi)型的dirty還是那個(gè)map但是read和鎖卻不是之前的read和鎖(都不在一個(gè)世界你拿什么保護(hù)我),所以必然導(dǎo)致并發(fā)不安全(為了寫(xiě)博我把sync.Map代碼復(fù)制出來(lái)一份把私有成員改成可外部訪(fǎng)問(wèn)的打印指針)
想要了解更多關(guān)于區(qū)塊鏈方面內(nèi)容的小伙伴,請(qǐng)關(guān)注扣丁學(xué)堂區(qū)塊鏈培訓(xùn)官網(wǎng)、微信等平臺(tái),扣丁學(xué)堂IT職業(yè)在線(xiàn)學(xué)習(xí)教育平臺(tái)為您提供權(quán)威的區(qū)塊鏈視頻教程,此外還有與時(shí)俱進(jìn)的區(qū)塊鏈課程體系和區(qū)塊鏈視頻直播課供大家學(xué)習(xí),想要學(xué)好區(qū)塊鏈開(kāi)發(fā)技術(shù)的小伙伴快快行動(dòng)吧??鄱W(xué)堂區(qū)塊鏈交流群:850351616。
【關(guān)注微信公眾號(hào)獲取更多學(xué)習(xí)資料】
查看更多關(guān)于“區(qū)塊鏈培訓(xùn)技術(shù)資訊”的相關(guān)文章>>