0%

常见限流算法和golang的令牌桶算法

限流就是通过对并发访问 / 请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。

例如秒杀网站,限制 22 点 5 分 – 22 点 10 分 秒杀 999 份产品, 限制放行 5w 个请求,若在该段时间内,请求在第 5w 以后的请求,直接拒之门外, 也就是我们在进入网站的时候显示,系统繁忙。

1. 常见限流算法

1.1 固定时间窗口

  • 给定一个变量 counter 来记录当前接口处理的请求数量,初始值为 0(代表接口当前 1 分钟内还未处理请求)。
  • 1 分钟之内每处理一个请求之后就将 counter+1 ,当 counter=33 之后(也就是说在这 1 分钟内接口已经被访问 33 次的话),后续的请求就会被全部拒绝。
  • 等到 1 分钟结束后,将 counter 重置 0,重新开始计数。
image-20240707221901062

1.2 滑动窗口算法

滑动窗口计数器是通过将窗口再细分,并能够去平滑一下处理的任务数量。

例如我们的接口限流每分钟处理 60 个请求,我们可以把 1 分钟分为 60 个窗口。每隔 1 秒移动一次,每个窗口一秒只能处理 不大于 60(请求数)/60(窗口数) 的请求, 如果当前窗口的请求计数总和超过了限制的数量的话就不再处理其他请求。

算法可以用 zset 实现,跳跃表。

滑动窗口计数器算法

1.3 漏桶算法(请求到时才放雨滴)

漏桶是有缓存的,有请求就会放到缓存中,我们可以往桶中以任意速率流入水,但是桶是以一定速率流出水。

漏桶以固定的速率往外漏水,若桶空了则停止漏水。比如说,1s 漏 1000 滴水,正如 1s 处理 1000 个请求。如果漏桶慢了,则多余的水滴也会被直接舍弃。

实现这个算法的话也很简单,准备一个队列用来保存请求,然后我们定期从队列中拿请求来执行就好了(和消息队列削峰/限流的思想是一样的)。

4

1.4 令牌桶算法(请求前就可以放令牌,有一定缓存)

令牌桶算法通过定期向桶中添加令牌(按设定的速率),即使在没有请求的情况下,令牌也会不断积累,直到达到桶的容量上限。

当突发请求到来时,如果桶中有足够的令牌,这些请求可以立即获得令牌并被处理,而不需要等待。因为桶中可能已经积累了大量的令牌,这些令牌可以在短时间内迅速消耗掉,从而允许大量请求在短时间内通过。

令牌桶实现的限流器算法,相较于漏桶算法可以在一定程度上允许突发的流量进入我们的应用中,所以在web应用中最为广泛。

3

2. golang的令牌桶实现

golang 标准库中就自带了限流算法的实现,golang.org/x/time/rate,该限流器是基于 Token Bucket (令牌桶) 实现的。

令牌桶就是我们上面说的桶,里面装令牌,系统会以恒定速率向桶中放令牌,桶满则暂时不放。 用户请求就要向桶里面拿令牌。

2.1 创建令牌桶

1
2
3
4
5
6
7
8
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}

limiter := NewLimiter(5, 1);
  • 第一个参数是 r Limit,这是代表每秒可以向令牌桶中产生多少令牌
  • 第二个参数是 b int,这是代表令牌桶的容量大小

我们来看一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"context"
"log"
"time"

"golang.org/x/time/rate"
)


func main() {
l := rate.NewLimiter(1, 2) // 一秒放1个,最多寸2个令牌。
for i := 0; i < 50; i++ {

c, _ := context.WithTimeout(context.Background(), time.Second*2)
if err := l.Wait(c); err != nil {
log.Println("limiter wait error : " + err.Error())
}
log.Println("Wait success:", i)

// Reserve返回等待时间,再去取令牌
r := l.Reserve()
log.Println("reserve time :", r.Delay())

// Allow判断当前是否可以取到令牌
a := l.Allow()
log.Println("Allow == ", a)
}
}

2.2 令牌桶消费函数

1. Wait (阻塞等待)

Wait ,等于 WaitN(ctx,1)

若此时桶内令牌数组不足 (小于N),那么 Wait 方法将会阻塞一段时间,直至令牌满足条件,否则就一直阻塞,若满足条件,则直接返回结果。Wait 的 context 参数,可以设置超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
"context"
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3) // 1秒1个,最多3个

c, _ := context.WithCancel(context.TODO())
for {
l.WaitN(c, 1)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
}
}


2023-09-11 17:02:13
2023-09-11 17:02:13
2023-09-11 17:02:13 // 拿完3个后,再阻塞了
2023-09-11 17:02:14
2023-09-11 17:02:15
2023-09-11 17:02:16
2023-09-11 17:02:17

2. Allow (立马返回)

Allow 等于 AllowN(time.Now(),1), 当前取一个令牌,若满足,则为 true,否则 false。

AllowN 方法 指的是,截止到某一时刻,目前桶中令牌数目是否至少为 N 个,满足则返回 true,同时从桶中消费 N 个令牌。 反之返回不消费令牌,返回 false

如果你需要在事件超出频率的时候丢弃或跳过事件,就使用AllowN,否则使用 Reserve 或Wait。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3)

for {
if l.AllowN(time.Now(), 1) {
fmt.Println("ok", time.Now().Format("2006-01-02 15:04:05"))
} else {
time.Sleep(time.Second / 3)
fmt.Println("false", time.Now().Format("2006-01-02 15:04:05"))
}
}
}


ok 2023-09-11 17:07:53
ok 2023-09-11 17:07:53
ok 2023-09-11 17:07:53
false 2023-09-11 17:07:53
false 2023-09-11 17:07:54
false 2023-09-11 17:07:54
ok 2023-09-11 17:07:54
false 2023-09-11 17:07:54
false 2023-09-11 17:07:55
false 2023-09-11 17:07:55
ok 2023-09-11 17:07:55
false 2023-09-11 17:07:55
false 2023-09-11 17:07:56
false 2023-09-11 17:07:56
ok 2023-09-11 17:07:56
false 2023-09-11 17:07:56
false 2023-09-11 17:07:57
false 2023-09-11 17:07:57

3. Reserve (自己控制)

Reserve , 等于 `ReserveN(time.Now(), 1)

ReserveN 当调用完成后,无论令牌是否充足,都会返回一个 Reservation * 对象

我们可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0 秒,则说明不用等待,若大于 0 秒,则必须等到等待时间之后,才能向后进行。
当然,若不想等待,你可以归还令牌,一个都不能少,调用该对象的 Cancel() 方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"

"golang.org/x/time/rate"
)

func main() {
l := rate.NewLimiter(1, 3)

for {
r := l.ReserveN(time.Now(), 1)
s := r.Delay()
time.Sleep(s)
fmt.Println(s, time.Now().Format("04:05.000"))
}
}



0s 2023-09-11 17:10:47
0s 2023-09-11 17:10:47
0s 2023-09-11 17:10:47
999.811042ms 2023-09-11 17:10:48
999.675375ms 2023-09-11 17:10:49
998.794834ms 2023-09-11 17:10:50
998.750334ms 2023-09-11 17:10:51
998.832875ms 2023-09-11 17:10:52

3. 参考资料

可以加首页作者微信,咨询相关问题!