Browse Source

Reactive: add Sentinel Reactor adapter module (#545)

* Add reactive adapter implementation for Project Reactor (Mono/Flux) including a reactor transformer and an experimental `ReactorSphU`
* Add an `InheritableBaseSubscriber` that derives from the original BaseSubscriber of reactor-core
* Add basic test cases for reactor adapter
* Add Sentinel context enter support for EntryConfig in reactor transformer

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao GitHub 5 years ago
parent
commit
e857b5cef7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1139 additions and 0 deletions
  1. +2
    -0
      sentinel-adapter/pom.xml
  2. +44
    -0
      sentinel-adapter/sentinel-reactor-adapter/pom.xml
  3. +57
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java
  4. +77
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java
  5. +40
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java
  6. +243
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java
  7. +40
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java
  8. +72
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java
  9. +27
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java
  10. +166
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java
  11. +54
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java
  12. +75
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java
  13. +168
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java
  14. +74
    -0
      sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java

+ 2
- 0
sentinel-adapter/pom.xml View File

@@ -19,6 +19,7 @@
<module>sentinel-dubbo-adapter</module>
<module>sentinel-grpc-adapter</module>
<module>sentinel-zuul-adapter</module>
<module>sentinel-reactor-adapter</module>
</modules>

<dependencyManagement>
@@ -38,6 +39,7 @@
<artifactId>sentinel-web-servlet</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>


+ 44
- 0
sentinel-adapter/sentinel-reactor-adapter/pom.xml View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-adapter</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-reactor-adapter</artifactId>

<properties>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<reactor.version>3.2.6.RELEASE</reactor.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

+ 57
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ContextConfig.java View File

@@ -0,0 +1,57 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* @author Eric Zhao
*/
public class ContextConfig {

private final String contextName;
private final String origin;

public ContextConfig(String contextName) {
this(contextName, "");
}

public ContextConfig(String contextName, String origin) {
AssertUtil.assertNotBlank(contextName, "contextName cannot be blank");
this.contextName = contextName;
if (StringUtil.isBlank(origin)) {
origin = "";
}
this.origin = origin;
}

public String getContextName() {
return contextName;
}

public String getOrigin() {
return origin;
}

@Override
public String toString() {
return "ContextConfig{" +
"contextName='" + contextName + '\'' +
", origin='" + origin + '\'' +
'}';
}
}

+ 77
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/EntryConfig.java View File

@@ -0,0 +1,77 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public class EntryConfig {

private final String resourceName;
private final EntryType entryType;
private final ContextConfig contextConfig;

public EntryConfig(String resourceName) {
this(resourceName, EntryType.OUT);
}

public EntryConfig(String resourceName, EntryType entryType) {
this(resourceName, entryType, null);
}

public EntryConfig(String resourceName, EntryType entryType, ContextConfig contextConfig) {
checkParams(resourceName, entryType);
this.resourceName = resourceName;
this.entryType = entryType;
// Constructed ContextConfig should be valid here. Null is allowed here.
this.contextConfig = contextConfig;
}

public String getResourceName() {
return resourceName;
}

public EntryType getEntryType() {
return entryType;
}

public ContextConfig getContextConfig() {
return contextConfig;
}

public static void assertValid(EntryConfig config) {
AssertUtil.notNull(config, "entry config cannot be null");
checkParams(config.resourceName, config.entryType);
}

private static void checkParams(String resourceName, EntryType entryType) {
AssertUtil.assertNotBlank(resourceName, "resourceName cannot be blank");
AssertUtil.notNull(entryType, "entryType cannot be null");
}

@Override
public String toString() {
return "EntryConfig{" +
"resourceName='" + resourceName + '\'' +
", entryType=" + entryType +
", contextConfig=" + contextConfig +
'}';
}
}

+ 40
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java View File

@@ -0,0 +1,40 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public class FluxSentinelOperator<T> extends FluxOperator<T, T> {

private final EntryConfig entryConfig;

public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) {
super(source);
EntryConfig.assertValid(entryConfig);
this.entryConfig = entryConfig;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false));
}
}

+ 243
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/InheritableBaseSubscriber.java View File

@@ -0,0 +1,243 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;

/**
* <p>
* Copied from {@link reactor.core.publisher.BaseSubscriber} of reactor-core,
* but allow sub-classes to override {@code onSubscribe}, {@code onNext},
* {@code onError} and {@code onComplete} method for customization.
* </p>
* <p>This base subscriber also provides predicate for {@code onErrorDropped} hook as a workaround for Sentinel.</p>
*/
abstract class InheritableBaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable {

volatile Subscription subscription;

static AtomicReferenceFieldUpdater<InheritableBaseSubscriber, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(InheritableBaseSubscriber.class, Subscription.class,
"subscription");

/**
* Return current {@link Subscription}
*
* @return current {@link Subscription}
*/
protected Subscription upstream() {
return subscription;
}

@Override
public boolean isDisposed() {
return subscription == Operators.cancelledSubscription();
}

/**
* {@link Disposable#dispose() Dispose} the {@link Subscription} by
* {@link Subscription#cancel() cancelling} it.
*/
@Override
public void dispose() {
cancel();
}

/**
* Hook for further processing of onSubscribe's Subscription. Implement this method
* to call {@link #request(long)} as an initial request. Values other than the
* unbounded {@code Long.MAX_VALUE} imply that you'll also call request in
* {@link #hookOnNext(Object)}.
* <p> Defaults to request unbounded Long.MAX_VALUE as in {@link #requestUnbounded()}
*
* @param subscription the subscription to optionally process
*/
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

/**
* Hook for processing of onNext values. You can call {@link #request(long)} here
* to further request data from the source {@code org.reactivestreams.Publisher} if
* the {@link #hookOnSubscribe(Subscription) initial request} wasn't unbounded.
* <p>Defaults to doing nothing.
*
* @param value the emitted value to process
*/
protected void hookOnNext(T value) {
// NO-OP
}

/**
* Optional hook for completion processing. Defaults to doing nothing.
*/
protected void hookOnComplete() {
// NO-OP
}

/**
* Optional hook for error processing. Default is to call
* {@link Exceptions#errorCallbackNotImplemented(Throwable)}.
*
* @param throwable the error to process
*/
protected void hookOnError(Throwable throwable) {
throw Exceptions.errorCallbackNotImplemented(throwable);
}

/**
* Optional hook executed when the subscription is cancelled by calling this
* Subscriber's {@link #cancel()} method. Defaults to doing nothing.
*/
protected void hookOnCancel() {
//NO-OP
}

/**
* Optional hook executed after any of the termination events (onError, onComplete,
* cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)},
* {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks
* fail. Defaults to doing nothing. A failure of the callback will be caught by
* {@code Operators#onErrorDropped(Throwable, reactor.util.context.Context)}.
*
* @param type the type of termination event that triggered the hook
* ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or
* {@link SignalType#CANCEL})
*/
protected void hookFinally(SignalType type) {
//NO-OP
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
try {
hookOnSubscribe(s);
} catch (Throwable throwable) {
onError(Operators.onOperatorError(s, throwable, currentContext()));
}
}
}

@Override
public void onNext(T value) {
Objects.requireNonNull(value, "onNext");
try {
hookOnNext(value);
} catch (Throwable throwable) {
onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
}
}

protected boolean shouldCallErrorDropHook() {
return true;
}

@Override
public void onError(Throwable t) {
Objects.requireNonNull(t, "onError");

if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators
.cancelledSubscription()) {
// Already cancelled concurrently

// Workaround for Sentinel BlockException:
// Here we add a predicate method to decide whether exception should be dropped implicitly
// or call the {@code onErrorDropped} hook.
if (shouldCallErrorDropHook()) {
Operators.onErrorDropped(t, currentContext());
}

return;
}

try {
hookOnError(t);
} catch (Throwable e) {
e = Exceptions.addSuppressed(e, t);
Operators.onErrorDropped(e, currentContext());
} finally {
safeHookFinally(SignalType.ON_ERROR);
}
}

@Override
public void onComplete() {
if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators
.cancelledSubscription()) {
//we're sure it has not been concurrently cancelled
try {
hookOnComplete();
} catch (Throwable throwable) {
//onError itself will short-circuit due to the CancelledSubscription being push above
hookOnError(Operators.onOperatorError(throwable, currentContext()));
} finally {
safeHookFinally(SignalType.ON_COMPLETE);
}
}
}

@Override
public final void request(long n) {
if (Operators.validate(n)) {
Subscription s = this.subscription;
if (s != null) {
s.request(n);
}
}
}

/**
* {@link #request(long) Request} an unbounded amount.
*/
public final void requestUnbounded() {
request(Long.MAX_VALUE);
}

@Override
public final void cancel() {
if (Operators.terminate(S, this)) {
try {
hookOnCancel();
} catch (Throwable throwable) {
hookOnError(Operators.onOperatorError(subscription, throwable, currentContext()));
} finally {
safeHookFinally(SignalType.CANCEL);
}
}
}

void safeHookFinally(SignalType type) {
try {
hookFinally(type);
} catch (Throwable finallyFailure) {
Operators.onErrorDropped(finallyFailure, currentContext());
}
}

@Override
public String toString() {
return getClass().getSimpleName();
}
}

+ 40
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java View File

@@ -0,0 +1,40 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public class MonoSentinelOperator<T> extends MonoOperator<T, T> {

private final EntryConfig entryConfig;

public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) {
super(source);
EntryConfig.assertValid(entryConfig);
this.entryConfig = entryConfig;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true));
}
}

+ 72
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphU.java View File

@@ -0,0 +1,72 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import reactor.core.publisher.Mono;

/**
* A {@link SphU} adapter with Project Reactor.
*
* @author Eric Zhao
* @since 1.5.0
*/
public final class ReactorSphU {

public static <R> Mono<R> entryWith(String resourceName, Mono<R> actual) {
return entryWith(resourceName, EntryType.OUT, actual);
}

public static <R> Mono<R> entryWith(String resourceName, EntryType entryType, Mono<R> actual) {
final AtomicReference<AsyncEntry> entryWrapper = new AtomicReference<>(null);
return Mono.defer(() -> {
try {
AsyncEntry entry = SphU.asyncEntry(resourceName, entryType);
entryWrapper.set(entry);
return actual.subscriberContext(context -> {
if (entry == null) {
return context;
}
Context sentinelContext = entry.getAsyncContext();
if (sentinelContext == null) {
return context;
}
// TODO: check GC friendly?
return context.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, sentinelContext);
}).doOnSuccessOrError((o, t) -> {
if (entry != null && entryWrapper.compareAndSet(entry, null)) {
if (t != null) {
Tracer.traceContext(t, 1, entry.getAsyncContext());
}
entry.exit();
}
});
} catch (BlockException ex) {
return Mono.error(ex);
}
});
}

private ReactorSphU() {}
}

+ 27
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorConstants.java View File

@@ -0,0 +1,27 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public final class SentinelReactorConstants {

public static final String SENTINEL_CONTEXT_KEY = "_sentinel_context";

private SentinelReactorConstants() {}
}

+ 166
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java View File

@@ -0,0 +1,166 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.function.Supplier;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> {

private final EntryConfig entryConfig;

private final CoreSubscriber<? super T> actual;
private final boolean unary;

private volatile AsyncEntry currentEntry;
private final AtomicBoolean entryExited = new AtomicBoolean(false);

public SentinelReactorSubscriber(EntryConfig entryConfig,
CoreSubscriber<? super T> actual,
boolean unary) {
checkEntryConfig(entryConfig);
this.entryConfig = entryConfig;
this.actual = actual;
this.unary = unary;
}

private void checkEntryConfig(EntryConfig config) {
EntryConfig.assertValid(config);
}

@Override
public Context currentContext() {
if (currentEntry == null || entryExited.get()) {
return actual.currentContext();
}
com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext();
if (sentinelContext == null) {
return actual.currentContext();
}
return actual.currentContext()
.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
}

private void doWithContextOrCurrent(Supplier<Optional<com.alibaba.csp.sentinel.context.Context>> contextSupplier,
Runnable f) {
Optional<com.alibaba.csp.sentinel.context.Context> contextOpt = contextSupplier.get();
if (!contextOpt.isPresent()) {
// Provided context is absent, use current context.
f.run();
} else {
// Run on provided context.
ContextUtil.runOnContext(contextOpt.get(), f);
}
}

private void entryWhenSubscribed() {
ContextConfig sentinelContextConfig = entryConfig.getContextConfig();
if (sentinelContextConfig != null) {
// If current we're already in a context, the context config won't work.
ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin());
}
try {
AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName());
this.currentEntry = entry;
actual.onSubscribe(this);
} catch (BlockException ex) {
// Mark as completed (exited) explicitly.
entryExited.set(true);
// Signal cancel and propagate the {@code BlockException}.
cancel();
actual.onSubscribe(this);
actual.onError(ex);
} finally {
if (sentinelContextConfig != null) {
ContextUtil.exit();
}
}
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY),
this::entryWhenSubscribed);
}

@Override
protected void hookOnNext(T value) {
if (isDisposed()) {
tryCompleteEntry();
return;
}
doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext),
() -> actual.onNext(value));

if (unary) {
// For some cases of unary operator (Mono), we have to do this during onNext hook.
// e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete()
// the onComplete hook will not be executed so we'll need to complete the entry in advance.
tryCompleteEntry();
}
}

@Override
protected void hookOnComplete() {
tryCompleteEntry();
actual.onComplete();
}

@Override
protected boolean shouldCallErrorDropHook() {
// When flow control triggered or stream terminated, the incoming
// deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook.
return !entryExited.get();
}

@Override
protected void hookOnError(Throwable t) {
if (currentEntry != null && currentEntry.getAsyncContext() != null) {
// Normal requests with non-BlockException will go through here.
Tracer.traceContext(t, 1, currentEntry.getAsyncContext());
}
tryCompleteEntry();
actual.onError(t);
}

@Override
protected void hookOnCancel() {

}

private boolean tryCompleteEntry() {
if (currentEntry != null && entryExited.compareAndSet(false, true)) {
currentEntry.exit();
return true;
}
return false;
}
}

+ 54
- 0
sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java View File

@@ -0,0 +1,54 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* A transformer that transforms given {@code Publisher} to a wrapped Sentinel reactor operator.
*
* @author Eric Zhao
* @since 1.5.0
*/
public class SentinelReactorTransformer<T> implements Function<Publisher<T>, Publisher<T>> {

private final EntryConfig entryConfig;

public SentinelReactorTransformer(String resourceName) {
this(new EntryConfig(resourceName));
}

public SentinelReactorTransformer(EntryConfig entryConfig) {
EntryConfig.assertValid(entryConfig);
this.entryConfig = entryConfig;
}

@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
return new MonoSentinelOperator<>((Mono<T>) publisher, entryConfig);
}
if (publisher instanceof Flux) {
return new FluxSentinelOperator<>((Flux<T>) publisher, entryConfig);
}

throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName());
}
}

+ 75
- 0
sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperatorTestIntegrationTest.java View File

@@ -0,0 +1,75 @@
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.ArrayList;
import java.util.Collections;

import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.junit.Assert.*;

/**
* @author Eric Zhao
*/
public class FluxSentinelOperatorTestIntegrationTest {

@Test
public void testEmitMultipleValueSuccess() {
String resourceName = createResourceName("testEmitMultipleSuccess");
StepVerifier.create(Flux.just(1, 2)
.map(e -> e * 2)
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectNext(2)
.expectNext(4)
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
}

@Test
public void testEmitFluxError() {
String resourceName = createResourceName("testEmitFluxError");
StepVerifier.create(Flux.error(new IllegalAccessException("oops"))
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectError(IllegalAccessException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps());
assertEquals(1, cn.totalException());
}

@Test
public void testEmitMultipleValuesWhenFlowControlTriggered() {
String resourceName = createResourceName("testEmitMultipleValuesWhenFlowControlTriggered");
FlowRuleManager.loadRules(Collections.singletonList(
new FlowRule(resourceName).setCount(0)
));
StepVerifier.create(Flux.just(1, 3, 5)
.map(e -> e * 2)
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectError(BlockException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(0, cn.passQps(), 0.01);
assertEquals(1, cn.blockRequest());

FlowRuleManager.loadRules(new ArrayList<>());
}

private String createResourceName(String resourceName) {
return "reactor_test_flux_" + resourceName;
}
}

+ 168
- 0
sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperatorIntegrationTest.java View File

@@ -0,0 +1,168 @@
package com.alibaba.csp.sentinel.adapter.reactor;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.node.EntranceNode;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.Assert.*;

/**
* @author Eric Zhao
*/
public class MonoSentinelOperatorIntegrationTest {

@Test
public void testTransformMonoWithSentinelContextEnter() {
String resourceName = createResourceName("testTransformMonoWithSentinelContextEnter");
String contextName = "test_reactive_context";
String origin = "originA";
FlowRuleManager.loadRules(Collections.singletonList(
new FlowRule(resourceName).setCount(0).setLimitApp(origin).as(FlowRule.class)
));
StepVerifier.create(Mono.just(2)
.transform(new SentinelReactorTransformer<>(
// Customized context with origin.
new EntryConfig(resourceName, EntryType.OUT, new ContextConfig(contextName, origin))))
)
.expectError(BlockException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(0, cn.passQps(), 0.01);
assertEquals(1, cn.blockRequest());
assertTrue(Constants.ROOT.getChildList()
.stream()
.filter(node -> node instanceof EntranceNode)
.map(e -> (EntranceNode)e)
.anyMatch(e -> e.getId().getName().equals(contextName))
);

FlowRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testFluxToMonoNextThenCancelSuccess() {
String resourceName = createResourceName("testFluxToMonoNextThenCancelSuccess");
StepVerifier.create(Flux.range(1, 10)
.map(e -> e * 2)
.next()
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectNext(2)
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
}

@Test
public void testEmitSingleLongTimeRt() {
String resourceName = createResourceName("testEmitSingleLongTimeRt");
StepVerifier.create(Mono.just(2)
.delayElement(Duration.ofMillis(1000))
.map(e -> e * 2)
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectNext(4)
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1000, cn.avgRt(), 20);
}

@Test
public void testEmitEmptySuccess() {
String resourceName = createResourceName("testEmitEmptySuccess");
StepVerifier.create(Mono.empty()
.transform(new SentinelReactorTransformer<>(resourceName)))
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
}

@Test
public void testEmitSingleSuccess() {
String resourceName = createResourceName("testEmitSingleSuccess");
StepVerifier.create(Mono.just(1)
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectNext(1)
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
}

@Test
public void testEmitSingleValueWhenFlowControlTriggered() {
String resourceName = createResourceName("testEmitSingleValueWhenFlowControlTriggered");
FlowRuleManager.loadRules(Collections.singletonList(
new FlowRule(resourceName).setCount(0)
));
StepVerifier.create(Mono.just(1)
.map(e -> e * 2)
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectError(BlockException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(0, cn.passQps(), 0.01);
assertEquals(1, cn.blockRequest());

FlowRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testEmitExceptionWhenFlowControlTriggered() {
String resourceName = createResourceName("testEmitExceptionWhenFlowControlTriggered");
FlowRuleManager.loadRules(Collections.singletonList(
new FlowRule(resourceName).setCount(0)
));
StepVerifier.create(Mono.error(new IllegalStateException("some"))
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectError(BlockException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(0, cn.passQps(), 0.01);
assertEquals(1, cn.blockRequest());

FlowRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testEmitSingleError() {
String resourceName = createResourceName("testEmitSingleError");
StepVerifier.create(Mono.error(new IllegalStateException())
.transform(new SentinelReactorTransformer<>(resourceName)))
.expectError(IllegalStateException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.totalException());
}

private String createResourceName(String resourceName) {
return "reactor_test_mono_" + resourceName;
}
}

+ 74
- 0
sentinel-adapter/sentinel-reactor-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/reactor/ReactorSphUTest.java View File

@@ -0,0 +1,74 @@
package com.alibaba.csp.sentinel.adapter.reactor;

import java.util.ArrayList;
import java.util.Collections;

import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.junit.Assert.*;

/**
* @author Eric Zhao
*/
public class ReactorSphUTest {

@Test
public void testReactorEntryNormalWhenFlowControlTriggered() {
String resourceName = createResourceName("testReactorEntryNormalWhenFlowControlTriggered");
FlowRuleManager.loadRules(Collections.singletonList(
new FlowRule(resourceName).setCount(0)
));
StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60))
.subscribeOn(Schedulers.elastic())
.map(e -> e * 3))
.expectError(BlockException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(0, cn.passQps(), 0.01);
assertEquals(1, cn.blockRequest());

FlowRuleManager.loadRules(new ArrayList<>());
}

@Test
public void testReactorEntryWithCommon() {
String resourceName = createResourceName("testReactorEntryWithCommon");
StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.just(60))
.subscribeOn(Schedulers.elastic())
.map(e -> e * 3))
.expectNext(180)
.verifyComplete();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
}

@Test
public void testReactorEntryWithBizException() {
String resourceName = createResourceName("testReactorEntryWithBizException");
StepVerifier.create(ReactorSphU.entryWith(resourceName, Mono.error(new IllegalStateException())))
.expectError(IllegalStateException.class)
.verify();

ClusterNode cn = ClusterBuilderSlot.getClusterNode(resourceName);
assertNotNull(cn);
assertEquals(1, cn.passQps(), 0.01);
assertEquals(1, cn.totalException());
}

private String createResourceName(String resourceName) {
return "reactor_test_SphU_" + resourceName;
}
}

Loading…
Cancel
Save