某些并发操作不需要显式同步。我们可以利用这些操作来创建无锁类型和函数,以便多个 goroutine 可以安全地使用它们。让我们深入探讨这个话题!
非原子增量
假设多个 goroutines 增加一个共享计数器:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func main(){
total := 0
var wg sync.WaitGroup
for range 5 {
wg.Go(func() {
for range 10000 {
total++
}
})
}
wg.Wait()
fmt.Println("total", total)
}
|
这里有 5 个 Goroutine,每个 Goroutine total 执行 10,000 次递增,所以最终结果应该是 50,000。但通常情况下会更少。让我们再运行几次代码:
1
2
3
|
total 26775
total 22978
total 30357
|
竞争检测器报告了一个问题:
1
2
3
4
5
6
7
|
$ go run -race total.go
==================
WARNING: DATA RACE
...
==================
total 33274
Found 1 data race(s)
|
这看起来可能有点奇怪—— total++ 操作难道不应该是原子的吗?其实不然。它包含三个步骤(读取-修改-写入):
- 读取
total 的当前值。
- +1
- 将新值写会到
total
如果两个 Goroutine 都读取了值 42 ,然后分别对其进行加一和写回,新的 total 将是 43 而不是应该的 44 。结果,计数器的一些增量将会丢失,最终值将小于 50,000 。
可以使用互斥锁或其他同步工具使操作具有原子性。但在本章中,我们约定不使用它们。这里,我所说的“原子操作”是指不需要调用者使用显式锁,但在并发环境中仍然可以安全使用的操作。
一个没有同步的操作只有转化为单条处理器指令才能真正实现原子性。这样的操作不需要锁,并且在并发调用时(即使是写操作)也不会引发问题。
在理想情况下,每个操作都应该是原子的,我们不必处理互斥锁。但实际上,原子操作只有少数几种,它们都可以在 sync/atomic 包中找到。该包提供了一组原子类型:
原子操作
每种原子类型都提供以下方法:
Load 读取变量的值, Store 设置新值:
1
2
3
|
var n atomic.Int32
n.Store(10)
fmt.Println("Store", n.Load())
|
Swap 设置一个新值(如 Store )并返回旧值:
1
2
3
4
|
var n atomic.Int32
n.Store(10)
old := n.Swap(42)
fmt.Println("Swap", old, "->", n.Load())
|
仅当当前值仍符合你的预期时, CompareAndSwap 才会设置新值:
1
2
3
4
5
|
var n atomic.Int32
n.Store(10)
swapped := n.CompareAndSwap(10, 42)
fmt.Println("CompareAndSwap 10 -> 42:", swapped)
fmt.Println("n =", n.Load())
|
1
2
3
4
5
|
var n atomic.Int32
n.Store(10)
swapped := n.CompareAndSwap(33, 42)
fmt.Println("CompareAndSwap 33 -> 42:", swapped)
fmt.Println("n =", n.Load())
|
数字类型还提供了一种 Add 方法,可以将值增加指定的量:
1
2
3
4
|
var n atomic.Int32
n.Store(10)
n.Add(32)
fmt.Println("Add 32:", n.Load())
|
用于位运算的 And / Or 方法(Go 1.23+):
1
2
3
4
5
6
7
8
9
10
11
|
const (
modeRead = 0b100
modeWrite = 0b010
modeExec = 0b001
)
var mode atomic.Int32
mode.Store(modeRead)
old := mode.Or(modeWrite)
fmt.Printf("mode: %b -> %b\n", old, mode.Load())
|
所有方法都被转换为单个 CPU 指令,因此它们对于并发调用是安全的。
严格来说,这并非总是如此。并非所有处理器都支持完整的并发操作,因此有时需要多条指令。但我们不必担心这一点——Go 为调用者保证 sync/atomic 操作的原子性。它使用特定于每个处理器架构的低级机制来做到这一点。
与其他同步原语一样,每个原子变量都有其自身的内部状态。因此,您应该仅将其作为指针传递,而不是通过值传递,以避免意外复制状态。
使用 atomic.Value 时,所有读取和存储操作都应该使用相同的具体类型。以下代码将引发 panic:
1
2
3
|
var v atomic.Value
v.Store(10)
v.Store("hi")
|
现在,让我们回到计数器程序,并将其重写为使用原子计数器:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
var total atomic.Int32
var wg sync.WaitGroup
for range 5 {
wg.Go(func() {
for range 10000 {
total.Add(1)
}
})
}
wg.Wait()
fmt.Println("total", total.Load())
|
现在好了,没有数据竞争。
原子构成
并发程序中的原子操作非常重要。此类操作通常转换为单处理器指令,并且不需要锁。你可以安全地从不同的 goroutine 调用它,并获得可预测的结果。
但是如果将原子操作结合起来会发生什么呢?让我们来一探究竟。
原子性
让我们看一个增加计数器的函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
var counter int32
// increment increases the counter value by two.
func increment() {
counter += 1
sleep(10)
counter += 1
}
// sleep pauses the current goroutine for up to maxMs ms.
func sleep(maxMs int) {
dur := time.Duration(rand.IntN(maxMs)) * time.Millisecond
time.Sleep(dur)
}
|
正如你所知,从多个 goroutine 调用 increment 是不安全的,因为 counter += 1 会导致数据争用。
现在我将尝试解决这个问题,并提出几种方案。在每种情况下,请回答以下问题:如果从 100 个 Goroutine 调用 increment , counter 的最终值是否能保证?
1
2
3
4
5
6
7
8
|
// example 1:
var counter atomic.Int32
func increment() {
counter.Add(1)
sleep(10)
counter.Add(1)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
|
// example 2:
var counter atomic.Int32
func increment() {
if counter.Load()%2 == 0 {
sleep(10)
counter.Add(1)
} else {
sleep(10)
counter.Add(2)
}
}
|
1
2
3
4
5
6
7
8
9
|
// example 3:
var delta atomic.Int32
var counter atomic.Int32
func increment() {
delta.Add(1)
sleep(10)
counter.Add(delta.Load())
}
|
构成
人们有时会认为原子操作的组合也会神奇地变成原子操作。但事实并非如此。
例如上面例子中的第二个,从不同的 goroutines 调用 increment 100 次,使用 -race 运行程序 — 没有竞争。
1
2
3
4
5
6
|
% go run atomic-2.go
192
% go run atomic-2.go
191
% go run atomic-2.go
189
|
但是我们能确定 counter 的最终值吗?不能。 counter.Load 和 counter.Add 调用是在不同的 goroutine 中交错进行的。这会导致竞争条件(不要与数据竞争混淆),并导致 counter 值不可预测。
在哪个例子中 increment 是原子操作?
答案
在所有示例中, increment 都不是原子操作。原子的组合始终是非原子的。
然而,第一个例子保证了并发环境中 counter 的最终值,如果我们运行 100 个 goroutine, counter 最终将等于 200。
原因是 Add 是一个与顺序无关的操作,运行时可以按照任意顺序执行此类操作,结果都不会改变。
第二和第三个例子使用了顺序相关的操作。当我们运行 100 个 goroutine 时,每次操作的顺序都不一样。因此,结果也不同。
使复合操作原子化并防止竞争条件的一种可靠方法是使用互斥锁:
1
2
3
4
5
6
7
8
9
10
11
|
var delta int32
var counter int32
var mu sync.Mutex
func increment() {
mu.Lock()
delta += 1
sleep(10)
counter += delta
mu.Unlock()
}
|
但有时你只需要一个带有 CompareAndSwap 原子变量。让我们看一个例子。
原子性而非互斥性
假设我们有一扇需要关闭的门:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type Gate struct {
closed bool // gate state
}
func (g *Gate) Close() {
if g.closed {
return // ignore repeated calls
}
g.closed = true
// free resources
}
func work() {
var g Gate
defer g.Close()
// do something while the gate is open
}
|
在并发环境中, closed 字段存在数据竞争。我们可以使用互斥锁来解决这个问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
type Gate struct {
closed bool
mu sync.Mutex // protects the state
}
func (g *Gate) Close() {
g.mu.Lock()
defer g.mu.Unlock()
if g.closed {
return // ignore repeated calls
}
g.closed = true
// free resources
}
|
或者,我们可以在原子 Bool 上使用 CompareAndSwap 而不是互斥锁:
1
2
3
4
5
6
7
8
9
10
11
|
type Gate struct {
closed atomic.Bool
}
func (g *Gate) Close() {
if !g.closed.CompareAndSwap(false, true) {
return // ignore repeated calls
}
// The gate is closed.
// We can free resources now.
}
|
Gate 类型现在更加紧凑和简单。
这不是一个很常见的用例——我们通常希望一个 goroutine 等待一个锁定的互斥锁,并在解锁后继续执行。但对于“提前退出”的情况,它是完美的。
总结
原子操作是一种特殊但实用的工具。您可以将其用于简单的计数器和标志,但在使用更复杂的操作时要非常小心。您也可以使用它们代替互斥锁来提前退出。
并发编程的核心目标是为了在不确定的,混乱的调度中,为关键性部分创造出确定性。通过同步机制(mutex,channel),强制让那些原本可能交错执行的指令,以我们期望的,确定的顺序执行。
还有一种场景比较适合原子操作,在聊天房间中,谁说话了要显示出当前说话人,张三发言时显示张三,李四发言时显示李四,可能同时说话,谁后说话就赋值谁。这是一个典型 ‘无关计算’ 的场景。我们采用‘最后者获胜’的冲突解决策略,因此不需要使用互斥锁进行同步,只需要保证赋值的原子性和内存可见性即可。
无关计算是指与变量的旧值无关,不依赖时序或其它并发操作的中间状态。
最后者获胜是兵法冲突下的解决策略,冲突是允许发生的,系统会接受最后一个完成的操作。
那么这种场景不用原子操作行不行? 不行!
有个极端的坏情况,协程 A 开始写入字符串,写到一半"张",协程 B 抢占了 CPU 并完整的写入了"李四",协程 A 恢复执行写入"三",这既不是 “张三” 也不是 “李四”,而是一个损坏的,无效的数据。
原子操作防止了数据损坏,涉及并发的场景可以用 -race 辅助检测,共享状态的读写并发,没有原子操作一定是不安全的。
参考
Gist of Go: Atomics