作為應對高并發的手段之一,限流并不是一個新鮮的話題了。從Guava的Ratelimiter到Hystrix,以及Sentinel都可作為限流的工具。
自適應限流
一般的限流常常需要指定一個固定值(qps)作為限流開關的閾值,這個值一是靠經驗判斷,二是靠通過大量的測試數據得出。但這個閾值,在流量激增、系統自動伸縮或者某某commit了一段有毒代碼后就有可能變得不那么合適了。并且一般業務方也不太能夠正確評估自己的容量,去設置一個合適的限流閾值。
而此時自適應限流就是解決這樣的問題的,限流閾值不需要手動指定,也不需要去預估系統的容量,并且閾值能夠隨著系統相關指標變化而變化。
自適應限流算法借鑒了TCP擁塞算法,根據各種指標預估限流的閾值,并且不斷調整。大致獲得的效果如下:
從圖上可以看到,首先以一個降低的初始并發值發送請求,同時通過增大限流窗口來探測系統更高的并發性。而一旦延遲增加到一定程度了,又會退回到較小的限流窗口。循環往復持續探測并發極限,從而產生類似鋸齒狀的時間關系函數。
TCP Vegas
vegas是一種主動調整cwnd的擁塞控制算法,主要是設置兩個閾值alpha 和 beta,然后通過計算目標速率和實際速率的差diff,再比較差diff與alpha和beta的關系,對cwnd進行調節。偽代碼如下:
- diff = cwnd*(1-baseRTT/RTT)
- if (diff < alpha)
- set: cwndcwnd = cwnd + 1
- else if (diff >= beta)
- set: cwndcwnd = cwnd - 1
- else
- set: cwndcwnd = cwnd
其中baseRTT指的是測量的最小往返時間,RTT指的是當前測量的往返時間,cwnd指的是當前的TCP窗口大小。通常在tcp中alpha會被設置成2-3,beta會被設置成4-6。這樣子,cwnd就保持在了一個平衡的狀態。
netflix-concuurency-limits
concuurency-limits是netflix推出的自適應限流組件,借鑒了TCP相關擁塞控制算法,主要是根據請求延時,及其直接影響到的排隊長度來進行限流窗口的動態調整。
alpha , beta & threshold
vegas算法實現在了VegasLimit類中。先看一下初始化相關代碼:
- private int initialLimit = 20;
- private int maxConcurrency = 1000;
- private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
- private double smoothing = 1.0;
- private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
- private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
- private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
- private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
- private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
這里首先定義了一個初始化值initialLimit為20,以及極大值maxConcurrency1000。其次是三個閾值函數alphaFunc,betaFunc以及thresholdFunc。最后是兩個增減函數increaseFunc和decreaseFunc。
函數都是基于當前的并發值limit做運算的。
alphaFunc可類比vegas算法中的alpha,此處的實現是3*log limit。limit值從初始20增加到極大1000時候,相應的alpha從3.9增加到了9。
betaFunc則可類比為vegas算法中的beta,此處的實現是6*log limit。limit值從初始20增加到極大1000時候,相應的alpha從7.8增加到了18。
thresholdFunc算是新增的一個函數,表示一個較為初始的閾值,小于這個值的時候limit會采取激進一些的增量算法。這里的實現是1倍的log limit。mit值從初始20增加到極大1000時候,相應的alpha從1.3增加到了3。
這三個函數值可以認為確定了動態調整函數的四個區間范圍。當變量queueSize = limit × (1 − RTTnoLoad/RTTactual)落到這四個區間的時候應用不同的調整函數。
變量queueSize
其中變量為queueSize,計算方法即為limit × (1 − RTTnoLoad/RTTactual),為什么這么計算其實稍加領悟一下即可。
我們把系統處理請求的過程想象為一個水管,到來的請求是往這個水管灌水,當系統處理順暢的時候,請求不需要排隊,直接從水管中穿過,這個請求的RT是最短的,即RTTnoLoad;
反之,當請求堆積的時候,那么處理請求的時間則會變為:排隊時間+最短處理時間,即RTTactual = inQueueTime + RTTnoLoad。而顯然排隊的隊列長度為
總排隊時間/每個請求的處理時間及queueSize = (limit * inQueueTime) / (inQueueTime + RTTnoLoad) = limit × (1 − RTTnoLoad/RTTactual)。
再舉個栗子,因為假設當前延時即為最佳延時,那么自然是不用排隊的,即queueSize=0。而假設當前延時為最佳延時的一倍的時候,可以認為處理能力折半,100個流量進來會有一半即50個請求在排隊,及queueSize= 100 * (1 − 1/2)=50。
動態調整函數
調整函數中最重要的即增函數與減函數。從初始化的代碼中得知,增函數increaseFunc實現為limit+log limit,減函數decreaseFunc實現為limit-log limit,相對來說增減都是比較保守的。
看一下應用動態調整函數的相關代碼:
- private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {
- final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
- double newLimit;
- // Treat any drop (i.e timeout) as needing to reduce the limit
- // 發現錯誤直接應用減函數decreaseFunc
- if (didDrop) {
- newLimit = decreaseFunc.apply(estimatedLimit);
- // Prevent upward drift if not close to the limit
- } else if (inflight * 2 < estimatedLimit) {
- return (int)estimatedLimit;
- } else {
- int alpha = alphaFunc.apply((int)estimatedLimit);
- int beta = betaFunc.apply((int)estimatedLimit);
- int threshold = this.thresholdFunc.apply((int)estimatedLimit);
- // Aggressive increase when no queuing
- if (queueSize <= threshold) {
- newLimit = estimatedLimit + beta;
- // Increase the limit if queue is still manageable
- } else if (queueSize < alpha) {
- newLimit = increaseFunc.apply(estimatedLimit);
- // Detecting latency so decrease
- } else if (queueSize > beta) {
- newLimit = decreaseFunc.apply(estimatedLimit);
- // We're within he sweet spot so nothing to do
- } else {
- return (int)estimatedLimit;
- }
- }
- newLimit = Math.max(1, Math.min(maxLimit, newLimit));
- newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
- if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
- LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
- (int)newLimit,
- TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
- TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
- queueSize);
- }
- estimatedLimit = newLimit;
- return (int)estimatedLimit;
- }
動態調整函數規則如下:
當變量queueSize < threshold時,選取較激進的增量函數,newLimit = limit+beta
當變量queueSize < alpha時,需要增大限流窗口,選擇增函數increaseFunc,即newLimit = limit + log limit
當變量queueSize處于alpha,beta之間時候,limit不變
當變量queueSize大于beta時候,需要收攏限流窗口,選擇減函數decreaseFunc,即newLimit = limit - log limit
平滑遞減 smoothingDecrease
注意到可以設置變量smoothing,這里初始值為1,表示平滑遞減不起作用。如果有需要的話可以按需設置,比如設置smoothing為0.5時候,那么效果就是采用減函數decreaseFunc時候效果減半,實現方式為newLimitAfterSmoothing = 0.5 newLimit + 0.5 limit。
原文地址:https://mp.weixin.qq.com/s/y4jU6DuDFskOVqotnLouJA