1. gorutine 1.1 创建 go 的原则 不要创建一个你不知道何时退出的 goroutine。(你一定要知道什么时候结束) 你能不能结束它?(要能控制他结束) 1.2 自己做比委托go 更放心 如果你的goroutine在从另一个goroutine获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它(go func())更简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main () { go func () { if err := http.ListenAndServe(":8080" , mux); err != nil { log.Panicf("http server err: %+v" , err) return } }() select {} } func main () { if err := http.ListenAndServe(":8080" , mux); err != nil { log.Panicf("http server err: %+v" , err) return } }
1.3 让调用者决定是否go 请把是否并发的选择权交给你的调用者,而不是自己就直接悄悄的用上了 goroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func pprof () { go func () { } } func main () { pprof() select {} } func main () { go pprof() select {} }
1.4 确保创建 goroutine 的工作已经完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Reporter struct {} var reporter Reporterfunc (r Reporter) report(data string ) { time.Sleep(time.Second) fmt.Printf("report: %s\n" , data) } mux.HandleFunc("/ping" , func (w http.ResponseWriter, r *http.Request) { go reporter.report("ping pong" ) fmt.Println("ping" ) w.Write([]byte ("pong" )) })
异步上报服务后不再管理,非常不好,例如程序退出了,异步上报的逻辑可能没执行上。
我们实现类似 http 的 shutdown,等待所有的异步上报完成之后,我们再退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 type Reporter struct { worker int messages chan string wg sync.WaitGroup closed bool } func NewReporter (worker, buffer int ) *Reporter { return &Reporter{worker: worker, messages: make (chan string , buffer)} } func (r *Reporter) report(data string ) { if r.closed { return } r.messages <- data } func (r *Reporter) shutdown() { r.closed = true close (r.messages) } func (r *Reporter) run(stop <-chan struct {}) { go func () { <-stop r.shutdown() }() for i := 0 ; i < r.worker; i++ { r.wg.Add(1 ) go func () { for msg := range r.messages { time.Sleep(5 * time.Second) fmt.Printf("report: %s\n" , msg) } r.wg.Done() }() } r.wg.Wait() }
然后在 main 函数中我们加上
1 2 3 go func () { reporter.run(stop) }()
2. data race 一个有趣的数据竞争例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package mainimport "fmt" type IceCreamMaker interface { Hello() } type Ben struct { name string } func (b *Ben) Hello() { fmt.Printf("Ben says, \"Hello my name is %s\"\n" , b.name) } type Jerry struct { name string } func (j *Jerry) Hello() { fmt.Printf("Jerry says, \"Hello my name is %s\"\n" , j.name) } func main () { var ben = &Ben{name: "Ben" } var jerry = &Jerry{"Jerry" } var maker IceCreamMaker = ben var loop0, loop1 func () loop0 = func () { maker = ben go loop1() } loop1 = func () { maker = jerry go loop0() } go loop0() for { maker.Hello() } }
这个例子有趣的点在于,最后输出的结果会有这种例子
1 2 Ben says, "Hello my name is Jerry" Ben says, "Hello my name is Jerry"
interface 在 go 中其实是一个结构体,它包含了 type 和 data 两个部分,所以它的复制也不是原子的,会出现问题。
1 2 3 4 type interface struct { Type uintptr Data uintptr }
这个案例有趣的点还在于,这个案例的两个结构体的内存布局一模一样所以出现错误也不会 panic 退出。
如果在另外一个结构再加入一个 string 的字段,去读取就会导致 panic。因为两个类型不一样了 ,会发生panic。
3. mutex 3.1 互斥锁的实现方式 Barging(桥接):当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人。 Handoff(切换):当锁释放时候,锁会一直持有直到第一个等待者准备好获取锁。它降低了吞吐量,但可以解决公平性的问题。 Spining(自旋):自旋在等待队列为空,减少上下文切换,但是用占用CPU。 3.2 Go Mutex 实现原理 Go1.8 使用了Barging 和 Spining 的结合实现。
当试图获取已经被持有的锁时,如果本地队列为空并且P的数量大于1,goroutine将自旋几次(用一个P旋转会阻塞程序)。自旋后,goroutine park。在程序高频使用锁的情况下, 它充当了一个快速路径。
Go1.9 添加一个新的饥饿模式,该模式将会在释放时候触发 handsoff。
所有等待锁超过一毫秒的goroutine(也称为有界等待)将被诊断为饥饿。
当被标记为饥饿状态时,unlock方法会handsoff 把锁直接扔给第一个等待者。在饥饿模式下,自旋也被停用,因为传入的 goroutines将没有机会获取为下一个等待者保留的锁。
3.3 源码分析 1 2 3 4 type Mutex struct { state int32 sema uint32 }
当我们调用 Lock 方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回,否则的话就进入 slow path。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0 , mutexLocked) { return } m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
在 lockSlow
方法中我们可以看到,有一个大的 for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。
当前互斥锁的状态是非饥饿状态,并且已经被锁定了 自旋次数不超过 4 次 cpu 个数大于一,必须要是多核 cpu 当前正在执行当中,并且队列空闲的 p 的个数大于等于一 如果可以进入自旋状态之后就会调用 runtime_doSpin
方法进入自旋。
自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端。
4. errgroup 是对sync.WaitGroup的一个封装,有一个协程发生错误,就会调用cancel函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type Group struct { cancel func () wg sync.WaitGroup errOnce sync.Once err error } func (g *Group) Go(f func () error ) { g.wg.Add(1 ) go func () { defer g.wg.Done() if err := f(); err != nil { g.errOnce.Do(func () { g.err = err if g.cancel != nil { g.cancel() } }) } }() } func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package mainimport ( "fmt" "log" "net/http" "golang.org/x/sync/errgroup" ) func main () { eg := errgroup.Group{} eg.Go(func () error { return getPage("https://www.xx.xx" ) }) eg.Go(func () error { return getPage("https://google.com" ) }) if err := eg.Wait(); err != nil { log.Fatalf("get error: %v" , err) } } func getPage (url string ) error { resp, err := http.Get(url) if err != nil { return err } if resp.StatusCode != http.StatusOK { return fmt.Errorf("fail to get page: %s, wrong statusCode: %d" , url, resp.StatusCode) } log.Printf("success get page %s" , url) return nil }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package main import ( "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" "golang.org/x/sync/errgroup" ) func main () { g, ctx := errgroup.WithContext(context.Background()) mux := http.NewServeMux() mux.HandleFunc("/ping" , func (w http.ResponseWriter, r *http.Request) { w.Write([]byte ("pong" )) }) serverOut := make (chan struct {}) mux.HandleFunc("/shutdown" , func (w http.ResponseWriter, r *http.Request) { serverOut <- struct {}{} }) server := http.Server{ Handler: mux, Addr: ":8080" , } g.Go(func () error { return server.ListenAndServe() }) g.Go(func () error { select { case <-ctx.Done(): log.Println("errgroup exit..." ) case <-serverOut: log.Println("server will out..." ) } timeoutCtx, cancel := context.WithTimeout(context.Background(), 3 *time.Second) defer cancel() log.Println("shutting down server..." ) return server.Shutdown(timeoutCtx) }) g.Go(func () error { quit := make (chan os.Signal, 0 ) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) select { case <-ctx.Done(): return ctx.Err() case sig := <-quit: return fmt.Errorf("get os signal: %v" , sig) } }) fmt.Printf("errgroup exiting: %+v\n" , g.Wait()) }
测试:
1 2 3 4 5 6 7 8 9 10 2024/06/27 22:32:04 server will out... 2024/06/27 22:32:04 shutting down server... errgroup exiting: http: Server closed 2024/06/27 22:33:01 errgroup exit ... 2024/06/27 22:33:01 shutting down server... errgroup exiting: get os signal: interrupt
5. 参考资料