Channels 模式

通道模式

不能仅通过增加缓存区来提升性能,不要认为把缓冲区设置的非常大就可以解决性能问题。应该将缓冲区尽量设置的小一些,尽量把延迟降低。

缓冲区大小不能胡乱设置成 10000,缓冲区不一定能够提升性能!!不要指望缓冲区大小来提升程序性能,我们要做的是降低发送与接收操作可能产生的延迟,

option open close
send ok no
recive ok ok

面对并发代码时,凭借打印语句,没办法判断哪个发生在前。因为打印不是原子操作。

通道模式

waitForTask - 三个基本模式之一

经理指派任务给员工,经理去做别的事情,同时随时可以叫停或修正员工的任务。

可以实现 Pooling 模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func waitForTask() {
	ch := make(chan string)

	go func() {
		p := <-ch
		fmt.Println("recv'd signal : ", p)
	}()

	time.Sleep(500 * time.Millisecond)
	ch <- "paper"
	fmt.Println("manager : sent signal")

	time.Sleep(time.Second)
	fmt.Println("-------------end-------------")
}

waitForResult - 三个基本模式之一

经理等待员工完成他的工作任务,再继续往下走。

可以实现 Drop 模式和 FanOut 模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func waitForResult() {
	ch := make(chan string)
	go func() {
		time.Sleep(500 * time.Millisecond)
		ch <- "paper"
		fmt.Println("employee : sned signal")
	}()
	p := <-ch
	fmt.Println("manage : recv'd signal : ", p)

	time.Sleep(1 * time.Second)
	fmt.Println("-------------end-------------")
}

waitForFinished - 三个基本模式之一

其实用 waitGroup 更好,通过它对 goroutine 编组会更加清晰。

经理请员工做事,员工已经知道自己的任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func waitForFinished() {
	ch := make(chan struct{})
	go func() {
		time.Sleep(500 * time.Millisecond)
		close(ch)
		fmt.Println("employee : sned signal")
	}()

	_, ok := <-ch
	fmt.Println("manage : recv'd signal : ", ok)

	time.Sleep(1 * time.Second)
	fmt.Println("-------------end-------------")
}

Pooling 模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func pooling() {
	ch := make(chan string)
	const emps = 2
	for i := 0; i < emps; i++ {
		go func(emp int) {
			for p := range ch {
				fmt.Printf("employee %d : recv'd signal : %s\n", emp, p)
			}
			fmt.Printf("employee %d : recv'd signal\n", emp)
		}(i)
	}

	const work = 10
	for i := 0; i < work; i++ {
		ch <- "paper" + strconv.Itoa(i)
		fmt.Println("manager : sent signal : ", i)
	}

	close(ch)
	fmt.Println("manage : recv'd signal end ")

	time.Sleep(1 * time.Second)
	fmt.Println("-------------end-------------")
}

Fan out 模式

很危险,可能会导致程序中的 goroutine 迅速增长

对于隔一段时间执行的定时任务,和命令行工具来说,这种模式很合适。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 允许任意数量的 goroutine 执行
func fanOut() {
	emps := 20
	ch := make(chan string, emps)
	for i := 0; i < emps; i++ {
		go func(i int) {
			time.Sleep(200 * time.Millisecond)
			ch <- "paper" + strconv.Itoa(i)
			fmt.Println("manager : sent signal : ", i)
		}(i)
	}

	for emps > 0 {
		p := <-ch
		fmt.Println(p)
		fmt.Println("manage : recv'd signal : ", emps)
		emps--
	}

	time.Sleep(2 * time.Second)
	fmt.Println("-------------end-------------")
}

fanoutSemaphore 模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 限制同时执行的 goroutine 数量
func fanoutSemaphore() {
	emps := 20
	ch := make(chan string, emps)

	const cap = 5
	sem := make(chan struct{}, cap)

	for i := 0; i < emps; i++ {
		go func(i int) {
			sem <- struct{}{}
			{
				time.Sleep(200 * time.Millisecond)
				ch <- "paper" + strconv.Itoa(i)
				fmt.Println("manager : sent signal : ", i)
			}
			<-sem
		}(i)
	}

	for emps > 0 {
		p := <-ch
		fmt.Println(p)
		fmt.Println("manage : recv'd signal : ", emps)
		emps--
	}
}

drop 模式

drop 尽快发现有问题的地方,并且防止恶化 抽象点就是往水杯注水,满了后,就让后来的水溢出去 这个模式可以用来测试性能的瓶颈

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func drop() {
	const cap = 5
	ch := make(chan string, cap)

	go func() {
		for p := range ch {
			fmt.Println("employee : recv'd signal : ", p)
		}
	}()

	const work = 20
	for i := 0; i < work; i++ {
		select {
		case ch <- "paper":
			fmt.Println("manager : sent signal : ", i)
		default:
			fmt.Println("manager : dropped data : ", i)
		}
	}

	close(ch)
	fmt.Println("manager sent shutdown signal")
}

cancellation 模式

超时机制,如果有这样的需求,应该使用 context 包来完成。

注意,在这个例子中要使用 有缓冲通道 否则有可能会泄露,假设现在使用无缓冲通道,而任务超时,主任务结束。 而子任务还卡在那,等待 channel 发送过去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func cancellation() {
	ch := make(chan string, 1)

	go func() {
		time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
		ch <- "paper"
		fmt.Println("manager : sent signal ")
	}()

	tc := time.After(100 * time.Millisecond)

	select {
	case p := <-ch:
		fmt.Println("manage : recv'd signal : ", p)
	case t := <-tc:
		fmt.Println("manager : timedout :", t)
	}
	time.Sleep(1 * time.Second)
	fmt.Println("-------------end-------------")
}

代码案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 同时最多运行 3 个 goroutine
// 需要启动大量 goroutine 执行任务
func m(){
  data := make([]int,50) // 假设要处理的数据
  ch := make(chan struct{},3)
  
  var wg sync.Wait
  for i:=0; i<len(data); i++{
    wg.Add(1)
    ch <- struct{}{}
    go func(){
      <- ch
      defer wg.Done()
    }()
    // ...
  }
  wg.Wait()
  close(ch)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 同时最多运行 3 个 goroutine
// 仅用 3 个来执行任务,不需要创建更多 goroutine
func m(){
  data := make([]int,50) // 假设要处理的数据
  ch := make(chan int,len(data))
  for _, v:= range data{
    ch <- v
  }
  close(ch)
  
  var wg sync.Wait
  for i:=0; i<3; i++{
    go func(){
      defer wg.Done()
      for v := ch {
      }
    }()
  }
  wg.Wait()
}
Licensed under CC BY-NC-SA 4.0
本文阅读量 次, 总访问量 ,总访客数
Built with Hugo .   Theme Stack designed by Jimmy