0%

golang的http限流实战

1. golang限流实现

1.1 实现方案

  1. uber 开源库中基于漏桶算法实现了一个限流器。https://github.com/uber-go/ratelimit

  2. didip 开源的 tollbooth 实现了一个针对 http 请求的限流中间件(作者是 GitHub 用户 didip,并非滴滴)。可以基于以下模式限流。

    • 基于IP,路径,方法,header,授权用户等限流

    • 通过自定义方法限流

    • 还支持基于 http header 设置限流数据

    • 实现方式是基于 golang.org/x/time/rate 实现的,不同类别的数据都存储在一个带超时时间的数据池中。

    • 代码地址 https://github.com/didip/tollbooth

  3. golang 网络包中还有基于信号量实现的限流器。 https://github.com/golang/net/blob/master/netutil/listen.go 也值得我们去学习下。

1.2 http 令牌桶限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// main starts a demo HTTP server on :13100 whose /ping handler is guarded by a
// token-bucket limiter, and a background goroutine that fires a burst of
// requests at it roughly once per second.
func main() {
	const layout = "2006-01-02 15:04:05"

	// One token is added every 100ms (10 per second); the bucket holds at most 3.
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 3)
	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		if !limiter.Allow() {
			fmt.Println(time.Now().Format(layout), "limit")
			// Tell the caller it was throttled instead of answering with an implicit 200.
			w.WriteHeader(http.StatusTooManyRequests)
			return
		}
		// do something
		fmt.Println(time.Now().Format(layout), "say hello")
	})

	// Load generator: sleep one second, send a burst via Req, repeat forever.
	go func() {
		for {
			time.Sleep(time.Second)
			Req()
		}
	}()

	// ListenAndServe only returns on failure (for example when the port is
	// already in use); report it rather than exiting silently.
	if err := http.ListenAndServe(":13100", nil); err != nil {
		fmt.Println("http server stopped:", err)
	}
}

// Req sends a burst of 10 sequential GET requests to the local /ping endpoint.
// Responses and errors are deliberately ignored: this is only a load generator,
// and the server side prints whether each request was allowed or limited.
func Req() {
	// Build the client once per burst instead of once per request.
	client := resty.New()
	for i := 0; i < 10; i++ {
		_, _ = client.R().Get("http://localhost:13100/ping")
	}
}



/*
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
*/
  • 如果1秒放10个,最多存10个。那么每一秒可以打印10个sayhello。
  • 如果1秒放1个,最多存10个。第一次10个sayhello,后面每一秒一个sayhello。

1.3 Gin 中间件限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"time"

"github.com/didip/tollbooth"
"github.com/didip/tollbooth/limiter"
"github.com/gin-gonic/gin"
)

// main runs a Gin server on :12345 with a tollbooth limiter installed as global
// middleware in front of the GET / and POST / routes.
func main() {
	r := gin.New()

	// Limiter created with max = 1 and a 5 second default expiration TTL.
	lmt := tollbooth.NewLimiter(1, &limiter.ExpirableOptions{DefaultExpirationTTL: time.Second * 5})
	lmt.SetIPLookups([]string{"RemoteAddr", "X-Forwarded-For", "X-Real-IP"})
	// Enabling the method list makes limiting more fine-grained, but it also
	// loosens the overall throttling.
	lmt.SetMethods([]string{"POST", "GET"})

	r.Use(LimitHandler(lmt))
	r.GET("/", func(c *gin.Context) {
		c.String(200, "Get Hello, world!")
	})
	r.POST("/", func(c *gin.Context) {
		c.String(200, "Post Hello, world!")
	})

	// Run returns only when the server cannot start or stops with an error;
	// surface that instead of dropping it.
	if err := r.Run(":12345"); err != nil {
		panic(err)
	}
}

// LimitHandler adapts a tollbooth limiter into Gin middleware. Each request is
// checked with tollbooth.LimitByRequest; when that reports an error the error's
// status code and message are written and the chain is aborted, otherwise the
// next handler runs.
func LimitHandler(lmt *limiter.Limiter) gin.HandlerFunc {
	return func(c *gin.Context) {
		if limitErr := tollbooth.LimitByRequest(lmt, c.Writer, c.Request); limitErr != nil {
			body := []byte(limitErr.Message)
			c.Data(limitErr.StatusCode, lmt.GetMessageContentType(), body)
			c.Abort()
			return
		}
		c.Next()
	}
}

1. 根据 IP 和 ReqPath 等配置限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Imports used by this snippet:
// "github.com/didip/tollbooth"
// "github.com/didip/tollbooth/errors"
// "github.com/didip/tollbooth/limiter"

lmt := tollbooth.NewLimiter(10, &limiter.ExpirableOptions{ // 10 requests per second
DefaultExpirationTTL: time.Hour * 24, // expiry time of a token entry; entries live in a cache, which saves memory
})

// wrapGinLimitHandler builds Gin middleware that rate-limits requests with lmt,
// keyed by client IP, several request headers and the request path.
//
// For each request, in order:
//  1. a path containing any entry of the configured allow list skips limiting;
//  2. a path equal to an entry of the configured deny list is rejected with the
//     limiter's own status code and message;
//  3. a request whose resolved IP is 127.0.0.1 or localhost skips limiting;
//  4. everything else is checked with tollbooth.LimitByKeys.
//
// The first rejection recorded for a path also sends a Feishu alarm.
func wrapGinLimitHandler(lmt *limiter.Limiter) gin.HandlerFunc {
	limitOptions := []string{"ip", "token", "device", "version", "platform", "lang"}
	allowPathList := configure.Global.HttpConfig.AllowPathList
	denyPathList := configure.Global.HttpConfig.DenyPathList
	return func(c *gin.Context) {
		path := c.Request.URL.Path
		// Allow list is a substring match, deny list is an exact match.
		for _, v := range allowPathList {
			if strings.Contains(path, v) {
				c.Next()
				return
			}
		}
		for _, v := range denyPathList {
			if path == v {
				httpError := &errors.HTTPError{Message: lmt.GetMessage(), StatusCode: lmt.GetStatusCode()}
				c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte(httpError.Message))
				c.Abort()
				return
			}
		}

		// Resolve the client IP, preferring the X-Real-IP header.
		// NOTE(review): X-Real-IP is read from the request as-is. Unless a trusted
		// proxy always overwrites it, a client could send "127.0.0.1" and skip
		// limiting via the check below. Confirm the deployment guarantees this.
		remoteIP := c.Request.Header.Get("X-Real-IP")
		if remoteIP == "" {
			remoteIP = c.ClientIP()
		}
		// Local traffic is never limited.
		if remoteIP == "127.0.0.1" || remoteIP == "localhost" {
			c.Next()
			return
		}

		// Build the limiter key: one component per option, then the path.
		keys := make([]string, 0, len(limitOptions)+1)
		for _, v := range limitOptions {
			// Exact match on the option name, so an option that merely contains
			// another name as a substring is not misclassified.
			switch v {
			case "ip":
				keys = append(keys, remoteIP)
			case "token":
				token := c.GetHeader("Token")
				if token == "" {
					token = c.GetHeader("Access-Token")
				}
				keys = append(keys, token)
			case "device":
				keys = append(keys, c.GetHeader("Device-Id"))
			case "version":
				keys = append(keys, c.GetHeader("Appversion"))
			case "platform":
				keys = append(keys, c.GetHeader("Platform"))
			case "lang":
				keys = append(keys, c.GetHeader("Lang"))
			}
		}
		// Fall back to the IP alone if no option produced a key component.
		if len(keys) == 0 {
			keys = append(keys, remoteIP)
		}
		keys = append(keys, path)

		httpError := tollbooth.LimitByKeys(lmt, keys)
		if httpError == nil {
			c.Next()
			return
		}

		// Alarms are de-duplicated per path, not per full key set.
		strKeys := path
		lqlog.WarnCtx(c.Request.Context(), "[wrapGinLimitHandler] keys: (%v)", strKeys)
		c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte(httpError.Message))
		c.Abort()

		// LoadOrStore makes "first rejection for this path" an atomic decision, so
		// a burst of concurrent rejections sends one alarm rather than several.
		// NOTE(review): assumes LimitKeysMap is a sync.Map — confirm.
		if _, alerted := LimitKeysMap.LoadOrStore(strKeys, "1"); !alerted {
			text := fmt.Sprintf(`"%s %s" trigger http limit, keys:%+v`, c.Request.Method, path, keys)
			text += fmt.Sprintf("\nAppversion: %v", c.GetHeader("Appversion"))
			text += fmt.Sprintf("\nPlatform: %v", c.GetHeader("Platform"))
			FeishuAlarmText(c, text)
		}
	}
}

2. 根据 IP Redis全局限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// ratelimit "github.com/JGLTechnologies/gin-rate-limit"
// Register POST /create_info with the LimitIPRate middleware (rate = 6 hours,
// limit = 3) running before handler.AddUserInfo.
// NOTE(review): presumably this allows 3 requests per client IP per 6 hours;
// confirm against gin-rate-limit's Rate/Limit semantics.
request.POST("/create_info", middleware.LimitIPRate(time.Hour*6, 3), handler.AddUserInfo)


// LimitIPRate returns Gin middleware that rate-limits requests per client IP
// using gin-rate-limit with a Redis store backed by dao.LimitRedisClient.
//
// rate and limit are passed straight through as the store's Rate and Limit
// options. When configure.Global.HttpConfig.IPRateLimit is false the returned
// middleware is a plain pass-through.
func LimitIPRate(rate time.Duration, limit uint) gin.HandlerFunc {
	if !configure.Global.HttpConfig.IPRateLimit {
		return func(c *gin.Context) { c.Next() }
	}

	// Limiting has to be off for local testing, so local clients are skipped.
	skipFunc := func(c *gin.Context) bool {
		remoteIP := GetGinClientIP(c)
		return remoteIP == "127.0.0.1" || remoteIP == "localhost"
	}

	// In-memory alternative:
	//store := ratelimit.InMemoryStore(&ratelimit.InMemoryOptions{Rate: rate, Limit: limit, Skip: skipFunc})
	// Redis-backed store.
	store := ratelimit.RedisStore(&ratelimit.RedisOptions{RedisClient: dao.LimitRedisClient, Rate: rate, Limit: limit, Skip: skipFunc})

	return ratelimit.RateLimiter(store, &ratelimit.Options{
		ErrorHandler: func(c *gin.Context, info ratelimit.Info) {
			c.String(429, "Too many requests. Try again in "+time.Until(info.ResetTime).String())

			// Resolve the IP the same way KeyFunc does, so the log de-duplication
			// key refers to the same client the limiter actually throttled.
			key := "LimitIPRate" + GetGinClientIP(c) + c.FullPath()

			// LoadOrStore makes "first time this key was limited" an atomic
			// decision, so concurrent rejections log once.
			// NOTE(review): assumes LimitKeysMap is a sync.Map — confirm.
			if _, logged := LimitKeysMap.LoadOrStore(key, "1"); !logged {
				text := fmt.Sprintf(`"%s %s" trigger ipRate limit`, c.Request.Method, key)
				text += fmt.Sprintf("\nAppversion: %v", c.GetHeader("Appversion"))
				text += fmt.Sprintf("\nPlatform: %v", c.GetHeader("Platform"))
				// text contains request header values; pass it as an argument so a
				// '%' in a header is not interpreted as a format verb.
				lqlog.WarnCtx(c, "%s", text)
			}
		},
		KeyFunc: func(c *gin.Context) string {
			return "LimitIPRate:" + GetGinClientIP(c)
		},
	})
}

// GetGinClientIP returns the client IP for a Gin request: the X-Real-IP header
// when it is non-empty, otherwise whatever c.ClientIP() reports.
//
// NOTE(review): X-Real-IP is used as received; presumably a trusted reverse
// proxy sets it — confirm, since a client-supplied value would be accepted too.
func GetGinClientIP(c *gin.Context) string {
	if headerIP := c.Request.Header.Get("X-Real-IP"); headerIP != "" {
		return headerIP
	}
	return c.ClientIP()
}

2. 限流测试

2.1 vegeta测试

有一个非常棒的工具称作 vegeta,我喜欢在 HTTP 负载测试中使用(它也是用 Go 编写的)。

1
brew install vegeta

我们需要创建一个简单的配置文件(例如 vegeta.conf),声明我们想要发送的请求。

1
GET http://localhost:12345/

然后,以每秒 100 个请求的速率攻击 10 秒。

1
2
3
4
5
6
7
vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report

echo "http://localhost:12345" | vegeta attack -rate=500 -connections=100 -duration=10s | tee results.bin | vegeta report


// vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report 1s只成功了一个
// Status Codes [code:count] 200:10 429:990

结果,你将看到一些请求返回 200,但大多数返回 429。

2.2 wrk测试

先在本地安装wrk。

1
2
3
4
git clone https://github.com/wg/wrk
cd wrk
make
ln -s $PWD/wrk /usr/local/bin/

我的mac是6核,线程数不要太多,是核数的 2 到 4 倍即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
wrk -t6 -c10 -d10s  --latency http://127.0.0.1:9061/health


Running 10s test @ http://127.0.0.1:9061/health
6 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.28ms 14.43ms 399.37ms 99.59%
Req/Sec 2.06k 262.21 2.72k 84.25%
Latency Distribution
50% 448.00us
75% 620.00us
90% 797.00us
99% 1.38ms
123958 requests in 10.10s, 20.21MB read
Non-2xx or 3xx responses: 123847
Requests/sec: 12270.53
Transfer/sec: 2.00MB

# 限流后,可以看到,一共123958个, 失败了 123847 个

3. 参考资料

可以加首页作者微信,咨询相关问题!