@@ -15,28 +15,20 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.adapter.grpc; | |||
import javax.annotation.Nullable; | |||
import com.alibaba.csp.sentinel.Entry; | |||
import com.alibaba.csp.sentinel.AsyncEntry; | |||
import com.alibaba.csp.sentinel.EntryType; | |||
import com.alibaba.csp.sentinel.SphU; | |||
import com.alibaba.csp.sentinel.Tracer; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
import io.grpc.CallOptions; | |||
import io.grpc.Channel; | |||
import io.grpc.ClientCall; | |||
import io.grpc.ClientInterceptor; | |||
import io.grpc.ForwardingClientCall; | |||
import io.grpc.*; | |||
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; | |||
import io.grpc.Metadata; | |||
import io.grpc.MethodDescriptor; | |||
import io.grpc.Status; | |||
import javax.annotation.Nullable; | |||
/** | |||
* <p>gRPC client interceptor for Sentinel. Currently it only works with unary methods.</p> | |||
* | |||
* <p> | |||
* Example code: | |||
* <pre> | |||
* public class ServiceClient { | |||
@@ -52,7 +44,7 @@ import io.grpc.Status; | |||
* | |||
* } | |||
* </pre> | |||
* | |||
* <p> | |||
* For server interceptor, see {@link SentinelGrpcServerInterceptor}. | |||
* | |||
* @author Eric Zhao | |||
@@ -60,18 +52,20 @@ import io.grpc.Status; | |||
public class SentinelGrpcClientInterceptor implements ClientInterceptor { | |||
private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( | |||
"Flow control limit exceeded (client side)"); | |||
"Flow control limit exceeded (client side)"); | |||
@Override | |||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, | |||
CallOptions callOptions, Channel channel) { | |||
String resourceName = methodDescriptor.getFullMethodName(); | |||
Entry entry = null; | |||
AsyncEntry asyncEntry = null; | |||
try { | |||
entry = SphU.entry(resourceName, EntryType.OUT); | |||
asyncEntry = SphU.asyncEntry(resourceName, EntryType.OUT); | |||
final AsyncEntry tempEntry = asyncEntry; | |||
// Allow access, forward the call. | |||
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>( | |||
channel.newCall(methodDescriptor, callOptions)) { | |||
channel.newCall(methodDescriptor, callOptions)) { | |||
@Override | |||
public void start(Listener<RespT> responseListener, Metadata headers) { | |||
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) { | |||
@@ -85,18 +79,14 @@ public class SentinelGrpcClientInterceptor implements ClientInterceptor { | |||
super.onClose(status, trailers); | |||
// Record the exception metrics. | |||
if (!status.isOk()) { | |||
recordException(status.asRuntimeException()); | |||
recordException(status.asRuntimeException(), tempEntry); | |||
} | |||
tempEntry.exit(); | |||
} | |||
}, headers); | |||
} | |||
@Override | |||
public void cancel(@Nullable String message, @Nullable Throwable cause) { | |||
super.cancel(message, cause); | |||
// Record the exception metrics. | |||
recordException(cause); | |||
} | |||
}; | |||
} catch (BlockException e) { | |||
// Flow control threshold exceeded, block the call. | |||
@@ -126,14 +116,25 @@ public class SentinelGrpcClientInterceptor implements ClientInterceptor { | |||
} | |||
}; | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} catch (RuntimeException e) { | |||
//catch the RuntimeException newCall throws, | |||
// entry is guaranteed to exit | |||
if (asyncEntry != null) { | |||
asyncEntry.exit(); | |||
} | |||
throw e; | |||
} | |||
} | |||
private void recordException(Throwable t) { | |||
Tracer.trace(t); | |||
private void recordException(final Throwable t, AsyncEntry asyncEntry) { | |||
ContextUtil.runOnContext(asyncEntry.getAsyncContext(), new Runnable() { | |||
@Override | |||
public void run() { | |||
Tracer.trace(t); | |||
} | |||
}); | |||
} | |||
} |
@@ -15,25 +15,17 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.adapter.grpc; | |||
import com.alibaba.csp.sentinel.Entry; | |||
import com.alibaba.csp.sentinel.AsyncEntry; | |||
import com.alibaba.csp.sentinel.EntryType; | |||
import com.alibaba.csp.sentinel.SphU; | |||
import com.alibaba.csp.sentinel.Tracer; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
import io.grpc.ForwardingServerCall; | |||
import io.grpc.ForwardingServerCallListener; | |||
import io.grpc.Metadata; | |||
import io.grpc.ServerCall; | |||
import io.grpc.ServerCall.Listener; | |||
import io.grpc.ServerCallHandler; | |||
import io.grpc.ServerInterceptor; | |||
import io.grpc.Status; | |||
import io.grpc.*; | |||
/** | |||
* <p>gRPC server interceptor for Sentinel. Currently it only works with unary methods.</p> | |||
* | |||
* <p> | |||
* Example code: | |||
* <pre> | |||
* Server server = ServerBuilder.forPort(port) | |||
@@ -41,7 +33,7 @@ import io.grpc.Status; | |||
* .intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor. | |||
* .build(); | |||
* </pre> | |||
* | |||
* <p> | |||
* For client interceptor, see {@link SentinelGrpcClientInterceptor}. | |||
* | |||
* @author Eric Zhao | |||
@@ -49,42 +41,65 @@ import io.grpc.Status; | |||
public class SentinelGrpcServerInterceptor implements ServerInterceptor { | |||
private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( | |||
"Flow control limit exceeded (server side)"); | |||
"Flow control limit exceeded (server side)"); | |||
@Override | |||
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, | |||
ServerCallHandler<ReqT, RespT> serverCallHandler) { | |||
String resourceName = serverCall.getMethodDescriptor().getFullMethodName(); | |||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { | |||
String resourceName = call.getMethodDescriptor().getFullMethodName(); | |||
// Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); | |||
Entry entry = null; | |||
AsyncEntry entry = null; | |||
try { | |||
ContextUtil.enter(resourceName); | |||
entry = SphU.entry(resourceName, EntryType.IN); | |||
entry = SphU.asyncEntry(resourceName, EntryType.IN); | |||
// Allow access, forward the call. | |||
final AsyncEntry tempEntry = entry; | |||
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( | |||
serverCallHandler.startCall( | |||
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) { | |||
@Override | |||
public void close(Status status, Metadata trailers) { | |||
super.close(status, trailers); | |||
// Record the exception metrics. | |||
if (!status.isOk()) { | |||
recordException(status.asRuntimeException()); | |||
} | |||
} | |||
}, metadata)) {}; | |||
next.startCall( | |||
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { | |||
@Override | |||
public void close(Status status, Metadata trailers) { | |||
super.close(status, trailers); | |||
// Record the exception metrics. | |||
if (!status.isOk()) { | |||
recordException(status.asException(), tempEntry); | |||
} | |||
//entry exit when the call be closed | |||
tempEntry.exit(); | |||
} | |||
}, headers)) { | |||
/** | |||
* if call was canceled, onCancel will be called. and the close will not be called | |||
* so the server is encouraged to abort processing to save resources by onCancel | |||
* @see ServerCall.Listener#onCancel() | |||
*/ | |||
@Override | |||
public void onCancel() { | |||
super.onCancel(); | |||
// request has be canceled, entry should exit | |||
tempEntry.exit(); | |||
} | |||
}; | |||
} catch (BlockException e) { | |||
serverCall.close(FLOW_CONTROL_BLOCK, new Metadata()); | |||
return new ServerCall.Listener<ReqT>() {}; | |||
} finally { | |||
call.close(FLOW_CONTROL_BLOCK, new Metadata()); | |||
return new ServerCall.Listener<ReqT>() { | |||
}; | |||
} catch (RuntimeException e) { | |||
//catch the RuntimeException startCall throws, | |||
// entry is guaranteed to exit | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
ContextUtil.exit(); | |||
throw e; | |||
} | |||
} | |||
private void recordException(Throwable t) { | |||
Tracer.trace(t); | |||
private void recordException(final Throwable t, AsyncEntry tempEntry) { | |||
ContextUtil.runOnContext(tempEntry.getAsyncContext(), new Runnable() { | |||
@Override | |||
public void run() { | |||
Tracer.trace(t); | |||
} | |||
}); | |||
} | |||
} |
@@ -37,16 +37,16 @@ final class FooServiceClient { | |||
FooServiceClient(String host, int port) { | |||
this.channel = ManagedChannelBuilder.forAddress(host, port) | |||
.usePlaintext() | |||
.build(); | |||
.usePlaintext() | |||
.build(); | |||
this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel); | |||
} | |||
FooServiceClient(String host, int port, ClientInterceptor interceptor) { | |||
this.channel = ManagedChannelBuilder.forAddress(host, port) | |||
.usePlaintext() | |||
.intercept(interceptor) | |||
.build(); | |||
.usePlaintext() | |||
.intercept(interceptor) | |||
.build(); | |||
this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel); | |||
} | |||
@@ -57,6 +57,7 @@ final class FooServiceClient { | |||
return blockingStub.sayHello(request); | |||
} | |||
FooResponse anotherHello(FooRequest request) { | |||
if (request == null) { | |||
throw new IllegalArgumentException("Request cannot be null"); | |||
@@ -64,6 +65,21 @@ final class FooServiceClient { | |||
return blockingStub.anotherHello(request); | |||
} | |||
FooResponse helloWithEx(FooRequest request) { | |||
if (request == null) { | |||
throw new IllegalArgumentException("Request cannot be null"); | |||
} | |||
return blockingStub.helloWithEx(request); | |||
} | |||
FooResponse anotherHelloWithEx(FooRequest request) { | |||
if (request == null) { | |||
throw new IllegalArgumentException("Request cannot be null"); | |||
} | |||
return blockingStub.anotherHelloWithEx(request); | |||
} | |||
void shutdown() throws InterruptedException { | |||
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); | |||
} | |||
@@ -18,7 +18,6 @@ package com.alibaba.csp.sentinel.adapter.grpc; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc; | |||
import io.grpc.stub.StreamObserver; | |||
/** | |||
@@ -28,7 +27,9 @@ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { | |||
@Override | |||
public void sayHello(FooRequest request, StreamObserver<FooResponse> responseObserver) { | |||
String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId()); | |||
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); | |||
responseObserver.onNext(response); | |||
responseObserver.onCompleted(); | |||
@@ -36,6 +37,29 @@ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { | |||
@Override | |||
public void anotherHello(FooRequest request, StreamObserver<FooResponse> responseObserver) { | |||
String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); | |||
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); | |||
responseObserver.onNext(response); | |||
responseObserver.onCompleted(); | |||
} | |||
@Override | |||
public void helloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) { | |||
if (request.getId() == -1) { | |||
responseObserver.onError(new IllegalAccessException("The id is error")); | |||
return; | |||
} | |||
String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); | |||
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); | |||
responseObserver.onNext(response); | |||
responseObserver.onCompleted(); | |||
} | |||
@Override | |||
public void anotherHelloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) { | |||
if (request.getId() == -1) { | |||
responseObserver.onError(new IllegalAccessException("The id is error")); | |||
return; | |||
} | |||
String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); | |||
FooResponse response = FooResponse.newBuilder().setMessage(message).build(); | |||
responseObserver.onNext(response); | |||
@@ -19,6 +19,7 @@ import io.grpc.Server; | |||
import io.grpc.ServerBuilder; | |||
import java.io.IOException; | |||
import java.util.concurrent.Executors; | |||
class GrpcTestServer { | |||
@@ -0,0 +1,98 @@ | |||
package com.alibaba.csp.sentinel.adapter.grpc; | |||
import com.alibaba.csp.sentinel.EntryType; | |||
import com.alibaba.csp.sentinel.node.ClusterNode; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | |||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; | |||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
import io.grpc.StatusRuntimeException; | |||
import org.junit.After; | |||
import org.junit.Test; | |||
import java.io.IOException; | |||
import java.util.Collections; | |||
import static org.junit.Assert.*; | |||
/** | |||
* @author zhengzechao | |||
* @date 2018/12/7 | |||
* Email ooczzoo@gmail.com | |||
*/ | |||
public class SentinelGrpcClientInterceptorDegradeTest { | |||
private final String resourceName = "com.alibaba.sentinel.examples.FooService/helloWithEx"; | |||
private final int threshold = 2; | |||
private final GrpcTestServer server = new GrpcTestServer(); | |||
private final int timeWindow = 10; | |||
private void configureDegradeRule(int count) { | |||
DegradeRule rule = new DegradeRule() | |||
.setCount(count) | |||
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) | |||
.setResource(resourceName) | |||
.setLimitApp("default") | |||
.as(DegradeRule.class) | |||
.setTimeWindow(timeWindow); | |||
DegradeRuleManager.loadRules(Collections.singletonList(rule)); | |||
} | |||
private boolean sendRequest(FooServiceClient client) { | |||
try { | |||
FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); | |||
System.out.println("Response: " + response); | |||
return true; | |||
} catch (StatusRuntimeException ex) { | |||
System.out.println("Blocked, cause: " + ex.getMessage()); | |||
return false; | |||
} | |||
} | |||
@Test | |||
public void testGrpcClientInterceptor_degrade() throws IOException { | |||
final int port = 19316; | |||
configureDegradeRule(1); | |||
server.start(port, false); | |||
FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); | |||
assertFalse(sendErrorRequest(client)); | |||
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); | |||
assertNotNull(clusterNode); | |||
assertEquals(1, clusterNode.exceptionQps()); | |||
// The second request will be blocked. | |||
assertFalse(sendRequest(client)); | |||
assertEquals(1, clusterNode.blockRequest()); | |||
server.stop(); | |||
} | |||
private boolean sendErrorRequest(FooServiceClient client) { | |||
try { | |||
FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); | |||
System.out.println("Response: " + response); | |||
return true; | |||
} catch (StatusRuntimeException ex) { | |||
System.out.println("Blocked, cause: " + ex.getMessage()); | |||
return false; | |||
} | |||
} | |||
@After | |||
public void cleanUp() { | |||
FlowRuleManager.loadRules(null); | |||
ClusterBuilderSlot.getClusterNodeMap().clear(); | |||
} | |||
} |
@@ -0,0 +1,109 @@ | |||
package com.alibaba.csp.sentinel.adapter.grpc; | |||
import com.alibaba.csp.sentinel.EntryType; | |||
import com.alibaba.csp.sentinel.node.ClusterNode; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; | |||
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | |||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; | |||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
import io.grpc.StatusRuntimeException; | |||
import org.junit.After; | |||
import org.junit.Test; | |||
import java.io.IOException; | |||
import java.util.Collections; | |||
import java.util.concurrent.CountDownLatch; | |||
import static org.junit.Assert.*; | |||
/** | |||
* @author zhengzechao | |||
* @date 2018/12/7 | |||
* Email ooczzoo@gmail.com | |||
*/ | |||
public class SentinelGrpcServerInterceptorDegradeTest { | |||
private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHelloWithEx"; | |||
private final int threshold = 4; | |||
private final GrpcTestServer server = new GrpcTestServer(); | |||
private final int timeWindow = 10; | |||
private FooServiceClient client; | |||
private void configureDegradeRule(int count) { | |||
DegradeRule rule = new DegradeRule() | |||
.setCount(count) | |||
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) | |||
.setResource(resourceName) | |||
.setLimitApp("default") | |||
.as(DegradeRule.class) | |||
.setTimeWindow(timeWindow); | |||
DegradeRuleManager.loadRules(Collections.singletonList(rule)); | |||
} | |||
private boolean sendRequest() { | |||
try { | |||
FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); | |||
System.out.println("Response: " + response); | |||
return true; | |||
} catch (StatusRuntimeException ex) { | |||
System.out.println("Blocked, cause: " + ex.getMessage()); | |||
return false; | |||
} | |||
} | |||
@Test | |||
public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException { | |||
final int port = 19349; | |||
client = new FooServiceClient("localhost", port); | |||
server.start(port, true); | |||
// exception count = 1 | |||
configureDegradeRule(20); | |||
final CountDownLatch latch = new CountDownLatch(20); | |||
for (int i = 0; i < 20; i++) { | |||
new Thread(new Runnable() { | |||
@Override | |||
public void run() { | |||
assertFalse(sendErrorRequest()); | |||
latch.countDown(); | |||
} | |||
}).start(); | |||
} | |||
latch.await(); | |||
assertFalse(sendRequest()); | |||
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); | |||
assertEquals(20, clusterNode.totalException()); | |||
assertEquals(1, clusterNode.blockRequest()); | |||
} | |||
private boolean sendErrorRequest() { | |||
try { | |||
FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); | |||
System.out.println("Response: " + response); | |||
return true; | |||
} catch (StatusRuntimeException ex) { | |||
System.out.println("Blocked, cause: " + ex.getMessage()); | |||
return false; | |||
} | |||
} | |||
@After | |||
public void cleanUp() { | |||
FlowRuleManager.loadRules(null); | |||
ClusterBuilderSlot.getClusterNodeMap().clear(); | |||
} | |||
} |
@@ -17,7 +17,14 @@ message FooResponse { | |||
// Example service definition. | |||
service FooService { | |||
rpc sayHello(FooRequest) returns (FooResponse) {} | |||
rpc anotherHello(FooRequest) returns (FooResponse) {} | |||
rpc helloWithEx(FooRequest) returns (FooResponse) {} | |||
rpc anotherHelloWithEx(FooRequest) returns (FooResponse) {} | |||
} |