0%

golang服务的优雅关闭和重启

后端服务程序在配置更新,程序修改后发布的过程中存在一些未处理完成的请求,和当前服务中为落地的资源(缓存、记录、日志等数据),为了减少这种情况带来的数据异常,需要有一种机制,在服务收到重启或者关闭信号的同时进行一些数据收尾处理。

对于优雅关闭:

  • 先标记为不接收和向下游发送新请求,新请求过来时直接报错,让客户端重试其它机器。
  • 程序中是否还有关键在运行(请求过来触发的逻辑、自身循环逻辑、定时任务逻辑等),如果有,等待上游请求触发的逻辑执行完成,如果超时,强制取消。对于自身内部的一些逻辑,通过上下文发送取消动作,如果超时,强制执行关闭。

对于优雅重启:

  • 不关闭现有连接(正在运行中的程序)
  • 新的进程启动并替代旧进程
  • 新的进程接管新的连接
  • 连接要随时响应用户的请求,当用户仍在请求旧进程时要保持连接,新用户应请求新进程,不可以出现拒绝请求的情况

1. 优雅关闭

1.1 代码

参考:https://github.com/gin-gonic/examples/tree/master/graceful-shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gin-gonic/gin"
)

// 模拟慢请求
func sleep(ctx *gin.Context) {
t := ctx.Query("t")
s, err := strconv.Atoi(t)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"msg": "参数错误: " + t})
return
}

time.Sleep(time.Duration(s) * time.Second)
ctx.JSON(http.StatusOK, gin.H{"msg": fmt.Sprintf("sleep %d s", s)})
}


const (
stateHealth = "health"
stateUnHealth = "unhealth"
)

var state = stateHealth

func health(ctx *gin.Context) {
status := http.StatusOK
if state == stateUnHealth {
status = http.StatusServiceUnavailable
}
ctx.JSON(status, gin.H{"data": state})
}


func main() {
e := gin.Default()
e.GET("/health", health)
e.GET("/sleep", sleep)

server := &http.Server{
Addr: ":8080",
Handler: e,
}

go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("server run err: %+v", err)
}
}()

// 用于捕获退出信号
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")

// 捕获到退出信号之后将健康检查状态设置为 unhealth
state = stateUnHealth
log.Println("Shutting down state: ", state)

// 设置超时时间,两个心跳周期,假设一次心跳 3s
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()

// Shutdown 接口,如果没有新的连接了就会释放,传入超时 context
// 调用这个接口会关闭服务,但是不会中断活动连接
// 首先会将端口监听移除,然后会关闭所有的空闲连接,然后等待活动的连接变为空闲后关闭
// 如果等待时间超过了传入的 context 的超时时间,就会强制退出
// 调用这个接口 server 监听端口会返回 ErrServerClosed 错误
// 注意,这个接口不会关闭和等待websocket这种被劫持的链接,如果做一些处理。可以使用 RegisterOnShutdown 注册一些清理的方法
if err := server.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown:", err)
}

log.Println("Server exiting")
}

Shutdown 方法

go 在1.8后增加了http server 的shutdown方法,内部其实会调用注册的shutdown函数钩子。

1
2
3
4
5
6
7
8
9
10
func (srv *Server) Shutdown(ctx context.Context) error {
srv.inShutdown.Store(true)

srv.mu.Lock()
lnerr := srv.closeListenersLocked()
for _, f := range srv.onShutdown {
go f()
}
srv.mu.Unlock()
}

1.2 测试

1
2
3
4
5
6
7
# 1. 启动服务
go run main.go

# 2. 另外一个终端请求
curl http://localhost:8080/sleep\?t\=5

# 3. 在这个过程中,发送退出信号,不会立即退出

1.3 父ctx 传递并且cancel

对http server的逻辑可以用 Shutdown 处理。如果不是http server正在运行的逻辑,比如说自己的一些goroutine,通过父ctx 传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package main

import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

const (
TimeTemplate = "15:04:05.999999999"
)

type Service interface {
GetName() string
Serve(ctx context.Context)
Shutdown() error
}

type BusinessService struct {
}

func (b *BusinessService) GetName() string {
return "BusinessService"
}

func (b *BusinessService) Serve(ctx context.Context) {
for {
fmt.Printf("BusinessService serve run at %s\n", time.Now().Format(TimeTemplate))
select {
case <-ctx.Done():
fmt.Printf("111111111111cancel %s\n", time.Now().Format(TimeTemplate))
return
default:
}
time.Sleep(time.Second)
}
return
}

func (b *BusinessService) Shutdown() error {
fmt.Printf("BusinessService shutdown begin... at %s\n", time.Now().Format(TimeTemplate))
defer func() {
fmt.Printf("BusinessService shutdown end... at %s\n", time.Now().Format(TimeTemplate))
}()
return nil
}

type LogService struct {
buffer []string
}

func (l *LogService) Serve(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Printf("2222222222cancel %s\n", time.Now().Format(TimeTemplate))
return
default:
// 0.5s append one log
time.Sleep(500 * time.Millisecond)
l.buffer = append(l.buffer, fmt.Sprintf("Time: %d", time.Now().Unix()))
}
}
}

func (b *LogService) GetName() string {
return "LogService"
}

func (l *LogService) Shutdown() (err error) {
fmt.Printf("LogService shutdown begin... at %s\n", time.Now().Format(TimeTemplate))
defer fmt.Printf("LogService shutdown end... at %s\n", time.Now().Format(TimeTemplate))
if len(l.buffer) == 0 {
return
}
fmt.Printf("cache [%d] wait to send \n", len(l.buffer))
for _, log := range l.buffer {
fmt.Printf("send Log [%s]\n", log)
}
return
}

type ServiceGroup struct {
ctx context.Context
cancel func()
services []Service //service list
}

func NewServiceGroup(ctx context.Context) *ServiceGroup {
g := ServiceGroup{}
g.ctx, g.cancel = context.WithCancel(ctx)
return &g
}

func (s *ServiceGroup) Add(service Service) {
s.services = append(s.services, service)
}

func (s *ServiceGroup) run(service Service) (err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
fmt.Printf("receive panic msg: %s\n", err.Error())
}
}()
//with cancel ctx to child context
service.Serve(s.ctx)
return
}

func (s *ServiceGroup) watchDog() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case signalData := <-signalChan:
switch signalData {
case syscall.SIGINT:
fmt.Println("receive signal sigint")
case syscall.SIGTERM:
fmt.Println("receive signal sigerm")
default:
fmt.Println("receive singal unknown")
}
// do cancel notify all services cancel
s.cancel()
goto CLOSE
case <-s.ctx.Done():
goto CLOSE
}
}
CLOSE:
for _, service := range s.services {
if err := service.Shutdown(); err != nil {
fmt.Printf("shutdown failed err: %s", err)
}
}
}

func (s *ServiceGroup) ServeAll() {
var wg sync.WaitGroup
for idx := range s.services {
service := s.services[idx]
wg.Add(1)
go func() {
defer wg.Done()
if err := s.run(service); err != nil {
fmt.Printf("receive service [%s] has error: 【%s】, do cancel\n", service.GetName(), err.Error())
s.cancel()
}
}()
}
wg.Add(1)
go func() {
defer wg.Done()
s.watchDog()
}()
wg.Wait()
}

func main() {
rand.Seed(time.Now().Unix())
ctx := context.Background()

g := NewServiceGroup(ctx)
g.Add(&LogService{})
g.Add(&BusinessService{})
g.ServeAll()
}

/*
BusinessService serve run at 18:59:07.839774
BusinessService serve run at 18:59:08.841111
BusinessService serve run at 18:59:09.841922

^Creceive signal sigint # 在这里关闭

LogService shutdown begin... at 18:59:09.868933
cache [4] wait to send
send Log [Time: 1719399548]
send Log [Time: 1719399548]
send Log [Time: 1719399549]
send Log [Time: 1719399549]
LogService shutdown end... at 18:59:09.869027

BusinessService shutdown begin... at 18:59:09.869044
BusinessService shutdown end... at 18:59:09.869047

2222222222cancel 18:59:10.343492

BusinessService serve run at 18:59:10.843066
111111111111cancel 18:59:10.843116
*/

2. 优雅重启

2.1 代码

我们可以使用 fvbock/endless 来替换默认的 ListenAndServe启动服务来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"log"
"net/http"
"time"

"github.com/fvbock/endless"
"github.com/gin-gonic/gin"
)

func main() {
router := gin.Default()
router.GET("/", func(c *gin.Context) {
time.Sleep(5 * time.Second)
c.String(http.StatusOK, "hello gin!")
})
// 默认endless服务器会监听下列信号:
// syscall.SIGHUP,syscall.SIGUSR1,syscall.SIGUSR2,syscall.SIGINT,syscall.SIGTERM和syscall.SIGTSTP
// 接收到 SIGHUP 信号将触发`fork/restart` 实现优雅重启(kill -1 pid会发送SIGHUP信号)
// 接收到 syscall.SIGINT或syscall.SIGTERM 信号将触发优雅关机
// 接收到 SIGUSR2 信号将触发HammerTime
// SIGUSR1 和 SIGTSTP 被用来触发一些用户自定义的hook函数
if err := endless.ListenAndServe(":8080", router); err != nil {
log.Fatalf("listen: %s\n", err)
}

log.Println("Server exiting")
}

2.2 测试

1
2
3
4
5
6
7
# 1. 启动服务
go run main.go

# 2. 另外一个终端请求
curl http://localhost:8080

# 3. 可以测试 kill -1 信号,当前进程挂掉,开了一个新进程。 退出和-9会优雅关机。

需要注意的是,此时程序的PID变化了,因为endless 是通过fork子进程处理新请求,待原进程处理完当前请求后再退出的方式实现优雅重启的。所以当你的项目是使用类似supervisor的软件管理进程时就不适用这种方式了。

3. 参考资料

可以加首页作者微信,咨询相关问题!