多协程执行任务,channel收集结果
利用WaiGroup,优雅关闭channel
需求:
使用 goroutine 和 channel 实现一个计算int64随机数各位数和的程序,例如生成随机数61345,计算其每个位数上的数字之和为19。
开启一个 goroutine 循环生成int64类型的随机数,发送到jobChan
开启24个 goroutine 从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
主 goroutine 从resultChan取出结果并打印到终端输出
1
2package apitests
3
4import (
5 "fmt"
6 "math"
7 "math/rand"
8 "sync"
9 "testing"
10 "time"
11)
12
13/*
14*
15
16使用 goroutine 和 channel 实现一个计算int64随机数各位数和的程序,例如生成随机数61345,计算其每个位数上的数字之和为19。
17开启一个 goroutine 循环生成int64类型的随机数,发送到jobChan
18开启24个 goroutine 从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
19主 goroutine 从resultChan取出结果并打印到终端输出
20*
21*/
22
23var jobChan chan int64
24var resultChan chan int
25var wg sync.WaitGroup
26
27const needNum int = 10
28
29func TestInterface(t *testing.T) {
30 jobChan = make(chan int64, 4)
31 resultChan = make(chan int, 4)
32 wg.Add(1)
33 go func() {
34 defer wg.Done()
35 for i := 0; i < needNum; i++ {
36 jobChan <- MakeRand64Num()
37 }
38 close(jobChan)
39
40 }()
41 //
42 for i := 0; i < 24; i++ {
43 wg.Add(1)
44 go func() {
45 defer wg.Done()
46 for v := range jobChan {
47 resultChan <- RandInt64Count(v)
48 }
49 }()
50 }
51
52 go func() {
53 defer close(resultChan)
54 wg.Wait() //这个结束就代表,。结果肯定已经放进去过了,就可以关闭 resultChan
55 }()
56
57 //打印结果
58 for v := range resultChan {
59 fmt.Println("结果", v)
60 }
61
62}
63
64func MakeRand64Num() int64 {
65 seek := time.Now().UnixNano()
66 rand.Seed(seek)
67 r := int64(rand.Intn(math.MaxInt64))
68 fmt.Println("job :", r)
69 return r
70}
71
72func RandInt64Count(num int64) int {
73 if num < 0 {
74 num = -num
75 }
76 var res int
77 for num != 0 {
78 res += int(num % 10)
79 num = num / 10
80 }
81 return res
82}
ping-pong模式
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9
10 var Ball int
11
12 table := make(chan int)
13
14 go player(table)
15 go player(table)
16
17 table <- Ball
18
19 time.Sleep(1 * time.Second)
20 <-table
21
22}
23func player(table chan int) {
24 for {
25 ball := <-table
26 fmt.Println(ball)
27 ball++
28 time.Sleep(100 * time.Millisecond)
29 table <- ball
30 }
31}
fan-in 模式
fan-in模式又叫扇入模式,意思是多个协程把数据写入到通道中,但只有一个协程等待读取通道数据。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func search(msg string) chan string {
9 var ch = make(chan string)
10 go func() {
11 var i int
12 for {
13 ch <- fmt.Sprintf("get %s %d", msg, i)
14 i++
15 time.Sleep(100 * time.Millisecond)
16 }
17 }()
18 return ch
19}
20
21func main() {
22 ch1 := search("jonson")
23 ch2 := search("olaya")
24
25 for {
26 select {
27 case msg := <-ch1:
28 fmt.Println(msg)
29 case msg := <-ch2:
30 fmt.Println(msg)
31 }
32 }
33}
fan-out 模式
Fan-out模式通常会用在任务的分配中。比方说,程序消费Kafka、NATS等中间件的数据,多个协程就会监听同一个通道中的数据,读到数据后立即进行后续的处理,处理完毕后再继续读取,循环往复。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
10 defer wg.Done()
11 for {
12 task, ok := <-tasksCh
13 if !ok {
14 return
15 }
16 d := time.Duration(task) * time.Millisecond
17 time.Sleep(d)
18 fmt.Println("processing task", task)
19 }
20}
21
22func pool(wg *sync.WaitGroup, workers, tasks int) {
23 tasksCh := make(chan int)
24
25 for i := 0; i < workers; i++ {
26 go worker(tasksCh, wg)
27 }
28
29 for i := 0; i < tasks; i++ {
30 tasksCh <- i
31 }
32
33 close(tasksCh)
34}
35
36func main() {
37 var wg sync.WaitGroup
38 wg.Add(36)
39 go pool(&wg, 36, 50)
40 wg.Wait()
41}
pipeline模式
pipeline模式即管道模式,指的是由通道连接的一系列连续的阶段,以类似流的形式进行计算。每个阶段是一组执行特定任务的协程,每个阶段的协程都会通过通道获取从上游传递过来的值,经过处理后,再把新的值发送给下游。
我们要计算 2*(2*number+1) 这串数字就可以用下面的方式实现。可以看到,multiply(v, 2) 首先被计算出来,计算的结果会紧接着被送入add函数中执行加1操作。之后,生成的结果将继续作为multiply函数的参数被处理。
1package main
2
3import "fmt"
4
5func main() {
6 multiply := func(value, multiplier int) int {
7 return value * multiplier
8 }
9
10 add := func(value, additive int) int {
11 return value + additive
12 }
13
14 ints := []int{1, 2, 3, 4}
15 for _, v := range ints {
16 fmt.Println(multiply(add(multiply(v, 2), 1), 2))
17 }
18}
将上例的算术操作转换为pipeline模式的例子
1package main
2
3import "fmt"
4
5func main() {
6 generator := func(done <-chan interface{}, integers ...int) <-chan int {
7 intStream := make(chan int)
8 go func() {
9 defer close(intStream)
10 for _, i := range integers {
11 select {
12 case <-done:
13 return
14 case intStream <- i:
15 }
16 }
17 }()
18 return intStream
19 }
20
21 multiply := func(
22 done <-chan interface{},
23 intStream <-chan int,
24 multiplier int,
25 ) <-chan int {
26 multipliedStream := make(chan int)
27 go func() {
28 defer close(multipliedStream)
29 for i := range intStream {
30 select {
31 case <-done:
32 return
33 case multipliedStream <- i * multiplier:
34 }
35 }
36 }()
37 return multipliedStream
38 }
39
40 add := func(
41 done <-chan interface{},
42 intStream <-chan int,
43 additive int,
44 ) <-chan int {
45 addedStream := make(chan int)
46 go func() {
47 defer close(addedStream)
48 for i := range intStream {
49 select {
50 case <-done:
51 return
52 case addedStream <- i + additive:
53 }
54 }
55 }()
56 return addedStream
57 }
58
59 done := make(chan interface{})
60 defer close(done)
61
62 intStream := generator(done, 1, 2, 3, 4)
63 pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
64
65 for v := range pipeline {
66 fmt.Println(v)
67 }
68}
周期性控制任务执行
一次分发5个任务,完成了,再分发任务。
1package bingfa
2
3import (
4 "fmt"
5 "math/rand"
6 "sync"
7 "testing"
8 "time"
9)
10
11// 周期性控制任务执行,一次发放5个任务。
12
13func job() {
14 fmt.Println(rand.Intn(10))
15 time.Sleep(time.Second * 3)
16}
17
18//每次设置 5次任务
19func setTaskPool(ch chan struct{}) {
20 for i := 0; i < 5; i++ {
21 ch <- struct{}{}
22 }
23}
24
25//
26func TestTaskTimeControl(t *testing.T) {
27 wg := sync.WaitGroup{}
28 ch := make(chan struct{}, 5)
29 setTaskPool(ch)
30 wg.Add(5)
31 go func() {
32 for {
33 wg.Wait()
34 fmt.Println("发放了5个任务")
35 setTaskPool(ch)
36 wg.Add(5)
37 }
38 }()
39
40 for {
41 <-ch
42 go func() {
43 defer wg.Done()
44 job()
45 }()
46 }
47
48}