24.atomic

atomic

1. 基础知识

1.1. 概念

原子性:一个或多个操作在CPU的执行过程中不被中断的特性,称为原子性。这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。

原子操作:进行过程中不能被中断的操作,原子操作由底层硬件支持,而锁则是由操作系统提供的API实现,若实现相同的功能,前者通常会更有效率

1.2. 介绍

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。

这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

应通过通信来共享内存,而不通过共享内存实现通信。

被SwapT系列函数实现的交换操作,在原子性上等价于:

1
2
3
old = *addr
*addr = new
return old

CompareAndSwapT系列函数实现的比较-交换操作,在原子性上等价于:

1
2
3
4
5
if *addr == old {
	*addr = new
	return true
}
return false

AddT 系列函数实现加法操作,在原子性上等价于:

1
2
*addr += delta
return *addr

LoadT和StoreT系列函数实现的加载和保持操作,在原子性上等价于:“return *addr"和”*addr = val"。

1.3. 类型与函数

atomic包中支持六种类型

  • int32
  • uint32
  • int64
  • uint64
  • uintptr
  • unsafe.Pointer

对于每一种类型,提供了五类原子操作:

LoadXXX(addr): 原子性的获取*addr的值,等价于:

1
return *addr

StoreXXX(addr, val): 原子性的将val的值保存到*addr,等价于:

1
addr = val

AddXXX(addr, delta): 原子性的将delta的值添加到*addr并返回新值(unsafe.Pointer不支持),等价于:

1
2
*addr += delta
return *addr

SwapXXX(addr, new) old: 原子性的将new的值保存到*addr并返回旧值,等价于:

1
2
3
old = *addr
*addr = new
return old

CompareAndSwapXXX(addr, old, new) bool: 原子性的比较*addrold,如果相同则将new赋值给*addr并返回true,等价于:

1
2
3
4
5
if *addr == old {
    *addr = new
    return true
}
return false

1.4. 支持任意类型的 atomic.Value

Go语言在1.4版本的时候向sync/atomic包中添加了新的类型Value,此类型相当于一个容器,被用来"原子地"存储(Store)和加载任意类型的值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func TestGosched(t *testing.T) {
	done := false
	var v atomic.Value
	v.Store(done)

	go func() {
		v.Store(true)
	}()

	for !v.Load().(bool) {
	}

	println("done !")
}

2. 使用案例

2.1. atomic.AddInt32

多个 goroutine 令 count 递增,在没有任何保护措施情况下,会发生 data race。此处使用 atomic.AddInt32 来保证原子性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func TestAtomicAddInt32(t *testing.T) {
	var count int32
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(&count, 1)
		}()
	}
	wg.Wait()
	fmt.Println("count: ", count)
}

2.2. atomic.Value

3. 注意事项

4. 源码浅析

4.1. Value

go@1.16.5

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 提供原子的加载和存储
// 零值为 nil
// 调用 Store 后,禁止复制 Value
// 第一次使用后,禁止复制
type Value struct {
	v interface{}
}


// ifaceWords 是 interface{} 内部表示
type ifaceWords struct {
	typ  unsafe.Pointer  // 原始类型
	data unsafe.Pointer	 // 值
}

// 返回最近 Store 设置的值
// 如果没有调用 Store 则返回 nil
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// 第一次存储没有完成
		return nil
	}
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

// Store 将 Value 的值设置为 x
// 所有 Store 调用必须使用相同类型的值
// 类型不一致会发生 panic, Store(nil) 也是如此
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
  // 将 V 和 x 转换为 ifaceWords 类型,这样下一步方便获取原始类型和值
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
    // 现有的值
		typ := LoadPointer(&vp.typ)
		// 如果 typ = nil 则表示第一次 Store 
    if typ == nil {
			// 开始第一次存储
			runtime_procPin()
      
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// Complete first store.
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
    
		if uintptr(typ) == ^uintptr(0) {
			// 如果typ为^uintptr(0)说明第一次写入还没有完成,继续循环等待
			continue
		}
		// 类型不一致,panic
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
    // 覆盖数据
		StorePointer(&vp.data, xp.data)
		return
	}
}

// 禁用/启用 抢占, 在 runtime 实现
func runtime_procPin()
func runtime_procUnpin()

runtime_procPin 可以将一个 Goroutine 占用当前使用的 P,不允许其他的 Gouroutine 抢占,runtime_procUnpin 释放。

Licensed under CC BY-NC-SA 4.0
本文阅读量 次, 总访问量 ,总访客数
Built with Hugo .   Theme Stack designed by Jimmy