多协程执行任务,channel收集结果

利用WaiGroup,优雅关闭channel

需求:

使用 goroutine 和 channel 实现一个计算int64随机数各位数和的程序,例如生成随机数61345,计算其每个位数上的数字之和为19。

开启一个 goroutine 循环生成int64类型的随机数,发送到jobChan

开启24个 goroutine 从jobChan中取出随机数计算各位数的和,将结果发送到resultChan

主 goroutine 从resultChan取出结果并打印到终端输出

 1
 2package apitests
 3
 4import (
 5	"fmt"
 6	"math"
 7	"math/rand"
 8	"sync"
 9	"testing"
10	"time"
11)
12
13/*
14*
15
16使用 goroutine 和 channel 实现一个计算int64随机数各位数和的程序,例如生成随机数61345,计算其每个位数上的数字之和为19。
17开启一个 goroutine 循环生成int64类型的随机数,发送到jobChan
18开启24个 goroutine 从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
19主 goroutine 从resultChan取出结果并打印到终端输出
20*
21*/
22
23var jobChan chan int64
24var resultChan chan int
25var wg sync.WaitGroup
26
27const needNum int = 10
28
29func TestInterface(t *testing.T) {
30	jobChan = make(chan int64, 4)
31	resultChan = make(chan int, 4)
32	wg.Add(1)
33	go func() {
34		defer wg.Done()
35		for i := 0; i < needNum; i++ {
36			jobChan <- MakeRand64Num()
37		}
38		close(jobChan)
39
40	}()
41	//
42	for i := 0; i < 24; i++ {
43		wg.Add(1)
44		go func() {
45			defer wg.Done()
46			for v := range jobChan {
47				resultChan <- RandInt64Count(v)
48			}
49		}()
50	}
51
52	go func() {
53		defer close(resultChan)
54		wg.Wait() //这个结束就代表,。结果肯定已经放进去过了,就可以关闭 resultChan
55	}()
56
57	//打印结果
58	for v := range resultChan {
59		fmt.Println("结果", v)
60	}
61
62}
63
64func MakeRand64Num() int64 {
65	seek := time.Now().UnixNano()
66	rand.Seed(seek)
67	r := int64(rand.Intn(math.MaxInt64))
68	fmt.Println("job :", r)
69	return r
70}
71
72func RandInt64Count(num int64) int {
73	if num < 0 {
74		num = -num
75	}
76	var res int
77	for num != 0 {
78		res += int(num % 10)
79		num = num / 10
80	}
81	return res
82}

ping-pong模式

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6)
 7
 8func main() {
 9
10	var Ball int
11
12	table := make(chan int)
13
14	go player(table)
15	go player(table)
16
17	table <- Ball
18
19	time.Sleep(1 * time.Second)
20	<-table
21
22}
23func player(table chan int) {
24	for {
25		ball := <-table
26		fmt.Println(ball)
27		ball++
28		time.Sleep(100 * time.Millisecond)
29		table <- ball
30	}
31}

fan-in 模式

fan-in模式又叫扇入模式,意思是多个协程把数据写入到通道中,但只有一个协程等待读取通道数据。

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6)
 7
 8func search(msg string) chan string {
 9	var ch = make(chan string)
10	go func() {
11		var i int
12		for {
13			ch <- fmt.Sprintf("get %s %d", msg, i)
14			i++
15			time.Sleep(100 * time.Millisecond)
16		}
17	}()
18	return ch
19}
20
21func main() {
22	ch1 := search("jonson")
23	ch2 := search("olaya")
24
25	for {
26		select {
27		case msg := <-ch1:
28			fmt.Println(msg)
29		case msg := <-ch2:
30			fmt.Println(msg)
31		}
32	}
33}

fan-out 模式

Fan-out模式通常会用在任务的分配中。比方说,程序消费Kafka、NATS等中间件的数据,多个协程就会监听同一个通道中的数据,读到数据后立即进行后续的处理,处理完毕后再继续读取,循环往复。

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6	"time"
 7)
 8
 9func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
10	defer wg.Done()
11	for {
12		task, ok := <-tasksCh
13		if !ok {
14			return
15		}
16		d := time.Duration(task) * time.Millisecond
17		time.Sleep(d)
18		fmt.Println("processing task", task)
19	}
20}
21
22func pool(wg *sync.WaitGroup, workers, tasks int) {
23	tasksCh := make(chan int)
24
25	for i := 0; i < workers; i++ {
26		go worker(tasksCh, wg)
27	}
28
29	for i := 0; i < tasks; i++ {
30		tasksCh <- i
31	}
32
33	close(tasksCh)
34}
35
36func main() {
37	var wg sync.WaitGroup
38	wg.Add(36)
39	go pool(&wg, 36, 50)
40	wg.Wait()
41}

pipeline模式

pipeline模式即管道模式,指的是由通道连接的一系列连续的阶段,以类似流的形式进行计算。每个阶段是一组执行特定任务的协程,每个阶段的协程都会通过通道获取从上游传递过来的值,经过处理后,再把新的值发送给下游。

我们要计算 2*(2*number+1) 这串数字就可以用下面的方式实现。可以看到,multiply(v, 2) 首先被计算出来,计算的结果会紧接着被送入add函数中执行加1操作。之后,生成的结果将继续作为multiply函数的参数被处理。

 1package main
 2
 3import "fmt"
 4
 5func main() {
 6	multiply := func(value, multiplier int) int {
 7		return value * multiplier
 8	}
 9
10	add := func(value, additive int) int {
11		return value + additive
12	}
13
14	ints := []int{1, 2, 3, 4}
15	for _, v := range ints {
16		fmt.Println(multiply(add(multiply(v, 2), 1), 2))
17	}
18}

将上例的算术操作转换为pipeline模式的例子

 1package main
 2
 3import "fmt"
 4
 5func main() {
 6	generator := func(done <-chan interface{}, integers ...int) <-chan int {
 7		intStream := make(chan int)
 8		go func() {
 9			defer close(intStream)
10			for _, i := range integers {
11				select {
12				case <-done:
13					return
14				case intStream <- i:
15				}
16			}
17		}()
18		return intStream
19	}
20
21	multiply := func(
22		done <-chan interface{},
23		intStream <-chan int,
24		multiplier int,
25	) <-chan int {
26		multipliedStream := make(chan int)
27		go func() {
28			defer close(multipliedStream)
29			for i := range intStream {
30				select {
31				case <-done:
32					return
33				case multipliedStream <- i * multiplier:
34				}
35			}
36		}()
37		return multipliedStream
38	}
39
40	add := func(
41		done <-chan interface{},
42		intStream <-chan int,
43		additive int,
44	) <-chan int {
45		addedStream := make(chan int)
46		go func() {
47			defer close(addedStream)
48			for i := range intStream {
49				select {
50				case <-done:
51					return
52				case addedStream <- i + additive:
53				}
54			}
55		}()
56		return addedStream
57	}
58
59	done := make(chan interface{})
60	defer close(done)
61
62	intStream := generator(done, 1, 2, 3, 4)
63	pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
64
65	for v := range pipeline {
66		fmt.Println(v)
67	}
68}

周期性控制任务执行

一次分发5个任务,完成了,再分发任务。

 1package bingfa
 2
 3import (
 4	"fmt"
 5	"math/rand"
 6	"sync"
 7	"testing"
 8	"time"
 9)
10
11// 周期性控制任务执行,一次发放5个任务。
12
13func job() {
14	fmt.Println(rand.Intn(10))
15	time.Sleep(time.Second * 3)
16}
17
18//每次设置 5次任务
19func setTaskPool(ch chan struct{}) {
20	for i := 0; i < 5; i++ {
21		ch <- struct{}{}
22	}
23}
24
25//
26func TestTaskTimeControl(t *testing.T) {
27	wg := sync.WaitGroup{}
28	ch := make(chan struct{}, 5)
29	setTaskPool(ch)
30	wg.Add(5)
31	go func() {
32		for {
33			wg.Wait()
34			fmt.Println("发放了5个任务")
35			setTaskPool(ch)
36			wg.Add(5)
37		}
38	}()
39
40	for {
41		<-ch
42		go func() {
43			defer wg.Done()
44			job()
45		}()
46	}
47
48}