通道模式
不能仅通过增加缓存区来提升性能,不要认为把缓冲区设置的非常大就可以解决性能问题。应该将缓冲区尽量设置的小一些,尽量把延迟降低。
缓冲区大小不能胡乱设置成 10000,缓冲区不一定能够提升性能!!不要指望缓冲区大小来提升程序性能,我们要做的是降低发送与接收操作可能产生的延迟,
option |
open |
close |
send |
ok |
no |
recive |
ok |
ok |
面对并发代码时,凭借打印语句,没办法判断哪个发生在前。因为打印不是原子操作。
通道模式
waitForTask - 三个基本模式之一
经理指派任务给员工,经理去做别的事情,同时随时可以叫停或修正员工的任务。
可以实现 Pooling 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func waitForTask() {
ch := make(chan string)
go func() {
p := <-ch
fmt.Println("recv'd signal : ", p)
}()
time.Sleep(500 * time.Millisecond)
ch <- "paper"
fmt.Println("manager : sent signal")
time.Sleep(time.Second)
fmt.Println("-------------end-------------")
}
|
waitForResult - 三个基本模式之一
经理等待员工完成他的工作任务,再继续往下走。
可以实现 Drop 模式和 FanOut 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func waitForResult() {
ch := make(chan string)
go func() {
time.Sleep(500 * time.Millisecond)
ch <- "paper"
fmt.Println("employee : sned signal")
}()
p := <-ch
fmt.Println("manage : recv'd signal : ", p)
time.Sleep(1 * time.Second)
fmt.Println("-------------end-------------")
}
|
waitForFinished - 三个基本模式之一
其实用 waitGroup 更好,通过它对 goroutine 编组会更加清晰。
经理请员工做事,员工已经知道自己的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func waitForFinished() {
ch := make(chan struct{})
go func() {
time.Sleep(500 * time.Millisecond)
close(ch)
fmt.Println("employee : sned signal")
}()
_, ok := <-ch
fmt.Println("manage : recv'd signal : ", ok)
time.Sleep(1 * time.Second)
fmt.Println("-------------end-------------")
}
|
Pooling 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func pooling() {
ch := make(chan string)
const emps = 2
for i := 0; i < emps; i++ {
go func(emp int) {
for p := range ch {
fmt.Printf("employee %d : recv'd signal : %s\n", emp, p)
}
fmt.Printf("employee %d : recv'd signal\n", emp)
}(i)
}
const work = 10
for i := 0; i < work; i++ {
ch <- "paper" + strconv.Itoa(i)
fmt.Println("manager : sent signal : ", i)
}
close(ch)
fmt.Println("manage : recv'd signal end ")
time.Sleep(1 * time.Second)
fmt.Println("-------------end-------------")
}
|
Fan out 模式
很危险,可能会导致程序中的 goroutine 迅速增长
对于隔一段时间执行的定时任务,和命令行工具来说,这种模式很合适。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 允许任意数量的 goroutine 执行
func fanOut() {
emps := 20
ch := make(chan string, emps)
for i := 0; i < emps; i++ {
go func(i int) {
time.Sleep(200 * time.Millisecond)
ch <- "paper" + strconv.Itoa(i)
fmt.Println("manager : sent signal : ", i)
}(i)
}
for emps > 0 {
p := <-ch
fmt.Println(p)
fmt.Println("manage : recv'd signal : ", emps)
emps--
}
time.Sleep(2 * time.Second)
fmt.Println("-------------end-------------")
}
|
fanoutSemaphore 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// 限制同时执行的 goroutine 数量
func fanoutSemaphore() {
emps := 20
ch := make(chan string, emps)
const cap = 5
sem := make(chan struct{}, cap)
for i := 0; i < emps; i++ {
go func(i int) {
sem <- struct{}{}
{
time.Sleep(200 * time.Millisecond)
ch <- "paper" + strconv.Itoa(i)
fmt.Println("manager : sent signal : ", i)
}
<-sem
}(i)
}
for emps > 0 {
p := <-ch
fmt.Println(p)
fmt.Println("manage : recv'd signal : ", emps)
emps--
}
}
|
drop 模式
drop 尽快发现有问题的地方,并且防止恶化
抽象点就是往水杯注水,满了后,就让后来的水溢出去
这个模式可以用来测试性能的瓶颈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func drop() {
const cap = 5
ch := make(chan string, cap)
go func() {
for p := range ch {
fmt.Println("employee : recv'd signal : ", p)
}
}()
const work = 20
for i := 0; i < work; i++ {
select {
case ch <- "paper":
fmt.Println("manager : sent signal : ", i)
default:
fmt.Println("manager : dropped data : ", i)
}
}
close(ch)
fmt.Println("manager sent shutdown signal")
}
|
cancellation 模式
超时机制,如果有这样的需求,应该使用 context 包来完成。
注意,在这个例子中要使用 有缓冲通道
否则有可能会泄露,假设现在使用无缓冲通道,而任务超时,主任务结束。
而子任务还卡在那,等待 channel 发送过去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func cancellation() {
ch := make(chan string, 1)
go func() {
time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
ch <- "paper"
fmt.Println("manager : sent signal ")
}()
tc := time.After(100 * time.Millisecond)
select {
case p := <-ch:
fmt.Println("manage : recv'd signal : ", p)
case t := <-tc:
fmt.Println("manager : timedout :", t)
}
time.Sleep(1 * time.Second)
fmt.Println("-------------end-------------")
}
|
代码案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// 同时最多运行 3 个 goroutine
// 需要启动大量 goroutine 执行任务
func m(){
data := make([]int,50) // 假设要处理的数据
ch := make(chan struct{},3)
var wg sync.Wait
for i:=0; i<len(data); i++{
wg.Add(1)
ch <- struct{}{}
go func(){
<- ch
defer wg.Done()
}()
// ...
}
wg.Wait()
close(ch)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// 同时最多运行 3 个 goroutine
// 仅用 3 个来执行任务,不需要创建更多 goroutine
func m(){
data := make([]int,50) // 假设要处理的数据
ch := make(chan int,len(data))
for _, v:= range data{
ch <- v
}
close(ch)
var wg sync.Wait
for i:=0; i<3; i++{
go func(){
defer wg.Done()
for v := ch {
}
}()
}
wg.Wait()
}
|