19.Channels

channels

channel 是一个通信机制,它可以让一个 goroutine 通过它给另一个goroutine 发送值信息。

channel 是线程安全的, 多个协程操作同一管道, 不会发生资源争抢

[toc]

1. 基础知识

1.1. 创建 channel

channel 的零值也是 nil , 和 map 类似,channel 也对应一个make创建的底层数据结构的引用。当我们复制一个 channel 或用于函数参数传递时,我们只是拷贝了一个 channel 引用,因此调用者和被调用者将引用同一个channel对象。

1
2
ch := make(chan int) 		// 无缓存
ch2 := make(chan int,1) // 有缓冲

1.2. 读写操作

1
2
3
ch <- x   // 发送
x := <-ch // 接收
<-ch      // 接收,丢弃结果

1.3. 关闭

关闭channel,随后对该 channel 的任何发送都将导致panic异常;

对一个已经被close过的 channel 进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。

1
2
3
close(ch)

result, ok :=  <- ch

第二个结果是一个布尔值ok,ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。

试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。关闭一个channels还会触发一个广播机制;

不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。

1.4. 比较

两个相同类型的channel可以使用 == 运算符比较

如果两个 channel 引用的是相同的对象,那么比较的结果为真。一个 channel 也可以和nil进行比较。

1
2
3
4
5
ch := make(chan int) 
ch1 := make(chan int)
ch2 := ch
fmt.Printf("%v" , ch==ch2) // false
fmt.Printf("%v", ch==ch2)  // true 引用相同对象

1.5. channel 的两种类型

无缓存

1
2
3
ch := make(chan int)    // 无缓存
ch1 := make(chan int, 0) // 无缓存
<-ch  // 接收 : 阻塞, 会等待发送
1
ch <- 5  // 发送: 阻塞, 会等待接收

Channels的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作;

当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。

有缓存

带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数创建channel时通过第二个参数指定的。向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。

1
2
3
ch := make(chan int, 1)
ch1 := make(chan int, 3)
ch <- 5  // 发送: 队列已满则阻塞
1
<-ch // 接收: 队列为空则阻塞

1.6. 单向 channel

1
func foo(ch chan<- int) <-chan int

在这个函数中 , 参数是单向发送channel , 返回值是单向接收channel

类型chan<- int 表示一个只发送int的channel,只能发送不能接收

类型<-chan int表示一个只接收int的channel,只能接收不能发送。

会在编译阶段自动转换单向类型

注意 : 关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。

2. 使用案例

2.1. 模拟镜像点请求 , 获取最快的

1
2
3
4
5
6
7
8
func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }

2.2. select 多路复用

  1. 在select语句中操作nil的channel永远都不会被select到
  2. 多个 case 同时就绪, select 会随机选择执行
  3. break 只能跳出 select
1
2
3
4
5
6
7
// 外层套个 for 循环, 可用于轮询监听
select {
	case <-abort:
    fmt.Printf("Launch aborted!\n")
	default:
    // do nothing
}

2.3. 超时控制

1
2
3
4
5
6
select {
	case <- ch:
  //...
  case <- time.After(10*time.Second)
  // ...
}

2.4. for..range… 遍历

使用range 来操作channel , 当 channel 关闭时, 取完所有数据自动结束循环

1
2
for ch := range chs {
}

2.5. 通知所有 goroutine 退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 从终端接收任意键盘输入, 关闭 channel
go func() {
    os.Stdin.Read(make([]byte, 1)) // read a single byte
    close(done)
}()

// 轮询退出状态 工具函数
var done = make(chan struct{})
func cancelled() bool {
    select {
    case <-done:
        return true
    default:
        return false
    }
}

go func(){
  // 在所有函数头部做轮询判断
  if cancelled() {
        return
  }
}()

2.6. 定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func main() {
    fmt.Println("Commencing countdown.")
    // 定时器
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        <-tick
    
    launch()
}

Tick函数挺方便,但是只有当程序整个生命周期都需要这个时间时我们使用它才比较合适。否则的话,我们应该使用下面的这种模式:

1
2
3
ticker := time.NewTicker(1 * time.Second)
<-ticker.C    
ticker.Stop() 

3. 注意避坑

3.1. goroutine 泄露

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func makeThumbnails4(filenames []string) error {
    errors := make(chan error)

    for _, f := range filenames {
        go func(f string) {
            _, err := thumbnail.ImageFile(f)
            errors <- err
        }(f)
    }

    for range filenames {
        if err := <-errors; err != nil {
            return err // NOTE: incorrect: goroutine leak!
        }
    }

    return nil
}

当它遇到第一个非nil的error时会直接将error返回到调用方,使得没有一个goroutine去排空errors channel。这样剩下的worker goroutine在向这个channel中发送值时,都会永远地阻塞下去,并且永远都不会退出。这种情况叫做goroutine泄露,可能会导致整个程序卡住或者跑出out of memory的错误。

最简单的解决办法就是用一个具有合适大小的buffered channel,这样这些worker goroutine向channel中发送错误时就不会被阻塞。

sync.WaitGroup

1
2
3
4
5
var wg sync.WaitGroup

wg.Add(1)  // 加入
wg.Done()  // 完成
wg.Wait()  // 等待

简单使用就是在创建一个任务的时候wg.Add(1), 任务完成的时候使用wg.Done()来将任务减一。使用wg.Wait()来阻塞等待所有任务完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

判断 channel 是否关闭

来看看源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type hchan struct {
	qcount   uint           // 队列中有多少数据
	dataqsiz uint           // 环形队列有多大
	buf      unsafe.Pointer // 指向大小为 dataqsiz 的数组
	elemsize uint16					// 元素大小	
	closed   uint32					// 是否关闭
  elemtype *_type // 发送什么类型  *_type 指针是运行时的类型系统
	sendx    uint   // 队列头, 发送索引
	recvx    uint   // 队列尾, 接受索引
  recvq    waitq  // recv 等待列表 ( <- chan )
  sendq    waitq  // send 等待列表 ( ch<- )

	// 保护 hchan 所有字段,以及在此 channel 上阻塞 sudog 的一些字段
	lock mutex
}

image-20210109142238173

警告

关闭一个已关闭的 channel 会导致 panic

向已经关闭的 channel 发送数据会导致 panic

向已经关闭的 channel 读取数据不会导致 panic ,但读取的值为 Channel 缓存数据的零值,可以通过接受语句第二个返回值来检查 Channel 是否关闭:

1
2
3
4
v, ok := <- ch
if !ok {
  ... // Channel 已经关闭
}
Licensed under CC BY-NC-SA 4.0
本文阅读量 次, 总访问量 ,总访客数
Built with Hugo .   Theme Stack designed by Jimmy