Goroutine

Go语言从语言层面上就支持了并发,这与其他语言大不一样。Go语言中有个概念叫做goroutine,这类似我们熟知的线程,但是更轻。

进程、线程、协程

进程和线程 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。 一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行。 所以程序的类型可以分为以下几种:

  • 一个进程,它只有一个线程,就是单线程程序
  • 一个进程,它又多个线程,就是多线程程序
  • 一个进程,它可能还会fork多个子进程,就是多进程程序

并发和并行的区别

  • 多线程程序在单核的cou上运行,这是并发(concurrency)。
  • 多线程程序在多个核的cpu上运行(真正的同时运行),这才是并行(parallelism)。

并发,在微观上,任意时刻只有一个程序在运行。因为线程已经是CPU调度的最小单元,一个CPU一次只能处理一个线程。但是宏观上这些程序时同时在那里执行的,所以这个只是并发。 所以在python里,貌似讲的都是高并发,似乎没听过并行的概念。

协程和线程 协程,独一的栈空间,共享堆空间,调度由用户自己控制。本质上类似于用户级线程,这些用户级线程的调度也是自己实现的。 线程,一个线程上可以跑多个协程,协程是轻量级的线程。

goroutine 调度模型

Go的调度器模型:G-P-M模型。

  • G代表goroutine,它通过go关键字调用runtime.newProc创建。
  • P代表processer,可以理解为上下文。
  • M表示machine,可以理解为操作系统的线程。

设置Golang运行的cpu核数 设置当前的程序运行在多少核上,下面的例子是获取CPU的核数,然后运行在所有核上:

package main import ( "fmt" "runtime" ) func main() { num := runtime.NumCPU() runtim.GOMAXPROCS(num) fmt.Println(num) } 

上面P的数目就是这里GOMAXPROCS设置的数目,通常设置为CPU核数。 1.8版本以上的Golang,是不需要做上面的设置的,默认就是运行在所有的核上。当然还是可以设置一下,比如限制只能使用多少核。

goroutine的示例:

package main import ( "fmt" "time" ) func example() { var i int for { fmt.Println(i) i++ time.Sleep(time.Millisecond * 30) } } func main() { go example() // 起一个goroutine var j int for j > -100 { fmt.Println(j) j-- time.Sleep(time.Millisecond * 100) } fmt.Println("运行结束") } 

Channel

不同goroutine之间要进行通讯,有下面2种方法:

  • 全局变量和锁同步
  • Channel

先讲管道(channel),然后讲 goroutine 和 channel 结合的一些用法。 这篇的channel可以参考下: https://www.jianshu.com/p/24ede9e90490

全局变量的实现示例

在下面的例子里定义了变量 m 来实现goroutine之间的通讯:

package main import ( "fmt" "time" "sync" ) var ( m = make(map[int]uint64) lock sync.Mutex ) type task struct { n int } func calc(t *task) { var res uint64 res = 1 for i := 1; i <= t.n; i++ { res *= uint64(i) } lock.Lock() m[t.n] = res // 变量m用来存放结果,这样主线程里就能拿到m的值,操作要加锁 lock.Unlock() } func main() { for i := 0; i < 100; i++ { t := &task{i} go calc(t) } for j := 0; j < 10; j++ { fmt.Printf("\r已运行%d秒", j) time.Sleep(time.Second) } fmt.Println("\r运行完毕,输出结果:") lock.Lock() for k, v := range m { if v != 0 { fmt.Printf("%d! = %v\n", k, v) } } lock.Unlock() } 

channel 概念

channel的概念如下:

  • 类型Unix中的管道(Pipe)
  • 先进先出
  • 线程安全,多个goroutine同时访问,不需要加锁
  • channel是有类型的,一个整数的channel只能存放整数

channel 声明 var 变量名 chan 类型

var test1 chan int var test2 chan string var tesr3 chan map[string]string var test4 chan stu var test5 chan *stu 

只是声明还不够,使用前还要make,分配内存空间:

package main import "fmt" func main() { var intChan chan int // 声明 intChan = make(chan int, 10) // 初始化,长度是10 intChan <- 10 // 存入管道 n := <- intChan // 取出 fmt.Println(n) } 

定义信号(空结构体) 有一些场景中,一些 goroutine 需要一直循环处理信息,直到收到 quit 信号。作为信号,只需要随便传点什么,并不关注具体的值。那么可以选择使用空结构体,像下面这样定义了2个信号:

msgCh := make(chan struct{}) quitCh := make(chan struct{}) // 传信号的方法 msgCh <- struct{}{} // 前面的 struct{} 是变量的类型,后面的 {} 则是做初始化传入空值生成实例 quitCh <- struct{}{} 

通过channel实现通讯

起一个goroutine往管道里存,再起一个goroutine从管道里把数据取出:

package main import ( "fmt" "time" ) func write(ch chan int) { var i int for { ch <- i i ++ time.Sleep(time.Millisecond) } } func read(ch chan int) { for { b := <- ch fmt.Println(b) } } func main() { intChan := make(chan int, 10) go write(intChan) go read(intChan) time.Sleep(time.Second * 5) } 

channel 的类型和阻塞

channel 分为不带缓存的 channel 和带缓存的 channel。 channel 一定要初始化后才能进行读写操作,否则会永久阻塞。这个不是这里要讲的重点,顺便带一下。

无缓存的channle 初始化make的时候不传入第二个参数设置容量就是:

ch := make(chan int) 

从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;同理,向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。

有缓存的 channel 有缓存的 channel 的声明方式为指定 make 函数的第二个参数,该参数为 channel 缓存的容量:

ch := make(chan int, 10) 

有缓存的 channel 类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息; 相应的,当 channel 中消息不为空时,读取消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。

缓冲区的大小 通过 len 函数可以获得 chan 中的元素个数,通过 cap 函数可以得到 channel 的缓冲区长度。

无缓存和缓冲区是1的差别 无缓存的 channel 的 len和cap 始终都是0。

通过无缓存的 channel 进行通信时,接收者收到数据 happens before 发送者 goroutine 唤醒

上面这句不好理解,不过可以先看下现象。 下面的这2行函数会报错,说是死锁。但是如果设置了 channel 的容量哪怕是1,就不会报错的:

func main() { ch := make(chan int) ch <- 1 } 

虽然容量1的channel也只能存1个数,但是无缓冲区的channel似乎1个数都存不了,除非马上能取走:

func main() { ch := make(chan int, 1) // 要起一个goroutine可以马上接收channel里的数据 go func () { fmt.Println(<- ch) }() ch <- 1 time.Sleep(time.Second) // 要给goroutine执行完成的时间 } 

小结:无缓存的channel需要有一个goroutine可以把channel里的数据马上取走。

channel之间的同步

在学习关闭channel之前,先看下下面的例子。由于没有关闭channel,是会有问题的,不过例子里都解决了。先看下不用关闭channel可以怎么搞,然后再接着看关闭channel的用法:

package main import ( "time" "fmt" ) func calc(taskChan chan int, resChan chan int) { for v := range taskChan { // 判断是不是素数 flag := true for i := 2; i < v; i++ { if v % i == 0 { flag = false break } } if flag { resChan <- v } } } func main() { intChan := make(chan int, 1000) // 这个也是个goroutine go func(){ for i := 2; i < 100000; i++ { intChan <- i } }() // 管道满了之后,这个匿名函数会阻塞,但是不影响程序继续往下走 resultChan := make(chan int, 1000) // 同时起8个goroutine for i := 0; i < 8; i++ { go calc(intChan, resultChan) } // 再起一个取结果的goroutine,不阻塞主线程 go func(){ for v := range resultChan{ fmt.Println("素数:", v) } }() // 给上面的匿名函数几秒钟来输出结果 time.Sleep(time.Second * 5) } 

上面的例子里用了2个匿名函数,也都是起的goroutine。如果是在主线程里直接for循环的话,那个for循环就会变成死锁,程序不会自己往下走。所以运行在goroutine里的死循环,在主线程退出后也就结束了,不会有问题。后一个匿名函数是对channel的进行遍历,channel取空后,会进入阻塞,如果是运行在主线程里的话也会形成死锁。 range 遍历 channel 也可以使用 range 取值,并且会一直从 channel 中读取数据,直到有 goroutine 对改 channel 执行 close 操作,循环才会结束。

关闭 channel

golang 提供了内置的 close 函数对 channel 进行关闭操作:

ch := make(chan int) close(ch) 

关于 channel 的关闭,有以下的特点:

  • 关闭一个未初始化(nil) 的 channel 会产生 panic
  • 重复关闭同一个 channel 会产生 panic
  • 向一个已关闭的 channel 中发送消息会产生 panic
  • 可以从已关闭的 channel 里继续读取消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭
  • 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息

有2种方式可以把管道里的数据都取出来,但是都需要把管道关闭:

  • 判断管道已关闭并且取完了
  • 遍历管道

关闭channel然后读取的示例:

package main import "fmt" func main() { var ch chan int ch = make(chan int, 5) for i := 0; i < 5; i++ { ch <- i } close(ch) for { var b int b, ok := <- ch fmt.Println(b, ok) if ok == false { break } } } /* 执行结果 PS H:\Go\src\go_dev\day8\channel\close_chan> go run main.go 0 true 1 true 2 true 3 true 4 true 0 false PS H:\Go\src\go_dev\day8\channel\close_chan> */ 

上面输出的最后一条,就是channel已经空了,读出来的就是类型的0值,并且ok变false了。

遍历channel的示例:

package main import "fmt" func main() { var ch chan int ch = make(chan int) // 这个管道没有无缓存 // 这个goroutine一次存一个,再存会阻塞,直到主线程后面的for循环遍历的时候取走数据 // 存完100个数后,这里的for循环结束,会关闭管道。主线程后面的for循环的遍历就能正常结束了 go func () { for i := 0; i < 100; i++ { ch <- i } close(ch) }() for v := range ch { fmt.Println(v) } } 

判断子线程结束 学到这里,再也不需要用Sleep等待子线程结束了,可以通过管道实现。可以单独定义一个专门用来判断子线程结束的管道。子线程完成任务后,就传个值给管道,主线程就阻塞的读管道里的信息,一旦读到信息,就说明子线程完成了,可以继续执行或者退出了。如果起了多个子线程,则主线程就是用for循环多读几次,就能判断出有多少子线程已经结束了。

channel 只读、只写

声明只读的channel:

var ch <-chan int 

声明只写的channel:

var ch chan<- int 

应用场景,管道需要能够可读可写。但是可以限制它在某个函数里的功能,也就是在定义函数的参数的时候,把管道的类型设置为只读或只写。或者把管道传给结构体,结构体里限制管道的读写限制? 下面是之前的一个例子,仅仅只是把2个函数在设置参数类型的时候把管道的读写限制加上了:

package main import ( "fmt" "time" ) func write(ch chan<- int) { var i int for { ch <- i i ++ time.Sleep(time.Millisecond) } } func read(ch <-chan int) { for { b := <- ch fmt.Println(b) } } func main() { intChan := make(chan int, 10) go write(intChan) go read(intChan) time.Sleep(time.Second * 5) } 

配合 select 使用

select 用法类似IO多路复用,可以同时监听多个 channel 的消息状态,用法如下:

select { case <- ch1: ... case <- ch2: ... case ch3 <- 10; ... default: ... } 

select 可以同时监听多个 channel 的写入或读取:

  • 若只有一个 case 通过(不阻塞),则执行这个 case 块
  • 若有多个 case 通过,则随机挑选一个 case 执行
  • 若所有 case 均阻塞,则执行 default 块。若未定义 default 块,则 select 语句阻塞,直到有 case 被唤醒
  • 使用 break 会跳出 select 块
  • select 不会循环,就只会执行一个块然后就继续往后执行了

select只会执行一次 这个例子只会输出一次,随机是1或者是2,然后接结束了:

package main import "fmt" func main() { ch1 := make(chan int, 1) ch1 <- 1 ch2 := make(chan int, 1) ch2 <- 2 select { case v := <- ch1: fmt.Println(v) case v := <- ch2: fmt.Println(v) default: fmt.Println(0) } } 

所以如果要把管道里的数取完,或者取多次,就要再套一层for循环。

for循环和break的效果 在select外面用for套了一层死循环,这样就是反复的执行select。不过break在这里就没效果了:

package main import ( "fmt" "time" ) func main() { var ch1, ch2 chan int ch1 = make(chan int, 10) ch2 = make(chan int, 10) for i := 0; i < cap(ch1); i++ { ch1 <- i ch2 <- i * i } // LABEL1: for { select { case v := <- ch1: fmt.Println("ch1", v) case v := <- ch2: fmt.Println("ch2", v) default: fmt.Println("所有元素都已经取完") break // 这个break没有意义,因为值是跳出select,而不是for循环 // break LABEL1 // 这个break可以直接跳出for循环 } time.Sleep(time.Second) } } 

如果要跳出for循环,可以配合标签。上面的代码里已经写好了只是注释掉了。

定时器

定时器是在 time 包里的,

package main import ( "fmt" "time" ) func main() { t := time.NewTicker(time.Second) for v := range t.C { fmt.Println(v) } } 

上面调用的NewTicker()方法返回的是个结构体,如下:

type Ticker struct { C <-chan Time // The channel on which the ticks are delivered. // contains filtered or unexported fields } 

上面的例子里遍历了 t.C 就是一个channel。time包内部应该是会产生一个goroutine,每隔一段时间就传一个数据进去。

设置超时时间 还有一个After()方法,和上面的方法是一样的。不过这个方法直接返回管道,即 NewTimer(d).C 。而NewTimer()方法的管道在返回的结构体的属性C里。这个After()方法用起来更方便。结合select正好可以做成一个设置任务超时时间的功能:

package main import ( "fmt" "time" ) func task(ch chan struct{}) { time.Sleep(time.Second * 3) ch <- struct{}{} } func main() { ch := make(chan struct{}) // 定义好信号的管道,传递空结构体 go task(ch) // 启动一个任务 select { case <- ch: fmt.Println("任务执行结束") case <- time.After(time.Second * 2): // 2秒后超时 fmt.Println("任务超时") } } 

goroutine 中使用 recover

程序里起的gorountine中如果panic了,并且这个goroutine里面没有捕获错误的话,整个程序就会挂掉。 下面的程序会报错(Panic),是gorountine里的产生的错误:

package main func divideZero(ch chan int) { zero := 0 ch <- 1 / zero } func main() { ch := make(chan int) go divideZero(ch) <- ch } 

在gorountine中运行错误了,是可以不影响其他线程和主线程的继续执行的。所以,好的习惯是每当产生一个goroutine,就在开头用defer插入recover, 这样在出现panic的时候,就只是自己退出而不影响整个程序。下面是优化后的代码,加入了recover来捕获错误:

package main import "fmt" func divideZero(ch chan int) { defer func () { if err := recover(); err != nil { fmt.Println(err) // 要给管道传值,否则主线程从空管道里取值会阻塞,形成死锁 ch <- 0 } }() zero := 0 ch <- 1 / zero } func main() { ch := make(chan int) go divideZero(ch) <- ch } 

单元测试

测试用例的文件名必须以_test.go结尾,测试的函数也必须以Test开头。符合命名规则,使用 go test 命令的时候就能自动运行测试用例。 这篇的单元测试比较粗糙,不过基本怎么用,以及用法示例都简单记下来了。

被测试的函数

先准备一个需要被测试的函数:

package main import "fmt" func get_fullname(first, last string) (fullname string) { fullname = first + " " + last return } func main() { fullname := get_fullname("Barry", "Allen") fmt.Println(fullname) } 

上面的 get_fullname() 函数就是接下来要进行单元测试的函数。

测试用例

package main import "testing" func TestName(t *testing.T) { r := get_fullname("Sara", "Lance") expect := "Sara Lance" if r != expect { t.Fatalf("ERROR: get_fullname expect: %s actual: %s", expect, r) } t.Log("测试成功") } 

执行测试

写完测试用例,就可以执行测试了,使用命令 go test。输出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test PASS ok go_dev/day8/unit_test/name 0.058s PS H:\Go\src\go_dev\day8\unit_test\name> 

看到PASS了,但是t.Log()并没有输出,要看到更多信息,要用带上-v参数。使用命令 go test -v ,输出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test -v === RUN TestName --- PASS: TestName (0.00s) name_test.go:11: 测试成功 PASS ok go_dev/day8/unit_test/name 0.053s PS H:\Go\src\go_dev\day8\unit_test\name> 

直接用go test命令,只显示测试的结果。如果有多个测试用例,也只有一个结果。可以用-v参数看到详细的信息,每个测试用例的的结果都会打印出来。 如果某个测试失败了,就会直接退出,不会继续测试下去。