Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -15,6 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.adapter.reactor; | package com.alibaba.csp.sentinel.adapter.reactor; | ||||
import java.util.Arrays; | |||||
import com.alibaba.csp.sentinel.EntryType; | import com.alibaba.csp.sentinel.EntryType; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
@@ -26,6 +28,8 @@ public class EntryConfig { | |||||
private final String resourceName; | private final String resourceName; | ||||
private final EntryType entryType; | private final EntryType entryType; | ||||
private final int acquireCount; | |||||
private final Object[] args; | |||||
private final ContextConfig contextConfig; | private final ContextConfig contextConfig; | ||||
public EntryConfig(String resourceName) { | public EntryConfig(String resourceName) { | ||||
@@ -37,9 +41,18 @@ public class EntryConfig { | |||||
} | } | ||||
public EntryConfig(String resourceName, EntryType entryType, ContextConfig contextConfig) { | public EntryConfig(String resourceName, EntryType entryType, ContextConfig contextConfig) { | ||||
checkParams(resourceName, entryType); | |||||
this(resourceName, entryType, 1, new Object[0], contextConfig); | |||||
} | |||||
public EntryConfig(String resourceName, EntryType entryType, int acquireCount, Object[] args, | |||||
ContextConfig contextConfig) { | |||||
AssertUtil.assertNotBlank(resourceName, "resourceName cannot be blank"); | |||||
AssertUtil.notNull(entryType, "entryType cannot be null"); | |||||
AssertUtil.isTrue(acquireCount > 0, "acquireCount should be positive"); | |||||
this.resourceName = resourceName; | this.resourceName = resourceName; | ||||
this.entryType = entryType; | this.entryType = entryType; | ||||
this.acquireCount = acquireCount; | |||||
this.args = args; | |||||
// Constructed ContextConfig should be valid here. Null is allowed here. | // Constructed ContextConfig should be valid here. Null is allowed here. | ||||
this.contextConfig = contextConfig; | this.contextConfig = contextConfig; | ||||
} | } | ||||
@@ -52,18 +65,16 @@ public class EntryConfig { | |||||
return entryType; | return entryType; | ||||
} | } | ||||
public ContextConfig getContextConfig() { | |||||
return contextConfig; | |||||
public int getAcquireCount() { | |||||
return acquireCount; | |||||
} | } | ||||
public static void assertValid(EntryConfig config) { | |||||
AssertUtil.notNull(config, "entry config cannot be null"); | |||||
checkParams(config.resourceName, config.entryType); | |||||
public Object[] getArgs() { | |||||
return args; | |||||
} | } | ||||
private static void checkParams(String resourceName, EntryType entryType) { | |||||
AssertUtil.assertNotBlank(resourceName, "resourceName cannot be blank"); | |||||
AssertUtil.notNull(entryType, "entryType cannot be null"); | |||||
public ContextConfig getContextConfig() { | |||||
return contextConfig; | |||||
} | } | ||||
@Override | @Override | ||||
@@ -71,6 +82,8 @@ public class EntryConfig { | |||||
return "EntryConfig{" + | return "EntryConfig{" + | ||||
"resourceName='" + resourceName + '\'' + | "resourceName='" + resourceName + '\'' + | ||||
", entryType=" + entryType + | ", entryType=" + entryType + | ||||
", acquireCount=" + acquireCount + | |||||
", args=" + Arrays.toString(args) + | |||||
", contextConfig=" + contextConfig + | ", contextConfig=" + contextConfig + | ||||
'}'; | '}'; | ||||
} | } | ||||
@@ -15,6 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.adapter.reactor; | package com.alibaba.csp.sentinel.adapter.reactor; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import reactor.core.CoreSubscriber; | import reactor.core.CoreSubscriber; | ||||
import reactor.core.publisher.Flux; | import reactor.core.publisher.Flux; | ||||
import reactor.core.publisher.FluxOperator; | import reactor.core.publisher.FluxOperator; | ||||
@@ -29,7 +31,7 @@ public class FluxSentinelOperator<T> extends FluxOperator<T, T> { | |||||
public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) { | public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) { | ||||
super(source); | super(source); | ||||
EntryConfig.assertValid(entryConfig); | |||||
AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); | |||||
this.entryConfig = entryConfig; | this.entryConfig = entryConfig; | ||||
} | } | ||||
@@ -15,6 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.adapter.reactor; | package com.alibaba.csp.sentinel.adapter.reactor; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import reactor.core.CoreSubscriber; | import reactor.core.CoreSubscriber; | ||||
import reactor.core.publisher.Mono; | import reactor.core.publisher.Mono; | ||||
import reactor.core.publisher.MonoOperator; | import reactor.core.publisher.MonoOperator; | ||||
@@ -29,7 +31,7 @@ public class MonoSentinelOperator<T> extends MonoOperator<T, T> { | |||||
public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) { | public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) { | ||||
super(source); | super(source); | ||||
EntryConfig.assertValid(entryConfig); | |||||
AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); | |||||
this.entryConfig = entryConfig; | this.entryConfig = entryConfig; | ||||
} | } | ||||
@@ -23,6 +23,7 @@ import com.alibaba.csp.sentinel.SphU; | |||||
import com.alibaba.csp.sentinel.Tracer; | import com.alibaba.csp.sentinel.Tracer; | ||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | import com.alibaba.csp.sentinel.slots.block.BlockException; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import com.alibaba.csp.sentinel.util.function.Supplier; | import com.alibaba.csp.sentinel.util.function.Supplier; | ||||
import org.reactivestreams.Subscription; | import org.reactivestreams.Subscription; | ||||
@@ -53,7 +54,7 @@ public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> { | |||||
} | } | ||||
private void checkEntryConfig(EntryConfig config) { | private void checkEntryConfig(EntryConfig config) { | ||||
EntryConfig.assertValid(config); | |||||
AssertUtil.notNull(config, "entryConfig cannot be null"); | |||||
} | } | ||||
@Override | @Override | ||||
@@ -88,7 +89,8 @@ public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> { | |||||
ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin()); | ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin()); | ||||
} | } | ||||
try { | try { | ||||
AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType()); | |||||
AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType(), | |||||
entryConfig.getAcquireCount(), entryConfig.getArgs()); | |||||
this.currentEntry = entry; | this.currentEntry = entry; | ||||
actual.onSubscribe(this); | actual.onSubscribe(this); | ||||
} catch (BlockException ex) { | } catch (BlockException ex) { | ||||
@@ -17,6 +17,8 @@ package com.alibaba.csp.sentinel.adapter.reactor; | |||||
import java.util.function.Function; | import java.util.function.Function; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import org.reactivestreams.Publisher; | import org.reactivestreams.Publisher; | ||||
import reactor.core.publisher.Flux; | import reactor.core.publisher.Flux; | ||||
import reactor.core.publisher.Mono; | import reactor.core.publisher.Mono; | ||||
@@ -36,7 +38,7 @@ public class SentinelReactorTransformer<T> implements Function<Publisher<T>, Pub | |||||
} | } | ||||
public SentinelReactorTransformer(EntryConfig entryConfig) { | public SentinelReactorTransformer(EntryConfig entryConfig) { | ||||
EntryConfig.assertValid(entryConfig); | |||||
AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); | |||||
this.entryConfig = entryConfig; | this.entryConfig = entryConfig; | ||||
} | } | ||||