《并发哲学:从编程入道到开悟升天》3.4 Golang实现并发的其他要素

Golang配合并发编程的其他要素(可选)

本章我们将罗列和介绍基于CSP并发原语体系内Golang其他关于控制并发编程的要素(或者称之为构建模块)。

CSP代表"Communicating Sequential Processes",它既是一种技术,也是引入它的论文的名称。 1978年,Charles Antony Richard Hoare在计算机械协会(又称为ACM)上发表了这篇论文。

在该论文中,Hoare认为输入和输出是两个被忽视的编程原语,特别是在并发代码中。在这之前,用于输入输出的要素往往被排除在编程体系之外。在Hoare撰写本文时,关于如何构造程序的研究仍在进行中,但大部分工作都是针对连续代码的技术:goto语句的使用正在讨论中,面向对象的思想开始萌发。并发并没有得到太多关注。 Hoare开始纠正这个问题,于是他的论文和CSP诞生了。

我们略去对CSP完整体系的介绍,这里直接介绍结论。简而言之,CSP是在建模领域真正系统化、程序化、标准化描述并发相关概念的原语。在准确性上,CSP构建的知识体系,比作者本人在本书中描绘的三大要素、两大模型,要精确的多啦。

正如CSP的英文直译过来所代表的意思:“顺序程序的交流”,套用在投掷者模型上,人按自己的计划干事,是一个顺序过程,石头按规律飞行,是一个顺序过程,我们把人投掷石头,称之为并发;套用在包工头模型上,包工头干自己的事情,是一系列顺序作业,工人干自己的事情,也是一系列顺序作业,我们把包工头分配工人作业,称之为并发。“顺序过程与顺序过程的交流是并发的表现形式”,这种描述概念和本书所介绍的“不能够并行作业的实体必须借助其他实体才可以实现并行作业的结果”背后是殊途同归的。

在3.3中,我们通过编程案例9--利用多数语言通用语法实现并发任务协调,介绍了不利用管道等golang独有的高级原语特性,仅用其他语言也能通用理解的元素构建和并发控制相关的体系,最终也是可以实现的。事实上,基于这种资源标记和信号标记进行共享,而进行并发控制的影子,在几乎所有完备的支持并发编程的语言内,都可以得到实现。总结他们的特点,均是基于内存访问同步进行通信,回顾在案例9内的代码组织吧,我们使用了大量的全局变量或者是由指针为参数(指针访问意味着保留原始内存特征)传入下游函数来作为同步单元。

即使早在1974年,Edgar Dijkstra(对,就是发明了Dijkstra算法的那个Dijkstra)的一篇论文“Guarded commands, nondeterminacy and formal derivation of programs”已经引入了“->”这样的符号代表信息的定向传送,构建守护变量来实现CSP内过程的信息同步,管道的思想已经成型,但直到Golang诞生,像Channel这样直观而彻底贯彻Dijkstra思想的设计语言才真正诞生。

内存访问同步本质上并不坏,叙述本小节内容也并不是说之前介绍的Channel(管道)/Select机制比下面的内容更加多么的高级。事实上正因为横跨多语言的易理解性,时至今日,利用内存访问同步机制进行并发控制在各类语言内都十分常见(另一方面说,其他语言并没有将Dijkstra控制并发的方式具体化),在接下来的编程内容介绍内,我想有其他计算机编程语言基础的你一定不会陌生,例如读写锁、信号量等概念正是源自于此。

了解了业内关于并发体系CSP的起源,我们接下来将介绍Golang并发编程内用于兼容广泛的内存访问同步机制下的并发原语。请注意,这些原语(或者说是构建要素)对于并发控制来说不是必须的!,但我们强烈推荐你学习。

Sync包

Golang有一个单独的包,名为Sync,该包内包含了下列对于低级别内存访问同步最有用的并发原语:

  • WaitGroup
  • Mutex和RWMutex
  • Cond
  • Once
  • Pool

这些并发原语都是基于内存访问同步,为并发控制而服务。也许你在其他语言内也已经了解诸如RWMutex等读写锁的具体实现,但Golang与这些语言的区别在于,Golang在内存访问同步基元的基础上构建了一组新的并发基元,并为使用者提供扩展的内容,因而如果有需要,我们还是强烈建议你看完完整的本小节内容,并在你认为合适的时候使用这些工具。

WaitGroup

WaitGroup你可以理解他为一个内存安全的并发计数器。如果在并发编程设计中,你不关心包工头分配出去的任务执行的结果,或者你有其他的方式可以收集结果,单纯的只是做一个计数,那么Waitgroup将发挥很大的作用。

总的说来,WaitGroup基本用法包括Add(x),Done()和Wait(),功能如名称,这三个函数分别代表:计x个数,完成一个数,等待。就好比包工头分配任务,每分配一个人出去干活,就记录一次,分配三个人出去干活,就记录三次,每当有人干完活的时候,减去一次,手里记录的次数为0,就代表自己派出去别人干的活都已经取得了结果。来看下面的编程实例:

编程案例11--WaitGroup演示
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup //声明一个WaitGroup的全局变量,确保所有需要该变量的地方都可以访问到

func main(){
    wg.Add(2) //即将发配2个工人去干活
    go func() {
        defer wg.Done() //工人干完活,计数减1
        fmt.Println("我出去干活了。。。")
        time.Sleep(1)
    }()

    go func() {
        defer wg.Done() //工人干完活,计数减1
        fmt.Println("我出去干活了。。。")
        time.Sleep(2)
    }()

    wg.Wait() //等待工人干完活
    fmt.Println("所有工人都干完活儿了。")
}
编程案例11输出
我出去干活了。。。
我出去干活了。。。
所有工人都干完活儿了。

请记住,上述代码中的Add()等一系列与WaitGroup相关的操作,均是一种额外的记录,就好比一个独立的小本本,包工头通过观察小本本的变化情况,来决定下一步的工作安排,但工作状态本身的变化并不会自动触发小本本的记录变更。只有任务完成后,有一个人对“小本本”的变化也加上合理安排,流程才能够正常进行。如果在上述代码内,设计的Add的数量和Done的数量不一致,如果Add的数量大于Done的数量,程序将一直在Wait处阻塞

注:利用带缓冲管道,同样可以实现Waitgroup的作用,感兴趣的你可以自行尝试。

Mutex和RWMutex

如果你学过操作系统相关知识,对于Mutex作用一定不会陌生。他代表着锁"mutual exclusion(互斥)",这是一种对共享资源的独占标记。就好比厕所的一个坑位相对于上厕所的人来说,是互斥的。我们先来看下面的例子,在下面的例子中,两个协程(goroutine)试图增加和减少一个公共值,我们用Mutex来实现互斥访问:

编程案例12--Mutex演示上厕所
package main

import (
    "fmt"
    "sync"
)

var lock sync.Mutex //代表厕所的锁
var shit int

func main(){
    //包工头先后安排A、B去上厕所,但实际上A和B都有可能先到达厕所

    go func() {
        defer lock.Unlock() //确保A离开时一定会解开锁
        fmt.Println("A尝试进门")
        lock.Lock()//无论是A还是B,运行到此处如果厕所的锁已经被锁上,则会阻塞到锁处于解开状态
        fmt.Println("A进门并上锁成功!开始如厕")
        shit+=1
        fmt.Println("A如厕完毕")
    }()

    go func() {
        defer lock.Unlock()
        fmt.Println("B尝试进门")
        lock.Lock()
        fmt.Println("B进门并上锁成功!")
        shit+=1
        fmt.Println("B如厕完毕")
    }()

    select {}
}
编程案例12输出
B尝试进门
B进门并上锁成功!
B如厕完毕
A尝试进门
A进门并上锁成功!开始如厕
A如厕完毕
fatal error: all goroutines are asleep - deadlock!

在此次的运行内,看来是B先到达厕所抢到了首发权。正如过程演示可见,通过对共享量的加锁,两个分立过程做到了执行上的原子有序,感兴趣的读者可以尝试“把锁拆开”,也就是移除所有本实例内和Mutex有关的内容,运行后,你将发现有可能A正在如厕的时候,B也进门成功,开始如厕,哦,这太混乱了!

现在,可以确保A或者B一个人静静安排完毕后,另外一个人才可以正常如厕。

注:使用无缓冲管道,可以替代Mutex。

而RWMutex,则比较特殊,拥有该标记的资源,对读写操作来说,只要没有别的东西占用写操作,任意数量的读取者就可以进行读取操作。还是用上厕所来类比,在Mutex作用下,即使是单纯的看一看坑位里面的情况,即使坑位里没人,一次也只能一个人看,而RWMutex则比较符合实际:如果坑位里没人,则可以好几个人同时看坑位里是个什么情况。

编程案例13--RWMutex演示
package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.RWMutex
var shit int

func main(){
    for i:=1;i<10;i++{
        go func(seq int) {
            defer lock.RUnlock()
            time.Sleep(1*time.Second)
            fmt.Println("其他人",seq,"号尝试看一看")
            lock.RLock()//如果其他人也在看,不影响。但有人正在上厕所,则无法继续看。
            fmt.Println("其他人",seq,"号看到了有",shit,"坨")
        }(i)
    }
    go func() {
        defer lock.Unlock() //确保A离开时一定会解开锁
        time.Sleep(1*time.Second)
        fmt.Println("A尝试进门")
        lock.Lock()//无论是A还是B,运行到此处如果厕所的锁已经被锁上或正在被人看(加了RLock),则会阻塞到锁处于解开状态(上厕所的人结束上厕所或者看厕所的人不看了)
        fmt.Println("A进门并上锁成功!开始如厕")
        shit+=1
        fmt.Println("A如厕完毕")
    }()
    go func() {
        defer lock.Unlock()
        time.Sleep(1*time.Second)
        fmt.Println("B尝试进门")
        lock.Lock()
        fmt.Println("B进门并上锁成功!开始如厕")
        shit+=1
        fmt.Println("B如厕完毕")
    }()
    select {}
}
编程案例13输出
fatal error: all goroutines are asleep - deadlock!
其他人 1 号尝试看一看
其他人 1 号看到了有 0 坨
其他人 5 号尝试看一看
其他人 2 号尝试看一看
其他人 2 号看到了有 0 坨
其他人 3 号尝试看一看
其他人 5 号看到了有 0 坨
其他人 4 号尝试看一看
其他人 4 号看到了有 0 坨
其他人 9 号尝试看一看
其他人 9 号看到了有 0 坨
其他人 8 号尝试看一看
其他人 8 号看到了有 0 坨
B尝试进门
其他人 6 号尝试看一看
其他人 3 号看到了有 0 坨
A尝试进门
其他人 7 号尝试看一看
B进门并上锁成功!开始如厕
B如厕完毕
其他人 6 号看到了有 1 坨
A进门并上锁成功!开始如厕
A如厕完毕
其他人 7 号看到了有 2 坨

由输出结果可以看到,当之前已经有人尝试看一看的时候(添加了RLock),上厕所的人尝试进门也需要阻塞到那些看厕所的人看完(RUnlock)。而在A和B如厕的过程中,则不会有其他人来看,因为他们需要等待A或者B再释放锁(Unlock)才可以。想必而知,在面对大量读少量写的情况下,我们一般用RWMutex来处理资源共享安全问题。

Cond

Cond实现了一个条件变量,用于等待或宣布事件发生时协程的交汇点。这样阐述可能比较抽象,简单说来,Cond提供了一种方式,可以让需要等待某种条件的协程进入休眠状态,由其他协程触发唤醒。在休眠过程中,休眠的协程可以放弃掉之前占用的运行资源(线程等)。Cond通过这种优雅的方式实现被动触发。

我们在回过头来看编程案例9,如果要让某个协程在确保条件为真的情况下再继续运行(比如说根据信号量决定下一步操作),粗暴的方法就如案例内使用无限循环。抽象来看可以总结成下面的伪代码:

for 判定条件() == false {
    //条件如果为假,则一直用无限循环的方式阻塞
}
//如果条件为真,则会跳出循环,继续执行下面的代码
做点什么()

但是这样的方式有一个问题,循环的执行询问条件是否为真,本身也需要占用运行资源。就好比,后厨开始炒菜一定需要前台派单,后厨不停的询问前台是否有人来点单,无疑对前台正常工作也会有影响———这和这类协程无限轮询也要占用运行时资源一样。

有没有一种办法,可以让前台接单后喊醒睡觉的后厨?Cond可以做到,来看如下的代码:

编程案例14--Cond演示
package main

import (
    "fmt"
    "sync"
)

var IfHasOrder = false//请注意,这是一个共享信号量,不同协程对它的操作需要加锁,利用Mutex实例化后的Cond内部提供了锁机制
var c = sync.NewCond(&sync.Mutex{})

func main(){
    go QianTai()//前台开始工作
    go HouChu()//后台开始工作
    select {}
}

func HouChu(){
    c.L.Lock() //马上要对共享信号量进行查询操作,因而需要加锁
    for IfHasOrder == false { //后厨每次都会检查是否有订单
        c.Wait() // 调用Wait进入休眠状态,如有必要,请在其他协程唤醒
    }
    fmt.Println("后厨:好,开始做饭啦!")
    c.L.Unlock()
}

func QianTai(){
    c.L.Lock() //马上要对共享信号量进行写入操作,因而需要加锁
    IfHasOrder = true
    fmt.Println("前台:来订单了!")
    c.L.Unlock()
    c.Signal() // 利用Signal唤醒后厨
}
编程案例14输出
前台:来订单了!
后厨:好,开始做饭啦!
fatal error: all goroutines are asleep - deadlock!

如果你对于本小节互斥锁有充分实践,那么你一定会有疑问,在后厨进入到休眠状态时,此时明面上的c.L处于加锁状态,按道理,在前台准备接受订单的时候,在申请加锁应该会被拒绝——锁是互斥的。难道这里的互斥锁和之前Mutex内容内介绍的互斥锁不一致?答案是否定的,奥秘在于,Cond在调用Wait()时,默默调用了Unlock(),而从Wait状态被唤醒时,又默默调用了Lock()。代码评审的时候应尤其注重这一点!

在本案例中,后厨检查是否有订单用的是for循环,这意味着如果被唤醒后,后厨会再次检验订单是否真的有了,如果还是没有,那么后厨又会睡觉。如果不想让后厨自己再检查一遍,感兴趣的读者可以把对应的for给修改成if,感受两者的差异。

现在你明白了,Cond代表的是一种唤醒机制,类似于基于IO操作被动触发的select机制。那么问题来了,如果多个后厨都在睡觉,前台用Signal()尝试唤醒,所有的后厨都会被唤醒吗?这里明确的告诉你,答案是否定的。Signal只会成功唤醒睡得时间最久的协程。也就是说,睡得最久的后厨才会被唤醒。

那么有没有一种方式,发出的信号可以让所有后厨都听见呢?如果把Signal比作靠近身体拍一拍拍醒,那么有必要还应该有一个大喇叭把所有人都喊醒的机制,这就是Brocast。如果将上述案例中的c.Signal()换成c.Brocast(),所有沉睡的后厨都将被唤醒!Brocast,广播,字如其名,这不就是大喇叭嘛。

Once

领导安排大家去预定会议室,这肯定是一个并发过程,这同时也是一个“预定会议室成功”只需要执行一次的过程,任何一个下属预定成功并且会议室符合规定,那么其他人的预定就没有必要了——评优评先就看谁起劲啦。

Once确保了多个人可以同时执行一个任务,但任意一个人执行成功,他人便会放弃执行。来看下面的例子:

编程案例15--Once演示员工预定会议室
package main

import (
    "fmt"
    "sync"
)

func main(){
    var once sync.Once

    fmt.Println("老板:你们三个饭桶给我去订会议室!")
    for i := 0; i < 3; i++ {
        go func() {
            fmt.Println("好!我将尝试预定会议室。")
            once.Do(Order)
        }()
    }
    select{}
}

func Order(){
    fmt.Println("画外音:有人预定了会议室!")
}
编程案例15输出
老板:你们三个饭桶给我去订会议室!
好!我将尝试预定会议室。
画外音:有人预定了会议室!
好!我将尝试预定会议室。
好!我将尝试预定会议室。
fatal error: all goroutines are asleep - deadlock!

由输出结果可得,三个员工都前往去预定会议室了。会议室已经被预定,其他再尝试预定会议室的员工,看到会议室被预定,直接跳过了该步骤。

和其他的sync包内原语一样,Once也是对任务调度过程中抢占式仅需一次执行任务的抽象,本身并没有具体任务的含义,语义需要由后面的函数具体实现。重点在于,sync.Once对Do的计数只对初始化出来的这个对象有用,换言之,sync.Once好比声明了一个水果,但是这个水果究竟是苹果还是梨,需要进一步说明。程序只关心这个水果是不是被执行了一次,而不关心这个水果究竟是苹果还是梨。

检验你是否能理解这一点,我们来看下面的代码片段:

var count int
increment := func() { count=3 }
decrement := func() { count=5 }

var once sync.Once
once.Do(increment)
once.Do(decrement) //decrement和increment虽然是两个不同的任务,但是once已经Do过一次了呀

如果你能得出代码执行后count=3的结论,说明你已经真正理解我们所说的概念了。

Pool

Pool是对象池模式的并发安全实现。对象池往往是一种灵活的资源池化,为什么说它灵活呢?主要体现在,这种对象池可以在需要时检索池中有无对象,没有则创建而不会阻塞,如果有则直接使用。在Pool中,我们调用Get()方法尝试获取对象,如果池中没有对象,则会按照Pool内实现的New方法创建对象。如果有对象,则获取对象。考虑下面的例子:

myPool := &sync.Pool{
    New: func() interface{} { //Pool内应实现New方法,来应对如果池内没有对象的措施
        fmt.Println("创建.")
        return struct{}{}
    },
}

myPool.Get()             //1
instance := myPool.Get() //2
myPool.Put(instance)     //3
myPool.Get()             //4

上述代码,运行结果如下

创建.
创建.

分析代码,在1处我们尝试Get,但是池内没有对象,于是调用New。但是,1创造了的对象,并没有被保留,因而,在2处我们再次尝试Get,又创造了对象。但是,这次创建后,我们用instance送入Put,添加到池中,因而在下次Get时,池中已经有对象。所以,我们在1,3,4处调用了Get,但只执行了两次,就是如此。

不过,为什么在上面的小例子内,1创造的对象没有被保留呢?答案在于,Golang拥有垃圾回收机制,所有实例化的对象都将在适当的时候被自动清理。如果把程序比作一个酒吧,一般的流程是需要从吧台拿杯子,在桌子上喝酒。当你喝完的酒杯,不打招呼,被子就会被打扫卫生的大妈给收走。当你下次需要再喝酒的时候,又需要到吧台拿杯子喝酒。

而Pool的意义就在于,如果通过Put添加到池内的对象,则就好比跟打扫卫生的大妈打了招呼:“这个杯子你别给我收走!”,从而,就像高速缓存一样,你可以不用往返这么多次吧台,用自己桌子上的酒杯几乎不间断的喝酒了。

别忘了,当你用Get取到了Pool已有的一个对象,记得在使用完毕后再使用Put将对象放入池中。

编程案例16--用Pool预申请资源构建高速缓存
package main

import (
    "fmt"
    "sync"
)

func main(){
    var CallNum int
    calcPool := &sync.Pool{
        New: func() interface{} {
            CallNum += 1
            return "杯子" // 1
        },
    }

    // 提前申请了4个杯子,利用Put确保杯子不会被大妈收走
    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())
    calcPool.Put(calcPool.New())
    const Drinkers = 1024 //1024个人围着一个桌子喝酒,他们尽可能用这四个杯子
    var wg sync.WaitGroup
    wg.Add(Drinkers)

    for i := Drinkers; i > 0; i-- {
        go func() {
            defer wg.Done()
            mem := calcPool.Get() // 2
            defer calcPool.Put(mem)
        }()
    }

    // 由于我们最后需要统计酒保总共被打扰了多少次,因而我们不再用select卡死,而转用WaitGroup
    wg.Wait()
    fmt.Printf("酒保被打扰了%d次", CallNum)
}
编程案例16输出
酒保被打扰了4次

如果不使用Pool的资源留置功能,这些人围绕着酒桌喝酒,大妈都会在杯子空闲的时候想办法把杯子收走。这样,在最坏的情况下,人们需要打扰1024次酒保。酒保小哥得知,一定会狠狠把大妈呵斥一顿,但却无可奈何,因为顾客就是上帝。实验证明,尽管在池内资源不够的时候可以允许再申请,但是,在上述案例内的调度量级内,1024个人围绕着桌子用4个酒杯交替喝酒是完全可行的。这也暗示着,日常生活调度中很多资源还有极大的复用空间。

Pool经常用于高速缓存场景,在今后的案例中我们将可能再次见到他。

总结

再次声明,sync包内的工具,都可以用其他基本原语实现。仅从CSP要求的各类场景出发,sync包内的工具一个个都各显神通,在各自擅长对应的具体场景,做到了作用精简,编码优雅。简要说来,我们总结各类场景下需要用到sync包内的工具特点如下:

  • 对并发任务进行分配计数、标记完成计数,并基于上面两个计数关系决定进一步任务走向,使用WaitGroup
  • 对需要互斥访问的变量,使用Mutex和RWMutex
  • 对需要抢占执行,但只需要执行一次的任务,使用Once
  • 实现优雅的通知唤醒,使用Cond
  • 实现高效安全的对象池,使用Pool

也希望你能通过上述有关分配工人干活、上厕所、预定会议室、前台与后厨、酒吧喝酒五个相关的生活场景例子进一步加深理解,同时也体会并发建模源于生活,指导生活的内在哲学。

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务