- Add a new kind of control behavior `warm up + rate limiter`, behaving as both warm up and pace controlmaster
@@ -46,6 +46,7 @@ public final class RuleConstant { | |||||
public static final int CONTROL_BEHAVIOR_DEFAULT = 0; | public static final int CONTROL_BEHAVIOR_DEFAULT = 0; | ||||
public static final int CONTROL_BEHAVIOR_WARM_UP = 1; | public static final int CONTROL_BEHAVIOR_WARM_UP = 1; | ||||
public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2; | public static final int CONTROL_BEHAVIOR_RATE_LIMITER = 2; | ||||
public static final int CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER = 3; | |||||
public static final String LIMIT_APP_DEFAULT = "default"; | public static final String LIMIT_APP_DEFAULT = "default"; | ||||
public static final String LIMIT_APP_OTHER = "other"; | public static final String LIMIT_APP_OTHER = "other"; | ||||
@@ -40,7 +40,7 @@ import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||||
*/ | */ | ||||
public class FlowRule extends AbstractRule { | public class FlowRule extends AbstractRule { | ||||
public FlowRule(){ | |||||
public FlowRule() { | |||||
super(); | super(); | ||||
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | ||||
} | } | ||||
@@ -71,7 +71,7 @@ public class FlowRule extends AbstractRule { | |||||
/** | /** | ||||
* Rate limiter control behavior. | * Rate limiter control behavior. | ||||
* 0. default, 1. warm up, 2. rate limiter | |||||
* 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter | |||||
*/ | */ | ||||
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; | private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; | ||||
@@ -213,7 +213,8 @@ public class FlowRule extends AbstractRule { | |||||
return node; | return node; | ||||
} | } | ||||
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, getResource())) { | |||||
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) | |||||
&& FlowRuleManager.isOtherOrigin(origin, getResource())) { | |||||
if (strategy == RuleConstant.STRATEGY_DIRECT) { | if (strategy == RuleConstant.STRATEGY_DIRECT) { | ||||
return context.getOriginNode(); | return context.getOriginNode(); | ||||
} | } | ||||
@@ -36,8 +36,9 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | import com.alibaba.csp.sentinel.slots.block.BlockException; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||||
/** | /** | ||||
* <p> | * <p> | ||||
@@ -126,8 +127,14 @@ public class FlowRuleManager { | |||||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | ||||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER | && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER | ||||
&& rule.getMaxQueueingTimeMs() > 0) { | && rule.getMaxQueueingTimeMs() > 0) { | ||||
rater = new PaceController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||||
rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER | |||||
&& rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) { | |||||
rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||||
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); | |||||
} | } | ||||
rule.setRater(rater); | rule.setRater(rater); | ||||
String identity = rule.getResource(); | String identity = rule.getResource(); | ||||
@@ -25,13 +25,13 @@ import com.alibaba.csp.sentinel.node.Node; | |||||
/** | /** | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
*/ | */ | ||||
public class PaceController implements Controller { | |||||
public class RateLimiterController implements Controller { | |||||
private final int maxQueueingTimeMs; | private final int maxQueueingTimeMs; | ||||
private final double count; | private final double count; | ||||
private final AtomicLong latestPassedTime = new AtomicLong(-1); | private final AtomicLong latestPassedTime = new AtomicLong(-1); | ||||
public PaceController(int timeOut, double count) { | |||||
public RateLimiterController(int timeOut, double count) { | |||||
this.maxQueueingTimeMs = timeOut; | this.maxQueueingTimeMs = timeOut; | ||||
this.count = count; | this.count = count; | ||||
} | } |
@@ -63,21 +63,21 @@ import com.alibaba.csp.sentinel.slots.block.flow.Controller; | |||||
*/ | */ | ||||
public class WarmUpController implements Controller { | public class WarmUpController implements Controller { | ||||
private double count; | |||||
protected double count; | |||||
private int coldFactor; | private int coldFactor; | ||||
private int warningToken = 0; | |||||
protected int warningToken = 0; | |||||
private int maxToken; | private int maxToken; | ||||
private double slope; | |||||
protected double slope; | |||||
private AtomicLong storedTokens = new AtomicLong(0); | |||||
private AtomicLong lastFilledTime = new AtomicLong(0); | |||||
protected AtomicLong storedTokens = new AtomicLong(0); | |||||
protected AtomicLong lastFilledTime = new AtomicLong(0); | |||||
public WarmUpController(double count, int warmupPeriodInSec, int coldFactor) { | |||||
construct(count, warmupPeriodInSec, coldFactor); | |||||
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { | |||||
construct(count, warmUpPeriodInSec, coldFactor); | |||||
} | } | ||||
public WarmUpController(double count, int warmUpPeriodInMic) { | |||||
construct(count, warmUpPeriodInMic, 3); | |||||
public WarmUpController(double count, int warmUpPeriodInSec) { | |||||
construct(count, warmUpPeriodInSec, 3); | |||||
} | } | ||||
private void construct(double count, int warmUpPeriodInSec, int coldFactor) { | private void construct(double count, int warmUpPeriodInSec, int coldFactor) { | ||||
@@ -132,7 +132,7 @@ public class WarmUpController implements Controller { | |||||
return false; | return false; | ||||
} | } | ||||
private void syncToken(long passQps) { | |||||
protected void syncToken(long passQps) { | |||||
long currentTime = TimeUtil.currentTimeMillis(); | long currentTime = TimeUtil.currentTimeMillis(); | ||||
currentTime = currentTime - currentTime % 1000; | currentTime = currentTime - currentTime % 1000; | ||||
long oldLastFillTime = lastFilledTime.get(); | long oldLastFillTime = lastFilledTime.get(); | ||||
@@ -0,0 +1,85 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block.flow.controller; | |||||
import java.util.concurrent.atomic.AtomicLong; | |||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | |||||
* @author jialiang.linjl | |||||
*/ | |||||
public class WarmUpRateLimiterController extends WarmUpController { | |||||
final int timeOutInMs; | |||||
final AtomicLong latestPassedTime = new AtomicLong(-1); | |||||
/** | |||||
* @param count | |||||
* @param warmUpPeriodSec | |||||
*/ | |||||
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) { | |||||
super(count, warmUpPeriodSec, coldFactor); | |||||
this.timeOutInMs = timeOutMs; | |||||
} | |||||
@Override | |||||
public boolean canPass(Node node, int acquireCount) { | |||||
long previousQps = node.previousPassQps(); | |||||
syncToken(previousQps); | |||||
long currentTime = TimeUtil.currentTimeMillis(); | |||||
long restToken = storedTokens.get(); | |||||
long costTime = 0; | |||||
long expectedTime = 0; | |||||
if (restToken >= warningToken) { | |||||
long aboveToken = restToken - warningToken; | |||||
// current interval = restToken*slope+1/count | |||||
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); | |||||
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000); | |||||
} else { | |||||
costTime = Math.round(1.0 * (acquireCount) / count * 1000); | |||||
} | |||||
expectedTime = costTime + latestPassedTime.get(); | |||||
if (expectedTime <= currentTime) { | |||||
latestPassedTime.set(currentTime); | |||||
return true; | |||||
} else { | |||||
long waitTime = costTime + latestPassedTime.get() - currentTime; | |||||
if (waitTime >= timeOutInMs) { | |||||
return false; | |||||
} else { | |||||
long oldTime = latestPassedTime.addAndGet(costTime); | |||||
try { | |||||
waitTime = oldTime - TimeUtil.currentTimeMillis(); | |||||
if (waitTime >= timeOutInMs) { | |||||
latestPassedTime.addAndGet(-costTime); | |||||
return false; | |||||
} | |||||
Thread.sleep(waitTime); | |||||
return true; | |||||
} catch (InterruptedException e) { | |||||
} | |||||
} | |||||
} | |||||
return false; | |||||
} | |||||
} | |||||
@@ -25,16 +25,16 @@ import org.junit.Test; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | import com.alibaba.csp.sentinel.util.TimeUtil; | ||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.PaceController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||||
/** | /** | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
*/ | */ | ||||
public class PaceControllerTest { | |||||
public class RateLimiterControllerTest { | |||||
@Test | @Test | ||||
public void testPaceController_normal() throws InterruptedException { | public void testPaceController_normal() throws InterruptedException { | ||||
PaceController paceController = new PaceController(500, 10d); | |||||
RateLimiterController paceController = new RateLimiterController(500, 10d); | |||||
Node node = mock(Node.class); | Node node = mock(Node.class); | ||||
long start = TimeUtil.currentTimeMillis(); | long start = TimeUtil.currentTimeMillis(); | ||||
@@ -47,7 +47,7 @@ public class PaceControllerTest { | |||||
@Test | @Test | ||||
public void testPaceController_timeout() throws InterruptedException { | public void testPaceController_timeout() throws InterruptedException { | ||||
final PaceController paceController = new PaceController(500, 10d); | |||||
final RateLimiterController paceController = new RateLimiterController(500, 10d); | |||||
final Node node = mock(Node.class); | final Node node = mock(Node.class); | ||||
final AtomicInteger passcount = new AtomicInteger(); | final AtomicInteger passcount = new AtomicInteger(); |
@@ -0,0 +1,49 @@ | |||||
package com.alibaba.csp.sentinel.slots.block.flow; | |||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.node.StatisticNode; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.assertFalse; | |||||
import static org.junit.Assert.assertTrue; | |||||
import static org.mockito.Mockito.mock; | |||||
import static org.mockito.Mockito.when; | |||||
/** | |||||
* @author CarpenterLee | |||||
*/ | |||||
public class WarmUpRateLimiterControllerTest { | |||||
@Test | |||||
public void testPace() throws InterruptedException { | |||||
WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 1000, 3); | |||||
Node node = mock(Node.class); | |||||
when(node.passQps()).thenReturn(100L); | |||||
when(node.previousPassQps()).thenReturn(100L); | |||||
assertTrue(controller.canPass(node, 1)); | |||||
long start = System.currentTimeMillis(); | |||||
assertTrue(controller.canPass(node, 1)); | |||||
long cost = System.currentTimeMillis() - start; | |||||
assertTrue(cost >= 100 && cost <= 110); | |||||
} | |||||
@Test | |||||
public void testPaceCanNotPass() throws InterruptedException { | |||||
WarmUpRateLimiterController controller = new WarmUpRateLimiterController(10, 10, 10, 3); | |||||
Node node = mock(Node.class); | |||||
when(node.passQps()).thenReturn(100L); | |||||
when(node.previousPassQps()).thenReturn(100L); | |||||
assertTrue(controller.canPass(node, 1)); | |||||
assertFalse(controller.canPass(node, 1)); | |||||
} | |||||
} |
@@ -0,0 +1,214 @@ | |||||
package com.alibaba.csp.sentinel.demo.flow; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import java.util.Random; | |||||
import java.util.concurrent.TimeUnit; | |||||
import java.util.concurrent.atomic.AtomicInteger; | |||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.SphU; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | |||||
* When {@link FlowRule#controlBehavior} set to {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER}, real passed | |||||
* qps will gradually increase to {@link FlowRule#count}, other than burst increasing, and after the passed qps reaches | |||||
* the threshold, the request will pass at a constant interval. | |||||
* <p> | |||||
* In short, {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER} behaves like | |||||
* {@link RuleConstant#CONTROL_BEHAVIOR_WARM_UP} + {@link RuleConstant#CONTROL_BEHAVIOR_RATE_LIMITER}. | |||||
* </p> | |||||
* | |||||
* <p/> | |||||
* Run this demo, results are as follows: | |||||
* <pre> | |||||
* ... | |||||
* 1541035848056, total:5, pass:5, block:0 // run in slow qps | |||||
* 1541035849061, total:0, pass:0, block:0 | |||||
* 1541035850066, total:6, pass:6, block:0 | |||||
* 1541035851068, total:2, pass:2, block:0 | |||||
* 1541035852073, total:3, pass:3, block:0 | |||||
* 1541035853078, total:3361, pass:7, block:3354 // request qps burst increase, warm up behavior triggered. | |||||
* 1541035854083, total:3414, pass:7, block:3407 | |||||
* 1541035855087, total:3377, pass:7, block:3370 | |||||
* 1541035856091, total:3366, pass:8, block:3358 | |||||
* 1541035857096, total:3259, pass:8, block:3251 | |||||
* 1541035858101, total:3066, pass:13, block:3054 | |||||
* 1541035859105, total:3042, pass:15, block:3026 | |||||
* 1541035860109, total:2946, pass:17, block:2929 | |||||
* 1541035861113, total:2909, pass:20, block:2889 // warm up process end, pass qps increased to {@link FlowRule#count} | |||||
* 1541035862117, total:2970, pass:20, block:2950 | |||||
* 1541035863122, total:2919, pass:20, block:2899 | |||||
* 1541035864127, total:2903, pass:21, block:2882 | |||||
* 1541035865133, total:2930, pass:20, block:2910 | |||||
* ... | |||||
* </pre> | |||||
* | |||||
* @author CarpenterLee | |||||
* @see WarmUpFlowDemo | |||||
* @see PaceFlowDemo | |||||
*/ | |||||
public class WarmUpRateLimiterFlowDemo { | |||||
private static final String KEY = "abc"; | |||||
private static AtomicInteger pass = new AtomicInteger(); | |||||
private static AtomicInteger block = new AtomicInteger(); | |||||
private static AtomicInteger total = new AtomicInteger(); | |||||
private static volatile boolean stop = false; | |||||
private static final int threadCount = 100; | |||||
private static int seconds = 100; | |||||
public static void main(String[] args) throws Exception { | |||||
initFlowRule(); | |||||
// trigger Sentinel internal init | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(KEY); | |||||
} catch (Exception e) { | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
Thread timer = new Thread(new TimerTask()); | |||||
timer.setName("sentinel-timer-task"); | |||||
timer.start(); | |||||
//first make the system run on a very low condition | |||||
for (int i = 0; i < 3; i++) { | |||||
Thread t = new Thread(new SlowTask()); | |||||
t.setName("sentinel-slow-task"); | |||||
t.start(); | |||||
} | |||||
Thread.sleep(5000); | |||||
// request qps burst increase, warm up behavior triggered. | |||||
for (int i = 0; i < threadCount; i++) { | |||||
Thread t = new Thread(new RunTask()); | |||||
t.setName("sentinel-run-task"); | |||||
t.start(); | |||||
} | |||||
} | |||||
private static void initFlowRule() { | |||||
List<FlowRule> rules = new ArrayList<FlowRule>(); | |||||
FlowRule rule1 = new FlowRule(); | |||||
rule1.setResource(KEY); | |||||
rule1.setCount(20); | |||||
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||||
rule1.setLimitApp("default"); | |||||
rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER); | |||||
rule1.setWarmUpPeriodSec(10); | |||||
rule1.setMaxQueueingTimeMs(100); | |||||
rules.add(rule1); | |||||
FlowRuleManager.loadRules(rules); | |||||
} | |||||
static class SlowTask implements Runnable { | |||||
@Override | |||||
public void run() { | |||||
while (!stop) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(KEY); | |||||
// token acquired, means pass | |||||
pass.addAndGet(1); | |||||
} catch (BlockException e1) { | |||||
block.incrementAndGet(); | |||||
} catch (Exception e2) { | |||||
// biz exception | |||||
} finally { | |||||
total.incrementAndGet(); | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
Random random2 = new Random(); | |||||
try { | |||||
TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000)); | |||||
} catch (InterruptedException e) { | |||||
// ignore | |||||
} | |||||
} | |||||
} | |||||
} | |||||
static class RunTask implements Runnable { | |||||
@Override | |||||
public void run() { | |||||
while (!stop) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(KEY); | |||||
pass.addAndGet(1); | |||||
} catch (BlockException e1) { | |||||
block.incrementAndGet(); | |||||
} catch (Exception e2) { | |||||
// biz exception | |||||
} finally { | |||||
total.incrementAndGet(); | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
Random random2 = new Random(); | |||||
try { | |||||
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); | |||||
} catch (InterruptedException e) { | |||||
// ignore | |||||
} | |||||
} | |||||
} | |||||
} | |||||
static class TimerTask implements Runnable { | |||||
@Override | |||||
public void run() { | |||||
long start = System.currentTimeMillis(); | |||||
System.out.println("begin to statistic!!!"); | |||||
long oldTotal = 0; | |||||
long oldPass = 0; | |||||
long oldBlock = 0; | |||||
while (!stop) { | |||||
try { | |||||
TimeUnit.SECONDS.sleep(1); | |||||
} catch (InterruptedException e) { | |||||
} | |||||
long globalTotal = total.get(); | |||||
long oneSecondTotal = globalTotal - oldTotal; | |||||
oldTotal = globalTotal; | |||||
long globalPass = pass.get(); | |||||
long oneSecondPass = globalPass - oldPass; | |||||
oldPass = globalPass; | |||||
long globalBlock = block.get(); | |||||
long oneSecondBlock = globalBlock - oldBlock; | |||||
oldBlock = globalBlock; | |||||
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal | |||||
+ ", pass:" + oneSecondPass | |||||
+ ", block:" + oneSecondBlock); | |||||
if (seconds-- <= 0) { | |||||
stop = true; | |||||
} | |||||
} | |||||
long cost = System.currentTimeMillis() - start; | |||||
System.out.println("time cost: " + cost + " ms"); | |||||
System.out.println("total:" + total.get() + ", pass:" + pass.get() | |||||
+ ", block:" + block.get()); | |||||
System.exit(0); | |||||
} | |||||
} | |||||
} |