第八章 Goroutines 和Channels
Goroutines
- 在Go语言中,每一个并发的执行单元叫做一个goroutine。
- 使用关键字go创建新的goroutine
- 下面是一个求斐波那契数列的程序,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package main
import (
"fmt"
"time"
)
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n)
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
/*
time.Duration
持续时间(Duration)表示两个瞬间之间经过的时间
作为int64纳秒计数。该表示限制了
最大可代表的持续时间约为290年。
*/
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
} - 值得注意的是, 当主程序返回或者直接终止时, 所有goroutine 都会终止
并发的clock服务
首先是顺序的始终服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package main
import (
"io"
"log"
"net"
"time"
)
func handleConn(c net.Conn) {
defer c.Close() // 延迟调用
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
go handleConn(conn)
}
}下面是并发的
1
2
3
4
5
6
7
8for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
handleConn(conn)
}
练习8.1
- 修改clock2来支持传入参数作为端口号,然后写一个clockwall的程序,这个程序可以同时与多个clock服务器通信,从多个服务器中读取时间,并且在一个表格中一次显示所有服务器传回的结果,类似于你在某些办公室里看到的时钟墙。如果你有地理学上分布式的服务器可以用的话,让这些服务器跑在不同的机器上面;或者在同一台机器上跑多个不同的实例,这些实例监听不同的端口,假装自己在不同的时区。
1
练习8.2
- 实现一个并发FTP服务器。服务器应该解析客户端发来的一些命令,比如cd命令来切换目录,ls来列出目录内文件,get和send来传输文件,close来关闭连接。你可以用标准的ftp命令来作为客户端,或者也可以自己实现一个。
并发的echo服务
- 前面的clock服务器在每一个连接都会运行一个goroutine, 而在本节中我们在每个连接中运行多个goroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
go handleConn(conn)
}
}
1 | \\并发的netcat |
- 也可以同时并行处理echo
1 | package main |
Channels
- Channel是一个通信机制,允许goroutine之间传递数据。
- 使用make 函数创建一个channel
- channel 可以发送和接收, 其中从channel 获取的时候可以不获取接收结果。
- channel 可以关闭, 关闭后还可以从其接收,但是不可以发送。
- Channel 分为带缓存的和不带缓存的,
不带缓存的Channels
- 发送时会使得发送者的goroutine阻塞, 直到另一个goroutine 在相同的Channel上进行接收操作。
- 反之,如果一个goroutine先进行接收操作, 照样会堵塞
- 当值传送成功之后, 两个goroutine 裁可以进行后面的语句。
1 | package main |
串联的Channels (pipeline)
- Channel可以传递任意类型的值, 以下是个示例, 由三个goroutine 使用两个channel 串联起来, 第一个是计数器,第二个是平方, 第三个是输出
1 | package main |
- 我们加上标记后, 可以看到输出的顺序。
- 试图关闭一个关闭的通道会宕机
- 在通知接收方goroutine所有数据都发送完毕的时候可以关闭通道, 关闭通道并不是必须的
1 | package main |
单向通道
- 可以定义只输入或者输出的管道, 作用是避免误用
1 | package main |
缓冲通道
- 缓冲通道内部是一个队列, 知道这个就好
- 有一种Bug,叫做goroutines泄露, 这是因为goroutines没有被关闭, 通道一直被堵塞没有被关闭, 导致goroutines一直运行, 直到程序退出
并行循环
- 由一些完全独立的子问题组成的问题称为高度并行, 高度并行的问题最容易实现并行。
- 并行循环常见例子, 对于循环中每个运算, 都可以开一个goroutine运行,这样做经常会出现一个错误,就是循环结束后程序直接就结束了, 我们的goroutine当然也就没有执行完成。
- 解决方法是设置一个共享通道, 读出循环个数次(当其中由错误中断的时候可能有问题)
- 书上又给出了一个例子, 使用通道获取出现的第一个错误并返回, 而这样也会堵塞引起goroutine泄露的错误。
- 解决方法给出了两种, 一种是开足够大的缓冲通道, 另一种是返回时建立一个goroutine来读完通道,
并发的Web爬虫
- 这里再提一下, 对于匿名函数一般两种方法传参, 一种是后面括号直接跟着, 另一种是赋值给函数变量。然后再调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main
import (
"fmt"
"os"
"gopl.io/ch5/links"
)
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
fmt.Println(err)
}
return list
}
func main() {
worklist := make(chan []string)
go func(){
worklist <- os.Args[1:]
}()
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
} - 该程序会不断爬取相关网页, 书上说会执行若干秒后出现错误,1是出现某网页解析错误, 二是连接数目过多性能不够。 这里两个我都没有遇到, 三是该程序永远不会结束, 这是因为worklist没有关闭, 循环会一直堵塞, 直到程序退出。
- 第一个改进是减少并行goroutine的数量, 采用空闲槽获取令牌来限制并发数量
- 对于第三个的改进是通过计数器来判断worklist是否结束
1 | package main |
- 另一种限制并发的思路是使用常驻的crawler goroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package main
import (
"fmt"
"os"
"gopl.io/ch5/links"
)
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
fmt.Println(err)
}
return list
}
func main() {
worklist := make(chan []string)
unseenLinks := make(chan string)
go func(){
worklist <- os.Args[1:]
}()
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks { // 公用通道, for range 可以在通道被关闭的时候正常退出,
foundLinks := crawl(link)
go func() {worklist <- foundLinks}()
}
}()
}
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link //
}
}
}
}
select 多路复用
- 一个简单的倒计时发射程序如下
1 | package main |
- 现在我们想在倒计时结束前, 可以允许按下回车键来取消发射
- 朴素来想, 我们现在西药新建一个goroutine 来监听stdin并新设置一个channel 传递信息, 然后在循环中同时监听tick 和新通道
- 这时候可以用select来解决,
- select 会等待case中有能够执行的case 时再去执行, 这时候其他通信不会进行
- 多个程序同时满足时,select会随机选择一个。
- 使用select实现我们上面要求的功能
1 | package main |
示例: 并发的目录遍历
朴素来说是这样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
func walkDir(dir string, fileSize chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSize)
} else {
fileSize <- entry.Size() // 增加
}
}
}
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
}
return entries
}
func main() {
flag.Parse()
roots := flag.Args()
if len(roots) == 0{
roots = []string{"."}
}
fileSizes := make(chan int64)
go func(){
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes) / 1e9)
}会经历很久的运行时间,然后输出信息, 我们希望中途输出更多相关信息, 但不希望会直接输出一大坨
使用上一章的select 进行改造
1 |
|
- 然后对遍历操作进行并发改造
- 速度明显变快了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)
func walkDir(dir string, n *sync.WaitGroup, fileSize chan<- int64) {
defer n.Done() // 正好每个都能减到
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSize)
} else {
fileSize <- entry.Size() // 传输到主逻辑增加。
}
}
}
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
}
return entries
}
var verbose = flag.Bool("v", false, "show progress")
func main() {
flag.Parse()
roots := flag.Args()
if len(roots) == 0{
roots = []string{"."}
}
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1) // 组的数目
go walkDir(root, &n, fileSizes)
}
go func(){
n.Wait()
close(fileSizes)
}()
var tick <-chan time.Time
if *verbose {
tick = time.Tick(10 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop //该处为关闭后返回的信息。loop 可以 goto 和 break 棘啊
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes) / 1e9)
}
并发的退出
- 使用通道来广播退出信息, 方法是平常不往里面传值, 终止时把通道关闭, 就能从通道获取零值 获取已退出信息
- 要注意后台goroutine 针对信息快速终止
示例 聊天服务
1 | package main |