简析go语言并发模式(3)
基于管道模式的扩展模式:
1.扇出模式:在某个处理环节,多个功能相同的goroutine从同一个channel读取数据并处理,直到该channel关闭。使用扇出模式可以在一组goroutine中均衡分配工作量,从而更均衡的利用CPU。
2.扇入模式:在某个处理环节,处理程序面对不止一个输入channel,把所有输入channel汇聚到一个统一的输入channel,然后处理程序从这个channel中读取数据并处理,直到该channel因所有输入channel关闭而关闭。
import (
"fmt"
"sync"
)
func newNumGenerator(start, stop int) <-chan int {
c := make(chan int)
go func() {
for i := start; i <= stop; i++ {
c <- i
}
close(c)
}()
return c
}
func filterOdd(in int) (int, bool) {
if in%2 != 0 {
return in, true
}
return 0, false
}
func square(in int) (int, bool) {
return in * in, true
}
func spawnGroup(num int, f func(int) (int, bool), in <-chan int) <-chan int {
groupOut := make(chan int)
var outSlice []chan int
// 创建多个goroutine从同一个channel in中读取数据,并将读取到的数据存放在outSlice中:扇出模式
for i := 0; i < num; i++ {
out := make(chan int)
go func(i int) {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}(i)
outSlice = append(outSlice, out)
}
// 将outSlice中的多个channel汇聚到一个channel groupOut中:扇入模式
go func() {
var wg sync.WaitGroup
for _, out := range outSlice {
wg.Add(1)
go func(out chan int) {
for v := range out {
groupOut <- v
}
wg.Done()
}(out)
}
wg.Wait()
close(groupOut)
}()
return groupOut
}
func main() {
in := newNumGenerator(0, 20)
out := spawnGroup(3, square, spawnGroup(3, filterOdd, in))
for i := range out {
fmt.Println(i)
}
}
go run go-concurrency-pattern.go
1
9
81
25
49
121
225
169
361
289
Go语言基础及实战 文章被收录于专栏
Go语言学习笔记、语法知识、技术要点和个人理解及实战

