上篇文章提到固定時(shí)間窗口限流無(wú)法處理突然請(qǐng)求洪峰情況,本文講述的令牌桶線路算法則可以比較好的處理此場(chǎng)景。
工作原理
單位時(shí)間按照一定速率勻速的生產(chǎn) token 放入桶內(nèi),直到達(dá)到桶容量上限。
處理請(qǐng)求,每次嘗試獲取一個(gè)或多個(gè)令牌,如果拿到則處理請(qǐng)求,失敗則拒絕請(qǐng)求。
優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
可以有效處理瞬間的突發(fā)流量,桶內(nèi)存量 token 即可作為流量緩沖區(qū)平滑處理突發(fā)流量。
缺點(diǎn)
實(shí)現(xiàn)較為復(fù)雜。
代碼實(shí)現(xiàn)
- core/limit/tokenlimit.go
分布式環(huán)境下考慮使用 redis 作為桶和令牌的存儲(chǔ)容器,采用 lua 腳本實(shí)現(xiàn)整個(gè)算法流程。
redis lua 腳本
- -- 每秒生成token數(shù)量即token生成速度
- local rate = tonumber(ARGV[1])
- -- 桶容量
- local capacity = tonumber(ARGV[2])
- -- 當(dāng)前時(shí)間戳
- local now = tonumber(ARGV[3])
- -- 當(dāng)前請(qǐng)求token數(shù)量
- local requested = tonumber(ARGV[4])
- -- 需要多少秒才能填滿桶
- local fill_time = capacity/rate
- -- 向下取整,ttl為填滿時(shí)間的2倍
- local ttl = math.floor(fill_time*2)
- -- 當(dāng)前時(shí)間桶容量
- local last_tokens = tonumber(redis.call("get", KEYS[1]))
- -- 如果當(dāng)前桶容量為0,說明是第一次進(jìn)入,則默認(rèn)容量為桶的最大容量
- if last_tokens == nil then
- last_tokens = capacity
- end
- -- 上一次刷新的時(shí)間
- local last_refreshed = tonumber(redis.call("get", KEYS[2]))
- -- 第一次進(jìn)入則設(shè)置刷新時(shí)間為0
- if last_refreshed == nil then
- last_refreshed = 0
- end
- -- 距離上次請(qǐng)求的時(shí)間跨度
- local delta = math.max(0, now-last_refreshed)
- -- 距離上次請(qǐng)求的時(shí)間跨度,總共能生產(chǎn)token的數(shù)量,如果超多最大容量則丟棄多余的token
- local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
- -- 本次請(qǐng)求token數(shù)量是否足夠
- local allowed = filled_tokens >= requested
- -- 桶剩余數(shù)量
- local new_tokens = filled_tokens
- -- 允許本次token申請(qǐng),計(jì)算剩余數(shù)量
- if allowed then
- new_tokens = filled_tokens - requested
- end
- -- 設(shè)置剩余token數(shù)量
- redis.call("setex", KEYS[1], ttl, new_tokens)
- -- 設(shè)置刷新時(shí)間
- redis.call("setex", KEYS[2], ttl, now)
- return allowed
令牌桶限流器定義
- type TokenLimiter struct {
- // 每秒生產(chǎn)速率
- rate int
- // 桶容量
- burst int
- // 存儲(chǔ)容器
- store *redis.Redis
- // redis key
- tokenKey string
- // 桶刷新時(shí)間key
- timestampKey string
- // lock
- rescueLock sync.Mutex
- // redis健康標(biāo)識(shí)
- redisAlive uint32
- // redis故障時(shí)采用進(jìn)程內(nèi) 令牌桶限流器
- rescueLimiter *xrate.Limiter
- // redis監(jiān)控探測(cè)任務(wù)標(biāo)識(shí)
- monitorStarted bool
- }
- func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
- tokenKey := fmt.Sprintf(tokenFormat, key)
- timestampKey := fmt.Sprintf(timestampFormat, key)
- return &TokenLimiter{
- rate: rate,
- burst: burst,
- store: store,
- tokenKey: tokenKey,
- timestampKey: timestampKey,
- redisAlive: 1,
- rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
- }
- }
獲取令牌
- func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
- // 判斷redis是否健康
- // redis故障時(shí)采用進(jìn)程內(nèi)限流器
- // 兜底保障
- if atomic.LoadUint32(&lim.redisAlive) == 0 {
- return lim.rescueLimiter.AllowN(now, n)
- }
- // 執(zhí)行腳本獲取令牌
- resp, err := lim.store.Eval(
- script,
- []string{
- lim.tokenKey,
- lim.timestampKey,
- },
- []string{
- strconv.Itoa(lim.rate),
- strconv.Itoa(lim.burst),
- strconv.FormatInt(now.Unix(), 10),
- strconv.Itoa(n),
- })
- // redis allowed == false
- // Lua boolean false -> r Nil bulk reply
- // 特殊處理key不存在的情況
- if err == redis.Nil {
- return false
- } else if err != nil {
- logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
- // 執(zhí)行異常,開啟redis健康探測(cè)任務(wù)
- // 同時(shí)采用進(jìn)程內(nèi)限流器作為兜底
- lim.startMonitor()
- return lim.rescueLimiter.AllowN(now, n)
- }
- code, ok := resp.(int64)
- if !ok {
- logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
- lim.startMonitor()
- return lim.rescueLimiter.AllowN(now, n)
- }
- // redis allowed == true
- // Lua boolean true -> r integer reply with value of 1
- return code == 1
- }
redis 故障時(shí)兜底策略
兜底策略的設(shè)計(jì)考慮得非常細(xì)節(jié),當(dāng) redis 不可用的時(shí)候,啟動(dòng)單機(jī)版的 ratelimit 做備用限流,確保基本的限流可用,服務(wù)不會(huì)被沖垮。
- // 開啟redis健康探測(cè)
- func (lim *TokenLimiter) startMonitor() {
- lim.rescueLock.Lock()
- defer lim.rescueLock.Unlock()
- // 防止重復(fù)開啟
- if lim.monitorStarted {
- return
- }
- // 設(shè)置任務(wù)和健康標(biāo)識(shí)
- lim.monitorStarted = true
- atomic.StoreUint32(&lim.redisAlive, 0)
- // 健康探測(cè)
- go lim.waitForRedis()
- }
- // redis健康探測(cè)定時(shí)任務(wù)
- func (lim *TokenLimiter) waitForRedis() {
- ticker := time.NewTicker(pingInterval)
- // 健康探測(cè)成功時(shí)回調(diào)此函數(shù)
- defer func() {
- ticker.Stop()
- lim.rescueLock.Lock()
- lim.monitorStarted = false
- lim.rescueLock.Unlock()
- }()
- for range ticker.C {
- // ping屬于redis內(nèi)置健康探測(cè)命令
- if lim.store.Ping() {
- // 健康探測(cè)成功,設(shè)置健康標(biāo)識(shí)
- atomic.StoreUint32(&lim.redisAlive, 1)
- return
- }
- }
- }
項(xiàng)目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero 并 star 支持我們!
原文鏈接:https://mp.weixin.qq.com/s/ulGRw4qkWbGKdF83VaIb7A