简析go语言并发模式(2)
4)管道模式:多个数据处理环节,每个数据处理环节都由一组功能相同的goroutine完成,每个数据处理环节的goroutine都要从数据输入channel获取上一个数据处理环节生产的数据,然后对这些数据进行处理并将处理后的结果通过数据输出channel发往下一个环节。
import "fmt"
func generatorNums(start, stop int) <-chan int {
c := make(chan int)
go func() {
for i := start; i <= stop; i++ {
c <- i
}
close(c)
}()
return c
}
func filterEven(n int) (int, bool) {
if n%2 != 0 {
return 0, false
}
return n, true
}
func spawn(f func(n int) (int, bool), in <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}()
return out
}
func main() {
in := generatorNums(0, 10)
out := spawn(filterEven, in)
for v := range out {
fmt.Println(v)
}
}
go run go-concurrency-by-channel-pattern.go
0
2
4
6
8
10
上述例子为过滤出一组整数中的偶数,共有三个处理环节:
1.第一个环节生产数据序列,这个序列由generatorNums创建的goroutine负责生成并发送到输出channel中,在序列全部发送完毕后该goroutine关闭channel并退出。
2.第二个环节是过滤偶数,由spawn函数创建的goroutine从第一个环节的输出channel中读取数据,并交由filterEven函数处理,如果是偶数,则发送到该goroutine的输出channel中,在全部数据处理并发送完毕后,该goroutine关闭channel并退出。
3.第三个环节是将序列数据输出到控制台,main goroutine从第二个环节的输出channel中读取数据,并将数据打印输出到控制台上,在全部数据处理完后main goroutine退出。
管道模式具有良好的可扩展性,如果在上例基础上增加对偶数进行平方运算(square),可以向下面这样扩展管道:
import "fmt"
func generatorNums(start, stop int) <-chan int {
c := make(chan int)
go func() {
for i := start; i <= stop; i++ {
c <- i
}
close(c)
}()
return c
}
func filterEven(n int) (int, bool) {
if n%2 != 0 {
return 0, false
}
return n, true
}
func square(n int) (int, bool) {
return n * n, true
}
func spawn(f func(n int) (int, bool), in <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}()
return out
}
func main() {
in := generatorNums(0, 10)
out := spawn(square, spawn(filterEven, in))
for v := range out {
fmt.Println(v)
}
}
go run go-concurrency-by-channel-pattern.go
0
2
4
6
8
10
Go语言基础及实战 文章被收录于专栏
Go语言学习笔记、语法知识、技术要点和个人理解及实战
查看8道真题和解析