限流就是通过对并发访问 / 请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。
例如秒杀网站,限制 22 点 5 分 – 22 点 10 分 秒杀 999 份产品, 限制放行 5w 个请求,若在该段时间内,请求在第 5w 以后的请求,直接拒之门外, 也就是我们在进入网站的时候显示,系统繁忙。
1. 常见限流算法 1.1 固定时间窗口控制 最简单的是 使用计数器来控制,设置固定的时间内,处理固定的请求数。
1.2 滑动窗口计数器算法 能够去平滑一下处理的任务数量。滑动窗口计数器是通过将窗口再细分,并且按照时间滑动,这种算法避免了固定窗口算法带来的双倍突发请求,但时间区间精度越高,算法所需的空间容量越大。
1.3 漏桶算法 漏桶是有缓存的,有请求就会放到缓存中。漏桶以固定的速率往外漏水,若桶空了则停止漏水。比如说,1s 漏 1000 滴水,正如 1s 处理 1000 个请求。如果漏桶慢了,则多余的水滴也会被直接舍弃。
如图,水滴即为请求的事件,如果漏桶可以缓存 5000 个事件,实际服务器 1s 处理 1000 个事件,那么在高峰期的时候,响应时间最多等 5 秒,但是不能一直是高峰期,否则,一直响应时间都是 5s,就会是很慢的时间了,这个时间也是很影响体验的。
如果桶满了,还有请求过来的话,则会被直接丢弃 ,这种做法,还是丢弃了请求。
1.4 令牌桶算法 通过动态控制令牌的数量,来更好的服务客户端的请求事情,令牌的生成数量和生产速率都是可以灵活控制的
令牌桶和漏桶不同的地方在于 令牌桶可以自己控制生成令牌的速率,例如高峰期就可以多生成一些令牌来满足客户端的需求。
令牌桶实现的限流器算法,相较于漏桶算法可以在一定程度上允许突发的流量进入我们的应用中,所以在web应用中最为广泛。
2. golang的令牌桶库 golang 标准库中就自带了限流算法的实现,golang.org/x/time/rate
,该限流器是基于 Token Bucket (令牌桶) 实现的。
令牌桶就是我们上面说的桶,里面装令牌,系统会以恒定速率向桶中放令牌,桶满则暂时不放。 用户请求就要向桶里面拿令牌。
2.1 创建令牌桶 1 2 3 4 5 6 7 8 func NewLimiter (r Limit, b int ) *Limiter { return &Limiter{ limit: r, burst: b, } } limiter := NewLimiter(5 , 1 );
第一个参数是 r Limit,这是代表每秒可以向令牌桶 中产生多少令牌 第二个参数是 b int,这是代表令牌桶 的容量大小
我们来看一个案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package mainimport ( "context" "log" "time" "golang.org/x/time/rate" ) func main () { l := rate.NewLimiter(1 , 2 ) for i := 0 ; i < 50 ; i++ { c, _ := context.WithTimeout(context.Background(), time.Second*2 ) if err := l.Wait(c); err != nil { log.Println("limiter wait error : " + err.Error()) } log.Println("Wait success:" , i) r := l.Reserve() log.Println("reserve time :" , r.Delay()) a := l.Allow() log.Println("Allow == " , a) } }
2.2 令牌桶消费函数 Wait (阻塞等待) Wait ,等于 WaitN(ctx,1)
若此时桶内令牌数组不足 (小于N),那么 Wait 方法将会阻塞一段时间,直至令牌满足条件,否则就一直阻塞,若满足条件,则直接返回结果。Wait 的 context 参数,可以设置超时时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import ( "context" "fmt" "time" "golang.org/x/time/rate" ) func main () { l := rate.NewLimiter(1 , 3 ) c, _ := context.WithCancel(context.TODO()) for { l.WaitN(c, 1 ) fmt.Println(time.Now().Format("2006-01-02 15:04:05" )) } } 2023 -09 -11 17 :02 :13 2023 -09 -11 17 :02 :13 2023 -09 -11 17 :02 :13 2023 -09 -11 17 :02 :14 2023 -09 -11 17 :02 :15 2023 -09 -11 17 :02 :16 2023 -09 -11 17 :02 :17
Allow (立马返回) Allow
等于 AllowN(time.Now(),1)
, 当前取一个令牌,若满足,则为 true,否则 false。
AllowN
方法 指的是,截止到某一时刻,目前桶中令牌数目是否至少为 N 个,满足则返回 true,同时从桶中消费 N 个令牌。 反之返回不消费令牌,返回 false
如果你需要在事件超出频率的时候丢弃或跳过事件,就使用AllowN,否则使用 Reserve 或Wait。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "time" "golang.org/x/time/rate" ) func main () { l := rate.NewLimiter(1 , 3 ) for { if l.AllowN(time.Now(), 1 ) { fmt.Println("ok" , time.Now().Format("2006-01-02 15:04:05" )) } else { time.Sleep(time.Second / 3 ) fmt.Println("false" , time.Now().Format("2006-01-02 15:04:05" )) } } } ok 2023 -09 -11 17 :07 :53 ok 2023 -09 -11 17 :07 :53 ok 2023 -09 -11 17 :07 :53 false 2023 -09 -11 17 :07 :53 false 2023 -09 -11 17 :07 :54 false 2023 -09 -11 17 :07 :54 ok 2023 -09 -11 17 :07 :54 false 2023 -09 -11 17 :07 :54 false 2023 -09 -11 17 :07 :55 false 2023 -09 -11 17 :07 :55 ok 2023 -09 -11 17 :07 :55 false 2023 -09 -11 17 :07 :55 false 2023 -09 -11 17 :07 :56 false 2023 -09 -11 17 :07 :56 ok 2023 -09 -11 17 :07 :56 false 2023 -09 -11 17 :07 :56 false 2023 -09 -11 17 :07 :57 false 2023 -09 -11 17 :07 :57
Reserve (自己控制) Reserve
, 等于 `ReserveN(time.Now(), 1)
ReserveN
当调用完成后,无论令牌是否充足,都会返回一个 Reservation * 对象
我们可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0 秒,则说明不用等待,若大于 0 秒,则必须等到等待时间之后,才能向后进行。 当然,若不想等待,你可以归还令牌,一个都不能少,调用该对象的 Cancel() 方法即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package mainimport ( "fmt" "time" "golang.org/x/time/rate" ) func main () { l := rate.NewLimiter(1 , 3 ) for { r := l.ReserveN(time.Now(), 1 ) s := r.Delay() time.Sleep(s) fmt.Println(s, time.Now().Format("04:05.000" )) } } 0s 2023 -09 -11 17 :10 :47 0s 2023 -09 -11 17 :10 :47 0s 2023 -09 -11 17 :10 :47 999.811042 ms 2023 -09 -11 17 :10 :48 999.675375 ms 2023 -09 -11 17 :10 :49 998.794834 ms 2023 -09 -11 17 :10 :50 998.750334 ms 2023 -09 -11 17 :10 :51 998.832875 ms 2023 -09 -11 17 :10 :52
3. golang限流实现 3.1 实现方案 uber 开源库中基于漏桶算法实现了一个限流器。漏桶算法可以限制流量的请求速度,并起到削峰填谷的作用。 https://github.com/uber-go/ratelimit
滴滴开源实现了一个对http请求的限流器中间件。可以基于以下模式限流。基于IP,路径,方法,header,授权用户等限流 通过自定义方法限流 还支持基于 http header 设置限流数据 实现方式是基于 github/go/time
实现的,不同类别的数据都存储在一个带超时时间的数据池中。 代码地址 https://github.com/didip/tollbooth
golang 网络包中还有基于信号量实现的限流器。 https://github.com/golang/net/blob/master/netutil/listen.go
也值得我们去学习下。 3.2 http 令牌桶限流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 func main () { limiter := rate.NewLimiter(rate.Every(100 *time.Millisecond), 3 ) http.HandleFunc("/ping" , func (w http.ResponseWriter, r *http.Request) { if limiter.Allow() { fmt.Println(time.Now().Format("2006-01-02 15:04:05" ), "say hello" ) } else { fmt.Println(time.Now().Format("2006-01-02 15:04:05" ), "limit" ) } }) go func () { for { time.Sleep(time.Second) Req() } }() _ = http.ListenAndServe(":13100" , nil ) } func Req () { for i := 0 ; i < 10 ; i++ { _, _ = resty.New().R().Get("http://localhost:13100/ping" ) } }
如果1秒放10个,最多存10个。那么每一秒可以打印10个sayhello。 如果1秒放1个,最多存10个。第一次10个sayhello,后面每一秒一个sayhello。 3.3 Gin 中间件限流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package mainimport ( "time" "github.com/didip/tollbooth" "github.com/didip/tollbooth/limiter" "github.com/gin-gonic/gin" ) func main () { r := gin.New() lmt := tollbooth.NewLimiter(1 , &limiter.ExpirableOptions{DefaultExpirationTTL: time.Second * 5 }) lmt.SetIPLookups([]string {"RemoteAddr" , "X-Forwarded-For" , "X-Real-IP" }) lmt.SetMethods([]string {"POST" , "GET" }) r.Use(LimitHandler(lmt)) r.GET("/" , func (c *gin.Context) { c.String(200 , "Get Hello, world!" ) }) r.POST("/" , func (c *gin.Context) { c.String(200 , "Post Hello, world!" ) }) r.Run(":12345" ) } func LimitHandler (lmt *limiter.Limiter) gin .HandlerFunc { return func (c *gin.Context) { httpError := tollbooth.LimitByRequest(lmt, c.Writer, c.Request) if httpError != nil { c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte (httpError.Message)) c.Abort() } else { c.Next() } } }
根据 IP 和 ReqPath 等配置限流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 lmt := tollbooth.NewLimiter(10 , &limiter.ExpirableOptions{ DefaultExpirationTTL: time.Hour * 24 , }) func wrapGinLimitHandler (lmt *limiter.Limiter) gin .HandlerFunc { limitOptions := []string {"ip" ,"token" ,"device" ,"version" ,"platform" ,"lang" } allowPathList := configure.Global.HttpConfig.AllowPathList denyPathList := configure.Global.HttpConfig.DenyPathList return func (c *gin.Context) { path := c.Request.URL.Path for _, v := range allowPathList { if strings.Contains(path, v) { c.Next() return } } for _, v := range denyPathList { if path == v { httpError := &errors.HTTPError{Message: lmt.GetMessage(), StatusCode: lmt.GetStatusCode()} c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte (httpError.Message)) c.Abort() return } } remoteIP := c.Request.Header.Get("X-Real-IP" ) if remoteIP == "" { remoteIP = c.ClientIP() } if remoteIP == "127.0.0.1" || remoteIP == "localhost" { c.Next() return } var keys []string for _, v := range limitOptions { if strings.Contains(v, "ip" ) { keys = append (keys, remoteIP) } else if strings.Contains(v, "token" ) { token := c.GetHeader("Token" ) if token == "" { token = c.GetHeader("Access-Token" ) } keys = append (keys, token) } else if strings.Contains(v, "device" ) { keys = append (keys, c.GetHeader("Device-Id" )) } else if strings.Contains(v, "version" ) { keys = append (keys, c.GetHeader("Appversion" )) } else if strings.Contains(v, "platform" ) { keys = append (keys, c.GetHeader("Platform" )) } else if strings.Contains(v, "lang" ) { keys = append (keys, c.GetHeader("Lang" )) } } if len (keys) == 0 { keys = append (keys, remoteIP) } keys = append (keys, path) httpError := tollbooth.LimitByKeys(lmt, keys) if httpError != nil { strKeys := path lqlog.WarnCtx(c.Request.Context(), "[wrapGinLimitHandler] keys: (%v)" , strKeys) c.Data(httpError.StatusCode, lmt.GetMessageContentType(), []byte (httpError.Message)) if _, ok := LimitKeysMap.Load(strKeys); ok { } else { LimitKeysMap.Store(strKeys, "1" ) text := fmt.Sprintf(`"%s %s" trigger http limit, keys:%+v` , c.Request.Method, path, keys) text += fmt.Sprintf("\nAppversion: %v" , c.GetHeader("Appversion" )) text += fmt.Sprintf("\nPlatform: %v" , c.GetHeader("Platform" )) FeishuAlarmText(c, text) } c.Abort() } else { c.Next() } } }
根据 IP Redis全局限流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 request.POST("/create_info" , middleware.LimitIPRate(time.Hour*6 , 3 ), handler.AddUserInfo) func LimitIPRate (rate time.Duration, limit uint ) gin .HandlerFunc { if !configure.Global.HttpConfig.IPRateLimit { return func (c *gin.Context) { c.Next() } } skipFunc := func (c *gin.Context) bool { remoteIP := GetGinClientIP(c) if remoteIP == "127.0.0.1" || remoteIP == "localhost" { return true } return false } store := ratelimit.RedisStore(&ratelimit.RedisOptions{RedisClient: dao.LimitRedisClient, Rate: rate, Limit: limit, Skip: skipFunc}) mw := ratelimit.RateLimiter(store, &ratelimit.Options{ ErrorHandler: func (c *gin.Context, info ratelimit.Info) { c.String(429 , "Too many requests. Try again in " +time.Until(info.ResetTime).String()) key := "LimitIPRate" + c.ClientIP() + c.FullPath() if _, ok := LimitKeysMap.Load(key); !ok { LimitKeysMap.Store(key, "1" ) text := fmt.Sprintf(`"%s %s" trigger ipRate limit` , c.Request.Method, key) text += fmt.Sprintf("\nAppversion: %v" , c.GetHeader("Appversion" )) text += fmt.Sprintf("\nPlatform: %v" , c.GetHeader("Platform" )) lqlog.WarnCtx(c, text) } }, KeyFunc: func (c *gin.Context) string { return "LimitIPRate:" + GetGinClientIP(c) }, }) return mw } func GetGinClientIP (c *gin.Context) string { remoteIP := c.Request.Header.Get("X-Real-IP" ) if remoteIP == "" { remoteIP = c.ClientIP() } return remoteIP }
3.4 vegeta测试 有一个非常棒的工具称作 vegeta,我喜欢在 HTTP 负载测试中使用(它也是用 Go 编写的)。
我们需要创建一个简单的配置文件,声明我们想要发送的请求。
1 GET http://localhost:12345/
然后,以每个时间单元 100 个请求的速率攻击 10 秒。
1 2 3 4 5 6 7 vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report echo "http://localhost:12345" | vegeta attack -rate=500 -connections=100 -duration=10s | tee results.bin | vegeta report// vegeta attack -duration=10s -rate=100 -targets=vegeta.conf | vegeta report 1s只成功了一个 // Status Codes [code:count] 200:10 429:990
结果,你将看到一些请求返回 200,但大多数返回 429。
3.5 wrk测试 先在本地安装wrk。
1 2 3 4 git clone https://github.com/wg/wrk cd wrkmake ln -s $PWD /wrk /usr/local /bin/
我的mac是6核,线程数不要太多,是核数的 2 到 4 倍即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 wrk -t6 -c10 -d10s --latency http://127.0.0.1:9061/health Running 10s test @ http://127.0.0.1:9061/health 6 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.28ms 14.43ms 399.37ms 99.59% Req/Sec 2.06k 262.21 2.72k 84.25% Latency Distribution 50% 448.00us 75% 620.00us 90% 797.00us 99% 1.38ms 123958 requests in 10.10s, 20.21MB read Non-2xx or 3xx responses: 123847 Requests/sec: 12270.53 Transfer/sec: 2.00MB
4. 参考资料