共享变量的并发
一个函数在并发调用时没法工作的原因太多了,比如死锁(deadlock)、活锁(livelock)和饿死(resource starvation)。我们没有空去讨论所有的问题,这里我们只聚焦在竞争条件上。
无论任何时候,只要有两个 goroutine 并发访问同一变量,且至少其中的一个是写操作的时候就会发生数据竞争。
数据竞争会在两个以上的goroutine并发访问相同的变量且至少其中一个为写操作时发生。根据上述定义,有三种方式可以避免数据竞争:
1. 不要去写变量
使用 map
判断是否有改变量
2. 避免从多个 gorontine 访问变量
不要使用共享数据来通信;使用通信来共享数据 , 仅使用一个 gorontine 访问变量, 提供对一个指定的变量通过channel来请求的goroutine叫做这个变量的monitor(监控)goroutine。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package bank
import (
"fmt"
"sync"
"testing"
)
var deposits = make(chan int) // 汇款
var balances = make(chan int) // 余额
// Deposit 存款
func Deposit(amount int) { deposits <- amount }
// Balance 余额
func Balance() int { return <-balances }
type draw struct {
amount int
succeed chan bool
}
var withdraws = make(chan draw) // 取款
// Withdraw 取款
func Withdraw(amount int) bool {
succeed := make(chan bool)
withdraws <- draw{amount, succeed}
return <-succeed
}
func teller() {
var balance int
for {
select {
case amount := <-deposits: // 存款
balance += amount
case balances <- balance: // 余额
case draw := <-withdraws:
if draw.amount <= balance {
balance -= draw.amount
draw.succeed <- true
} else {
draw.succeed <- false
}
}
}
}
func init() {
go teller()
}
func TestTeller(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
Deposit(100 * i)
}(i)
}
for i := 0; i < 4; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
flag := Withdraw(2000 * i)
if !flag {
fmt.Println("余额不足")
}
}(i)
}
wg.Wait()
fmt.Println(Balance())
fmt.Println("end")
}
// 练习 9.1: 给bank1程序添加一个Withdraw(amount int)取款函数。其返回结果应该要表明事务是成功了还是因为没有足够资金失败了。这条消息会被发送给monitor的goroutine,且消息需要包含取款的额度和一个新的channel,这个新channel会被monitor goroutine来把boolean结果发回给Withdraw。
|
即使当一个变量无法在其整个生命周期内被绑定到一个独立的goroutine,绑定依然是并发问题的一个解决方案。
例如在一条流水线上的goroutine之间共享变量是很普遍的行为,在这两者间会通过channel来传输地址信息。如果流水线的每一个阶段都能够避免在将变量传送到下一阶段后再去访问它,那么对这个变量的所有访问就是线性的。其效果是变量会被绑定到流水线的一个阶段,传送完之后被绑定到下一个,以此类推。这种规则有时被称为串行绑定。
下面的例子中,Cakes会被严格地顺序访问,先是baker gorouine,然后是icer gorouine:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type Cake struct{ state string }
func baker(cooked chan<- *Cake) {
for {
cake := new(Cake)
cake.state = "cooked"
cooked <- cake // baker never touches this cake again
}
}
func icer(iced chan<- *Cake, cooked <-chan *Cake) {
for cake := range cooked {
cake.state = "iced"
iced <- cake // icer never touches this cake again
}
}
|
3. 互斥锁
允许很多 goruntine 访问, 但是同一时刻只有一个访问
3.1. 二元信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
var (
sema = make(chan struct{}, 1) // 二元信号量
balance int
)
func Deposit(amount int) {
sema <- struct{}{} // acquire token
balance = balance + amount
<-sema // release token
}
func Balance() int {
sema <- struct{}{} // acquire token
b := balance
<-sema // release token
return b
}
|
3.2. sync.Mutex
(1)使用Lock()
加锁,Unlock()
解锁;
(2)对未解锁的 Mutex
使用 Lock()
会阻塞;
(3)对未上锁的 Mutex
使用 Unlock()
会导致 panic 异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
b := balance
mu.Unlock()
return b
}
|
defer调用只会比显式地调用Unlock成本高那么一点点,不过却在很大程度上保证了代码的整洁性。大多数情况下对于并发程序来说,代码的整洁性比过度的优化更重要。如果可能的话尽量使用defer来将临界区扩展到函数的结束。
1
2
3
4
5
|
func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}
|
注意: 没法对一个已经锁上的mutex来再次上锁——这会导致程序死锁,没法继续执行下去,会永远阻塞下去。
1
2
3
4
5
6
7
8
9
10
11
12
|
// ❌ 错误示例
// 此函数并非原子操作,遇到并发时, 会出现异常
func Withdraw(amount int) bool {
// mu.Lock()
// defer mu.Unlock()
Deposit(-amount)
if Balance() < 0 {
Deposit(amount)
return false // insufficient funds
}
return true
}
|
一个通用解决办法是
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 不安全的的存款操作
func deposit(amount int) { balance += amount }
// 安全的存款操作
func Deposit(amount int) {
mu.Lock()
defer mu.Unlock()
deposit(amount)
}
// 安全的取款操作
func Withdraw(amount int) bool {
mu.Lock()
defer mu.Unlock()
deposit(-amount)
if balance < 0 {
deposit(amount)
return false // insufficient funds
}
return true
}
|
4. 读写锁 多读单写
特点:读共享,写独占,写优先
1
2
3
4
5
6
7
|
var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}
|
1
2
3
4
5
|
func (rw *RWMutex) Lock() // 锁读写
func (rw *RWMutex) RLock() // 锁定为读取状态, 禁止写入
func (rw *RWMutex) RLocker() Locker // 返回互斥锁
func (rw *RWMutex) RUnlock() // 解读写锁
func (rw *RWMutex) Unlock() // 解互斥锁
|
(1)RWMutex是单写多读锁,该锁可以加多个读锁或者一个写锁;
(2)读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取读锁;
(3)写锁会阻止其他 goroutine(无论读和写)进来,整个锁由该 goroutine 独占;
(4)适用于读多写少的场景。
内存同步
这个有点复杂
直接总结
可能的话,将变量限定在goroutine内部;如果是多个goroutine都需要访问的变量,使用互斥条件来访问。
因为赋值和打印指向不同的变量,编译器可能会断定两条语句的顺序不会影响执行结果,并且会交换两个语句的执行顺序。如果两个goroutine在不同的CPU上执行,每一个核心有自己的缓存,这样一个goroutine的写入对于其它goroutine的Print,在主存同步之前就是不可见的了。
1
2
3
4
5
|
var x, y int
go func() {
x = 1 // A1
fmt.Print("y:", y, " ") // A2
}()
|
sync.Once 仅执行一次
互斥锁的单例
1
2
3
4
5
6
7
8
|
func Icon(name string) image.Image {
mu.Lock()
defer mu.Unlock()
if icons == nil {
loadIcons()
}
return icons[name]
}
|
使用互斥访问icons的代价就是没有办法对该变量进行并发访问,即使变量已经被初始化完毕且再也不会进行变动。这里我们可以引入一个允许多读的锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
var mu sync.RWMutex // guards icons
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
mu.RLock()
if icons != nil {
icon := icons[name]
mu.RUnlock()
return icon
}
mu.RUnlock()
// acquire an exclusive lock
mu.Lock()
if icons == nil { // NOTE: must recheck for nil
loadIcons()
}
icon := icons[name]
mu.Unlock()
return icon
}
|
上面的代码太复杂了, 再简化一下
1
2
3
4
5
6
7
|
var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
|
竞争检查器
只要在go build,go run或者go test命令后面加上-race的flag
竞争检查器会检查这些事件,会寻找在哪一个goroutine中出现了这样的case,例如其读或者写了一个共享变量,这个共享变量是被另一个goroutine在没有进行干预同步操作便直接写入的。
1
|
go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
|
并发的非阻塞缓冲
一个 http 请求的例子 , 函数调用开销比较大
1
2
3
4
5
6
7
8
|
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
|
第一个版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package memo
type Memo struct{
f Func
cache map[string]result
}
type Func func(key string) (interface{},error)
type result struct{
value interface{}
err error
}
// 构造
func New(f Func) *Memo {
return &Memo{ f:f, cache:make(map[string]result) }
}
// 非线程安全
func (memo *Memo) Get(key string) (interface{},err){
res, ok := memo.cache[key]
if !ok{
res.value,res.err = memo.f(key)
memo.cache[key] = res
}
return res.value,res.err
}
|
第二个版本
并发、不重复、无阻塞的cache就完成了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
type result struct {
value interface{}
err error
}
type entry struct{
res reuslt
ready chan struct{} // res 数据准备好后关闭
}
func New(f Func) *Memo {
return &Memo{f:f,cache:make(map[string]*entry)}
}
func Memo struct{
f Func
mu sync.Mutex
cache map[string]*entry
}
func (memo *Memo) Get(key string) (value interface{}) {
// 获取互斥锁来保护共享变量cache map
memo.mu.lock()
e := memo.cache[key]
if e==nil{
// 插入一个新条目,释放互斥锁
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // 取到值,通知关闭
}else{
memo.mu.Unlock()
<- e.ready // 等待拿值
}
return e.res.value,e.res.err
}
|
第三个版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
func (memo *Memo) server(f Func){
cache := make(map[string]*entry)
for req := range memo.requests{
e := cache[req.key]
if e ==nil{
e = &entry{ready:make(chan struct{})}
cache[req.key] = e
go e.call(f,req.key) // 获取数据
}
go e.deliver(req.response) // 监听数据
}
}
// 获取数据
func (e *entry) call(f Func, key string){
e.res.value,e.res.err = f(key)
close(e.ready)
}
// 阻塞直到关闭,然后返回结果
func (e *entry) deliver(response chan<- result){
<-e.ready
response <- e.res
}
|
Goroutines 和线程
每个 os 线程都有固定大小的内存块( 一般 2MB ) 做栈, 用来存储调用或挂起的函数内部变量 , 若是创建成百上千非常浪费内存空间 ;
而 Goroutine 会以很小的栈开始生命周期 ( 一般 2KB ) , 空间是动态伸缩 , 最大值可达到 1GB,
OS 线程会被操作系统内核调度 , 进行线程切换 , 其局部性很差, 需要几次内存访问
而 Go 的运行包含了自己的调度器 , 使用 M:N , n 和 os 线程操作 M 个 goroutine
GOMAXPROCS 环境变量决定有多少个os 线程同时执行 Go 代码, 默认值是机器 CPU 核心数 , 阻塞/休眠不需要对应 os 线程, I/O,系统调用,非 go 语言函数需要对应 OS 线程, GOMAXPROCS 不会计算这几种情况 , 运行时使用 runtime.GOMAXPROCS
修改