0%

Go微服务实战04-golang的并发编程

1. gorutine

1.1 创建 go 的原则

  1. 不要创建一个你不知道何时退出的 goroutine。(你一定要知道什么时候结束)
  2. 你能不能结束它?(要能控制他结束)

1.2 自己做比委托go 更放心

如果你的goroutine在从另一个goroutine获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它(go func())更简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 正常
func main() {
go func() {
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Panicf("http server err: %+v", err)
return
}
}()
select{}
}

// 修改后
func main() {
if err := http.ListenAndServe(":8080", mux); err != nil {
log.Panicf("http server err: %+v", err)
return
}
}

1.3 让调用者决定是否go

请把是否并发的选择权交给你的调用者,而不是自己就直接悄悄的用上了 goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 正常
func pprof(){
go func(){
// ....
}
}
func main() {
pprof()
select {}
}


// 修改后
func main() {
go pprof()
select {}
}

1.4 确保创建 goroutine 的工作已经完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Reporter struct {
}
var reporter Reporter

func (r Reporter) report(data string) {
time.Sleep(time.Second)
fmt.Printf("report: %s\n", data)
}

mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
go reporter.report("ping pong") // 在请求中异步调用
fmt.Println("ping")
w.Write([]byte("pong"))
})

异步上报服务后不再管理,非常不好,例如程序退出了,异步上报的逻辑可能没执行上。

我们实现类似 http 的 shutdown,等待所有的异步上报完成之后,我们再退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type Reporter struct {
worker int
messages chan string
wg sync.WaitGroup
closed bool
}

func NewReporter(worker, buffer int) *Reporter {
return &Reporter{worker: worker, messages: make(chan string, buffer)}
}

// 生产者
func (r *Reporter) report(data string) {
if r.closed {
return
}
r.messages <- data
}

// 关闭操作
func (r *Reporter) shutdown() {
r.closed = true
close(r.messages) // 注意,这个一定要在主服务结束之后再执行,避免关闭 channel 还有其他地方在啊写入
}

// 消费者
func (r *Reporter) run(stop <-chan struct{}) {
go func() {
<-stop // 接受stop信号,然后shutdown
r.shutdown() // 关闭channel,让生产者和消费者都停下来
}()

for i := 0; i < r.worker; i++ {
r.wg.Add(1)
go func() {
for msg := range r.messages {
time.Sleep(5 * time.Second)
fmt.Printf("report: %s\n", msg)
}
r.wg.Done()
}()
}
r.wg.Wait()
}

然后在 main 函数中我们加上

1
2
3
go func() {
reporter.run(stop)
}()

2. data race

一个有趣的数据竞争例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import "fmt"

type IceCreamMaker interface {
// Great a customer.
Hello()
}

type Ben struct {
name string
}

func (b *Ben) Hello() {
fmt.Printf("Ben says, \"Hello my name is %s\"\n", b.name)
}

type Jerry struct {
name string
}

func (j *Jerry) Hello() {
fmt.Printf("Jerry says, \"Hello my name is %s\"\n", j.name)
}

func main() {
var ben = &Ben{name: "Ben"}
var jerry = &Jerry{"Jerry"}
var maker IceCreamMaker = ben

var loop0, loop1 func()

loop0 = func() {
maker = ben
go loop1()
}

loop1 = func() {
maker = jerry
go loop0()
}

go loop0()

for {
maker.Hello()
}
}

这个例子有趣的点在于,最后输出的结果会有这种例子

1
2
Ben says, "Hello my name is Jerry"
Ben says, "Hello my name is Jerry"

interface 在 go 中其实是一个结构体,它包含了 type 和 data 两个部分,所以它的复制也不是原子的,会出现问题。

1
2
3
4
type interface struct {
Type uintptr // points to the type of the interface implementation
Data uintptr // holds the data for the interface's receiver
}

这个案例有趣的点还在于,这个案例的两个结构体的内存布局一模一样所以出现错误也不会 panic 退出。

如果在另外一个结构再加入一个 string 的字段,去读取就会导致 panic。因为两个类型不一样了 ,会发生panic。

3. mutex

3.1 互斥锁的实现方式

  • Barging(桥接):当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人。
  • Handoff(切换):当锁释放时候,锁会一直持有直到第一个等待者准备好获取锁。它降低了吞吐量,但可以解决公平性的问题。
  • Spining(自旋):自旋在等待队列为空,减少上下文切换,但是用占用CPU。

3.2 Go Mutex 实现原理

  1. Go1.8 使用了Barging 和 Spining 的结合实现。

    当试图获取已经被持有的锁时,如果本地队列为空并且P的数量大于1,goroutine将自旋几次(用一个P旋转会阻塞程序)。自旋后,goroutine park。在程序高频使用锁的情况下, 它充当了一个快速路径。

  2. Go1.9 添加一个新的饥饿模式,该模式将会在释放时候触发 handsoff。

    所有等待锁超过一毫秒的goroutine(也称为有界等待)将被诊断为饥饿。

    当被标记为饥饿状态时,unlock方法会handsoff 把锁直接扔给第一个等待者。在饥饿模式下,自旋也被停用,因为传入的 goroutines将没有机会获取为下一个等待者保留的锁。

3.3 源码分析

1
2
3
4
type Mutex struct {
state int32 // state 字段的最低三位表示三种状态,分别是 锁定,唤醒,饥饿
sema uint32
}

当我们调用 Lock 方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回,否则的话就进入 slow path。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待时间
starving := false // 是否处于饥饿状态
awoke := false // 是否处于唤醒状态
iter := 0 // 自旋迭代次数
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}

lockSlow 方法中我们可以看到,有一个大的 for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。

  • 当前互斥锁的状态是非饥饿状态,并且已经被锁定了
  • 自旋次数不超过 4 次
  • cpu 个数大于一,必须要是多核 cpu
  • 当前正在执行当中,并且队列空闲的 p 的个数大于等于一

如果可以进入自旋状态之后就会调用 runtime_doSpin 方法进入自旋。

自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端。

4. errgroup

是对sync.WaitGroup的一个封装,有一个协程发生错误,就会调用cancel函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Group struct {
cancel func() // 这个存的是context的cancel方法

wg sync.WaitGroup // 封装sync.WaitGroup

errOnce sync.Once // 保证只接受一次错误
err error // 保存第一个返回的错误
}


func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}

func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
  • 示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"log"
"net/http"

"golang.org/x/sync/errgroup"
)

func main() {
eg := errgroup.Group{}
eg.Go(func() error {
return getPage("https://www.xx.xx")
})
eg.Go(func() error {
return getPage("https://google.com")
})
if err := eg.Wait(); err != nil {
log.Fatalf("get error: %v", err)
}
}

func getPage(url string) error {
resp, err := http.Get(url)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("fail to get page: %s, wrong statusCode: %d", url, resp.StatusCode)
}
log.Printf("success get page %s", url)
return nil
}

/*
2024/06/27 22:24:46 success get page https://google.com
2024/06/27 22:24:46 get error: Get "https://www.xx.xx": EOF
exit status 1
*/
  • 示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/sync/errgroup"
)

func main() {
g, ctx := errgroup.WithContext(context.Background())

mux := http.NewServeMux()
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})

// 模拟单个服务错误退出
serverOut := make(chan struct{})
mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
serverOut <- struct{}{}
})

server := http.Server{
Handler: mux,
Addr: ":8080",
}

// g1
// g1 退出了所有的协程都能退出么?
// g1 退出后, context 将不再阻塞,g2, g3 都会随之退出
// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
g.Go(func() error {
return server.ListenAndServe()
})

// g2
// g2 退出了所有的协程都能退出么?
// g2 退出时,调用了 shutdown,g1 会退出
// g2 退出后, context 将不再阻塞,g3 会随之退出
// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
g.Go(func() error {
select {
case <-ctx.Done():
log.Println("errgroup exit...")
case <-serverOut:
log.Println("server will out...")
}

timeoutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
// 这里不是必须的,但是如果使用 _ 的话静态扫描工具会报错,加上也无伤大雅
defer cancel()

log.Println("shutting down server...")
return server.Shutdown(timeoutCtx)
})

// g3
// g3 捕获到 os 退出信号将会退出
// g3 退出了所有的协程都能退出么?
// g3 退出后, context 将不再阻塞,g2 会随之退出
// g2 退出时,调用了 shutdown,g1 会退出
// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
g.Go(func() error {
quit := make(chan os.Signal, 0)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

select {
case <-ctx.Done():
return ctx.Err()
case sig := <-quit:
return fmt.Errorf("get os signal: %v", sig)
}
})

fmt.Printf("errgroup exiting: %+v\n", g.Wait())
}

测试:

1
2
3
4
5
6
7
8
9
10
# 1. curl http://127.0.0.1:8080/shutdown
2024/06/27 22:32:04 server will out...
2024/06/27 22:32:04 shutting down server...
errgroup exiting: http: Server closed


# 2. ctrl + c
2024/06/27 22:33:01 errgroup exit...
2024/06/27 22:33:01 shutting down server...
errgroup exiting: get os signal: interrupt

5. 参考资料

可以加首页作者微信,咨询相关问题!