0%

令牌桶算法和golang的http限流实战

限流就是通过对并发访问 / 请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。

例如秒杀网站,限制 22 点 5 分 – 22 点 10 分 秒杀 999 份产品, 限制放行 5w 个请求,若在该段时间内,请求在第 5w 以后的请求,直接拒之门外, 也就是我们在进入网站的时候显示,系统繁忙。

1. 常见限流算法

1.1 固定时间窗口控制

最简单的是 使用计数器来控制,设置固定的时间内,处理固定的请求数。

img

1.2 滑动窗口计数器算法

能够去平滑一下处理的任务数量。滑动窗口计数器是通过将窗口再细分,并且按照时间滑动,这种算法避免了固定窗口算法带来的双倍突发请求,但时间区间精度越高,算法所需的空间容量越大。

img

1.3 漏桶算法

漏桶是有缓存的,有请求就会放到缓存中。漏桶以固定的速率往外漏水,若桶空了则停止漏水。比如说,1s 漏 1000 滴水,正如 1s 处理 1000 个请求。如果漏桶慢了,则多余的水滴也会被直接舍弃。

img

如图,水滴即为请求的事件,如果漏桶可以缓存 5000 个事件,实际服务器 1s 处理 1000 个事件,那么在高峰期的时候,响应时间最多等 5 秒,但是不能一直是高峰期,否则,一直响应时间都是 5s,就会是很慢的时间了,这个时间也是很影响体验的。

如果桶满了,还有请求过来的话,则会被直接丢弃,这种做法,还是丢弃了请求。

1.4 令牌桶算法

通过动态控制令牌的数量,来更好的服务客户端的请求事情,令牌的生成数量和生产速率都是可以灵活控制的

img

令牌桶和漏桶不同的地方在于令牌桶可以自己控制生成令牌的速率,例如高峰期就可以多生成一些令牌来满足客户端的需求。

令牌桶实现的限流器算法,相较于漏桶算法可以在一定程度上允许突发的流量进入我们的应用中,所以在web应用中最为广泛。

2. golang的令牌桶库

golang 标准库中就自带了限流算法的实现,golang.org/x/time/rate,该限流器是基于 Token Bucket (令牌桶) 实现的。

令牌桶就是我们上面说的桶,里面装令牌,系统会以恒定速率向桶中放令牌,桶满则暂时不放。 用户请求就要向桶里面拿令牌。

2.1 创建令牌桶

1
2
3
4
5
6
7
8
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}

limiter := NewLimiter(5, 1);
  • 第一个参数是 r Limit,这是代表每秒可以向令牌桶中产生多少令牌
  • 第二个参数是 b int,这是代表令牌桶的容量大小

我们来看一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"context"
"log"
"time"

"golang.org/x/time/rate"
)


func main() {
l := rate.NewLimiter(1, 2) // 一秒放1个,最多寸2个令牌。
for i := 0; i < 50; i++ {

c, _ := context.WithTimeout(context.Background(), time.Second*2)
if err := l.Wait(c); err != nil {
log.Println("limiter wait error : " + err.Error())
}
log.Println("Wait success:", i)

// Reserve返回等待时间,再去取令牌
r := l.Reserve()
log.Println("reserve time :", r.Delay())

// Allow判断当前是否可以取到令牌
a := l.Allow()
log.Println("Allow == ", a)
}
}

2.2 令牌桶消费函数

Wait (阻塞等待)

Wait ,等于 WaitN(ctx,1)

若此时桶内令牌数组不足 (小于N),那么 Wait 方法将会阻塞一段时间,直至令牌满足条件,否则就一直阻塞,若满足条件,则直接返回结果。Wait 的 context 参数,可以设置超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3) // 1秒1个,最多3个

c, _ := context.WithCancel(context.TODO())
for {
l.WaitN(c, 1)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
}
}


2023-09-11 17:02:13
2023-09-11 17:02:13
2023-09-11 17:02:13 // 拿完3个后,再阻塞了
2023-09-11 17:02:14
2023-09-11 17:02:15
2023-09-11 17:02:16
2023-09-11 17:02:17
Allow (立马返回)

Allow 等于 AllowN(time.Now(),1), 当前取一个令牌,若满足,则为 true,否则 false。

AllowN 方法 指的是,截止到某一时刻,目前桶中令牌数目是否至少为 N 个,满足则返回 true,同时从桶中消费 N 个令牌。 反之返回不消费令牌,返回 false

如果你需要在事件超出频率的时候丢弃或跳过事件,就使用AllowN,否则使用 Reserve 或Wait。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3)

for {
if l.AllowN(time.Now(), 1) {
fmt.Println("ok", time.Now().Format("2006-01-02 15:04:05"))
} else {
time.Sleep(time.Second / 3)
fmt.Println("false", time.Now().Format("2006-01-02 15:04:05"))
}
}
}


ok 2023-09-11 17:07:53
ok 2023-09-11 17:07:53
ok 2023-09-11 17:07:53
false 2023-09-11 17:07:53
false 2023-09-11 17:07:54
false 2023-09-11 17:07:54
ok 2023-09-11 17:07:54
false 2023-09-11 17:07:54
false 2023-09-11 17:07:55
false 2023-09-11 17:07:55
ok 2023-09-11 17:07:55
false 2023-09-11 17:07:55
false 2023-09-11 17:07:56
false 2023-09-11 17:07:56
ok 2023-09-11 17:07:56
false 2023-09-11 17:07:56
false 2023-09-11 17:07:57
false 2023-09-11 17:07:57
Reserve (自己控制)

Reserve , 等于 `ReserveN(time.Now(), 1)

ReserveN 当调用完成后,无论令牌是否充足,都会返回一个 Reservation * 对象

我们可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0 秒,则说明不用等待,若大于 0 秒,则必须等到等待时间之后,才能向后进行。
当然,若不想等待,你可以归还令牌,一个都不能少,调用该对象的 Cancel() 方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3)

for {
r := l.ReserveN(time.Now(), 1)
s := r.Delay()
time.Sleep(s)
fmt.Println(s, time.Now().Format("04:05.000"))
}
}



0s 2023-09-11 17:10:47
0s 2023-09-11 17:10:47
0s 2023-09-11 17:10:47
999.811042ms 2023-09-11 17:10:48
999.675375ms 2023-09-11 17:10:49
998.794834ms 2023-09-11 17:10:50
998.750334ms 2023-09-11 17:10:51
998.832875ms 2023-09-11 17:10:52

3. golang限流实现

3.1 实现方案

  1. uber 开源库中基于漏桶算法实现了一个限流器。漏桶算法可以限制流量的请求速度,并起到削峰填谷的作用。 https://github.com/uber-go/ratelimit
  2. 滴滴开源实现了一个对http请求的限流器中间件。可以基于以下模式限流。
    • 基于IP,路径,方法,header,授权用户等限流
    • 通过自定义方法限流
    • 还支持基于 http header 设置限流数据
    • 实现方式是基于 github/go/time 实现的,不同类别的数据都存储在一个带超时时间的数据池中。
    • 代码地址 https://github.com/didip/tollbooth
  3. golang 网络包中还有基于信号量实现的限流器。 https://github.com/golang/net/blob/master/netutil/listen.go 也值得我们去学习下。

3.2 http 令牌桶限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func main() {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 3) // 最多3个令牌,1秒放10个
http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
if limiter.Allow() { // do something
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "say hello")
} else {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "limit")
}
})

go func() {
for {
time.Sleep(time.Second)
Req()
}
}()

_ = http.ListenAndServe(":13100", nil)
}

func Req() {
// 1秒请求10次
for i := 0; i < 10; i++ {
_, _ = resty.New().R().Get("http://localhost:13100/ping")
}
}



/*
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 say hello
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:44 limit
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 say hello
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:45 limit
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 say hello
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
2023-09-11 17:17:46 limit
*/
  • 如果1秒放10个,最多存10个。那么每一秒可以打印10个sayhello。
  • 如果1秒放1个,最多存10个。第一次10个sayhello,后面每一秒一个sayhello。

3.3 Gin 中间件限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"time"

"github.com/didip/tollbooth"
"github.com/didip/tollbooth/limiter"
"github.com/gin-gonic/gin"
)

func main() {
r := gin.New()

lmt := tollbooth.NewLimiter(1, &limiter.ExpirableOptions{DefaultExpirationTTL: time.Second * 5})
lmt.SetIPLookups([]string{"RemoteAddr", "X-Forwarded-For", "X-Real-IP"})
lmt.SetMethods([]string{"POST", "GET"}) //放开更精准限制,但是也放松了流量。

r.Use(LimitHandler(lmt))
r.GET("/", func(c *gin.Context) {
c.String(200, "Get Hello, world!")
})
r.POST("/", func(c *gin.Context) {
c.String(200, "Post Hello, world!")
})
r.Run(":12345")
}

func LimitHandler(lmt *limiter.Limiter) gin.HandlerFunc {
return func(c *gin.Context) {
httpError := tollbooth.LimitByRequest(lmt, c.Writer, c.Request)
if httpError != nil {
c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte(httpError.Message))
c.Abort()
} else {
c.Next()
}
}
}
根据 IP 和 ReqPath 等配置限流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//"github.com/didip/tollbooth"
//"github.com/didip/tollbooth/errors"
// github.com/didip/tollbooth/limiter"

lmt := tollbooth.NewLimiter(10, &limiter.ExpirableOptions{ // 每秒 10 个请求
DefaultExpirationTTL: time.Hour * 24, // token过期的时间,放在cache里,可以节省内存
})

func wrapGinLimitHandler(lmt *limiter.Limiter) gin.HandlerFunc {
limitOptions := []string{"ip","token","device","version","platform","lang"}
allowPathList := configure.Global.HttpConfig.AllowPathList
denyPathList := configure.Global.HttpConfig.DenyPathList
return func(c *gin.Context) {
path := c.Request.URL.Path
// allow and deny path list config
for _, v := range allowPathList {
if strings.Contains(path, v) {
c.Next()
return
}
}
for _, v := range denyPathList {
if path == v {
httpError := &errors.HTTPError{Message: lmt.GetMessage(), StatusCode: lmt.GetStatusCode()}
c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte(httpError.Message))
c.Abort()
return
}
}

// get remote ip
remoteIP := c.Request.Header.Get("X-Real-IP")
if remoteIP == "" {
remoteIP = c.ClientIP()
}
// filter ips
if remoteIP == "127.0.0.1" || remoteIP == "localhost" {
c.Next()
return
}

// get limit keys
var keys []string
for _, v := range limitOptions {
if strings.Contains(v, "ip") {
keys = append(keys, remoteIP)
} else if strings.Contains(v, "token") {
token := c.GetHeader("Token")
if token == "" {
token = c.GetHeader("Access-Token")
}
keys = append(keys, token)
} else if strings.Contains(v, "device") {
keys = append(keys, c.GetHeader("Device-Id"))
} else if strings.Contains(v, "version") {
keys = append(keys, c.GetHeader("Appversion"))
} else if strings.Contains(v, "platform") {
keys = append(keys, c.GetHeader("Platform"))
} else if strings.Contains(v, "lang") {
keys = append(keys, c.GetHeader("Lang"))
}
}
// if null default ip
if len(keys) == 0 {
keys = append(keys, remoteIP)
}
// keys included path
keys = append(keys, path)

// limit by keys
httpError := tollbooth.LimitByKeys(lmt, keys)
if httpError != nil {
strKeys := path
lqlog.WarnCtx(c.Request.Context(), "[wrapGinLimitHandler] keys: (%v)", strKeys)
c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte(httpError.Message))
if _, ok := LimitKeysMap.Load(strKeys); ok {
// nothing
} else {
LimitKeysMap.Store(strKeys, "1")
text := fmt.Sprintf(`"%s %s" trigger http limit, keys:%+v`, c.Request.Method, path, keys)
text += fmt.Sprintf("\nAppversion: %v", c.GetHeader("Appversion"))
text += fmt.Sprintf("\nPlatform: %v", c.GetHeader("Platform"))
FeishuAlarmText(c, text)
}
c.Abort()
} else {
c.Next()
}
}
}
根据 IP Redis全局限流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// ratelimit "github.com/JGLTechnologies/gin-rate-limit"
request.POST("/create_info", middleware.LimitIPRate(time.Hour*6, 3), handler.AddUserInfo)


func LimitIPRate(rate time.Duration, limit uint) gin.HandlerFunc {
if !configure.Global.HttpConfig.IPRateLimit {
return func(c *gin.Context) { c.Next() }
}

// 本地测试需要关闭
skipFunc := func(c *gin.Context) bool {
remoteIP := GetGinClientIP(c)
if remoteIP == "127.0.0.1" || remoteIP == "localhost" {
return true
}
return false
}

// 内存限制
//store := ratelimit.InMemoryStore(&ratelimit.InMemoryOptions{Rate: rate, Limit: limit, Skip: skipFunc})
// Redis限制
store := ratelimit.RedisStore(&ratelimit.RedisOptions{RedisClient: dao.LimitRedisClient, Rate: rate, Limit: limit, Skip: skipFunc})

mw := ratelimit.RateLimiter(store, &ratelimit.Options{
ErrorHandler: func(c *gin.Context, info ratelimit.Info) {
c.String(429, "Too many requests. Try again in "+time.Until(info.ResetTime).String())
key := "LimitIPRate" + c.ClientIP() + c.FullPath()
if _, ok := LimitKeysMap.Load(key); !ok {
LimitKeysMap.Store(key, "1")
text := fmt.Sprintf(`"%s %s" trigger ipRate limit`, c.Request.Method, key)
text += fmt.Sprintf("\nAppversion: %v", c.GetHeader("Appversion"))
text += fmt.Sprintf("\nPlatform: %v", c.GetHeader("Platform"))
lqlog.WarnCtx(c, text)
}
},
KeyFunc: func(c *gin.Context) string {
return "LimitIPRate:" + GetGinClientIP(c)
},
})
return mw
}

func GetGinClientIP(c *gin.Context) string {
remoteIP := c.Request.Header.Get("X-Real-IP")
if remoteIP == "" {
remoteIP = c.ClientIP()
}
return remoteIP
}

3.4 vegeta测试

有一个非常棒的工具称作 vegeta,我喜欢在 HTTP 负载测试中使用(它也是用 Go 编写的)。

1
brew install vegeta

我们需要创建一个简单的配置文件,声明我们想要发送的请求。

1
GET http://localhost:12345/

然后,以每个时间单元 100 个请求的速率攻击 10 秒。

1
2
3
4
5
6
7
vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report

echo "http://localhost:12345" | vegeta attack -rate=500 -connections=100 -duration=10s | tee results.bin | vegeta report


// vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report 1s只成功了一个
// Status Codes [code:count] 200:10 429:990

结果,你将看到一些请求返回 200,但大多数返回 429。

3.5 wrk测试

先在本地安装wrk。

1
2
3
4
git clone https://github.com/wg/wrk
cd wrk
make
ln -s $PWD/wrk /usr/local/bin/

我的mac是6核,线程数不要太多,是核数的 2 到 4 倍即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
wrk -t6 -c10 -d10s  --latency http://127.0.0.1:9061/health


Running 10s test @ http://127.0.0.1:9061/health
6 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.28ms 14.43ms 399.37ms 99.59%
Req/Sec 2.06k 262.21 2.72k 84.25%
Latency Distribution
50% 448.00us
75% 620.00us
90% 797.00us
99% 1.38ms
123958 requests in 10.10s, 20.21MB read
Non-2xx or 3xx responses: 123847
Requests/sec: 12270.53
Transfer/sec: 2.00MB

# 限流后,可以看到,一共123958个, 失败了 123847 个

4. 参考资料

给作者打赏,可以加首页微信,咨询作者相关问题!