From c8df7e7456dd9ce543df490fc6495632d60837b3 Mon Sep 17 00:00:00 2001 From: chenledong <44924079+chenledong@users.noreply.github.com> Date: Fri, 16 Aug 2019 17:40:11 +0800 Subject: [PATCH] Improve RT statistic and exception tracing in Sentinel gRPC adapter (#995) --- .../sentinel-grpc-adapter/README.md | 2 - .../grpc/SentinelGrpcClientInterceptor.java | 86 ++++++------- .../grpc/SentinelGrpcServerInterceptor.java | 66 +++++----- .../adapter/grpc/FooServiceClient.java | 22 +--- .../sentinel/adapter/grpc/FooServiceImpl.java | 74 +++++++----- ...tinelGrpcClientInterceptorDegradeTest.java | 104 ---------------- .../SentinelGrpcClientInterceptorTest.java | 63 ++++++---- ...tinelGrpcServerInterceptorDegradeTest.java | 114 ------------------ .../SentinelGrpcServerInterceptorTest.java | 57 +++++---- .../src/test/proto/example.proto | 10 +- 10 files changed, 195 insertions(+), 403 deletions(-) delete mode 100644 sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java delete mode 100644 sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java diff --git a/sentinel-adapter/sentinel-grpc-adapter/README.md b/sentinel-adapter/sentinel-grpc-adapter/README.md index 38d60030..e3c3aa27 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/README.md +++ b/sentinel-adapter/sentinel-grpc-adapter/README.md @@ -3,7 +3,6 @@ Sentinel gRPC Adapter provides client and server interceptor for gRPC services. > Note that currently the interceptor only supports unary methods in gRPC. -> In some circumstances (e.g. asynchronous call), the RT metrics might not be accurate. ## Client Interceptor @@ -35,4 +34,3 @@ Server server = ServerBuilder.forPort(port) .intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor. .build(); ``` - diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java index a265c95d..20fe36b1 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java @@ -15,16 +15,23 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.Entry; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.Tracer; -import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slots.block.BlockException; -import io.grpc.*; -import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicReference; /** *

gRPC client interceptor for Sentinel. Currently it only works with unary methods.

@@ -50,43 +57,54 @@ import javax.annotation.Nullable; * @author Eric Zhao */ public class SentinelGrpcClientInterceptor implements ClientInterceptor { - private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( "Flow control limit exceeded (client side)"); @Override public ClientCall interceptCall(MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { - String resourceName = methodDescriptor.getFullMethodName(); - AsyncEntry asyncEntry = null; + String fullMethodName = methodDescriptor.getFullMethodName(); + Entry entry = null; try { - asyncEntry = SphU.asyncEntry(resourceName, EntryType.OUT); - - final AsyncEntry tempEntry = asyncEntry; + entry = SphU.asyncEntry(fullMethodName, EntryType.OUT); + final AtomicReference atomicReferenceEntry = new AtomicReference<>(entry); // Allow access, forward the call. return new ForwardingClientCall.SimpleForwardingClientCall( channel.newCall(methodDescriptor, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - super.start(new SimpleForwardingClientCallListener(responseListener) { - @Override - public void onReady() { - super.onReady(); - } - + super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) { @Override public void onClose(Status status, Metadata trailers) { - super.onClose(status, trailers); - // Record the exception metrics. - if (!status.isOk()) { - recordException(status.asRuntimeException(), tempEntry); + Entry entry = atomicReferenceEntry.get(); + if (entry != null) { + // Record the exception metrics. + if (!status.isOk()) { + Tracer.traceEntry(status.asRuntimeException(), entry); + } + entry.exit(); + atomicReferenceEntry.set(null); } - tempEntry.exit(); + super.onClose(status, trailers); } }, headers); } - + /** + * Some Exceptions will only call cancel. + */ + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + Entry entry = atomicReferenceEntry.get(); + // Some Exceptions will call onClose and cancel. + if (entry != null) { + // Record the exception metrics. + Tracer.traceEntry(cause, entry); + entry.exit(); + atomicReferenceEntry.set(null); + } + super.cancel(message, cause); + } }; } catch (BlockException e) { // Flow control threshold exceeded, block the call. @@ -98,43 +116,27 @@ public class SentinelGrpcClientInterceptor implements ClientInterceptor { @Override public void request(int numMessages) { - } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - } @Override public void halfClose() { - } @Override public void sendMessage(ReqT message) { - } }; - } catch (RuntimeException e) { - //catch the RuntimeException newCall throws, - // entry is guaranteed to exit - if (asyncEntry != null) { - asyncEntry.exit(); + // Catch the RuntimeException newCall throws, entry is guaranteed to exit. + if (entry != null) { + Tracer.traceEntry(e, entry); + entry.exit(); } throw e; - - } } - - private void recordException(final Throwable t, AsyncEntry asyncEntry) { - ContextUtil.runOnContext(asyncEntry.getAsyncContext(), new Runnable() { - @Override - public void run() { - Tracer.trace(t); - } - }); - } } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java index a9b118ba..ca1d558d 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java @@ -15,13 +15,21 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.Entry; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.Tracer; -import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slots.block.BlockException; -import io.grpc.*; +import io.grpc.ForwardingServerCall; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; + +import java.util.concurrent.atomic.AtomicReference; /** *

gRPC server interceptor for Sentinel. Currently it only works with unary methods.

@@ -39,45 +47,50 @@ import io.grpc.*; * @author Eric Zhao */ public class SentinelGrpcServerInterceptor implements ServerInterceptor { - private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( "Flow control limit exceeded (server side)"); + private static final StatusRuntimeException STATUS_RUNTIME_EXCEPTION = new StatusRuntimeException(Status.CANCELLED); @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - String resourceName = call.getMethodDescriptor().getFullMethodName(); + String fullMethodName = call.getMethodDescriptor().getFullMethodName(); // Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - AsyncEntry entry = null; + Entry entry = null; try { - ContextUtil.enter(resourceName); - entry = SphU.asyncEntry(resourceName, EntryType.IN); + entry = SphU.asyncEntry(fullMethodName, EntryType.IN); + final AtomicReference atomicReferenceEntry = new AtomicReference<>(entry); // Allow access, forward the call. - final AsyncEntry tempEntry = entry; return new ForwardingServerCallListener.SimpleForwardingServerCallListener( next.startCall( new ForwardingServerCall.SimpleForwardingServerCall(call) { @Override public void close(Status status, Metadata trailers) { - super.close(status, trailers); - // Record the exception metrics. - if (!status.isOk()) { - recordException(status.asException(), tempEntry); + Entry entry = atomicReferenceEntry.get(); + if (entry != null) { + // Record the exception metrics. + if (!status.isOk()) { + Tracer.traceEntry(status.asRuntimeException(), entry); + } + //entry exit when the call be closed + entry.exit(); } - //entry exit when the call be closed - tempEntry.exit(); + super.close(status, trailers); } }, headers)) { - /** - * if call was canceled, onCancel will be called. and the close will not be called - * so the server is encouraged to abort processing to save resources by onCancel + * If call was canceled, onCancel will be called. and the close will not be called + * so the server is encouraged to abort processing to save resources by onCancel * @see ServerCall.Listener#onCancel() */ @Override public void onCancel() { + Entry entry = atomicReferenceEntry.get(); + if (entry != null) { + Tracer.traceEntry(STATUS_RUNTIME_EXCEPTION, entry); + entry.exit(); + atomicReferenceEntry.set(null); + } super.onCancel(); - // request has be canceled, entry should exit - tempEntry.exit(); } }; } catch (BlockException e) { @@ -85,21 +98,12 @@ public class SentinelGrpcServerInterceptor implements ServerInterceptor { return new ServerCall.Listener() { }; } catch (RuntimeException e) { - //catch the RuntimeException startCall throws, - // entry is guaranteed to exit + // Catch the RuntimeException startCall throws, entry is guaranteed to exit. if (entry != null) { + Tracer.traceEntry(e, entry); entry.exit(); } throw e; } } - - private void recordException(final Throwable t, AsyncEntry tempEntry) { - ContextUtil.runOnContext(tempEntry.getAsyncContext(), new Runnable() { - @Override - public void run() { - Tracer.trace(t); - } - }); - } } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java index c72e732a..2aaefae9 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java @@ -15,23 +15,21 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import java.util.concurrent.TimeUnit; - import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc; - import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import java.util.concurrent.TimeUnit; + /** * A simple wrapped gRPC client for FooService. * * @author Eric Zhao */ final class FooServiceClient { - private final ManagedChannel channel; private final FooServiceGrpc.FooServiceBlockingStub blockingStub; @@ -57,7 +55,6 @@ final class FooServiceClient { return blockingStub.sayHello(request); } - FooResponse anotherHello(FooRequest request) { if (request == null) { throw new IllegalArgumentException("Request cannot be null"); @@ -65,21 +62,6 @@ final class FooServiceClient { return blockingStub.anotherHello(request); } - FooResponse helloWithEx(FooRequest request) { - if (request == null) { - throw new IllegalArgumentException("Request cannot be null"); - } - return blockingStub.helloWithEx(request); - } - - - FooResponse anotherHelloWithEx(FooRequest request) { - if (request == null) { - throw new IllegalArgumentException("Request cannot be null"); - } - return blockingStub.anotherHelloWithEx(request); - } - void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java index 832c6247..298ec44f 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java @@ -24,45 +24,53 @@ import io.grpc.stub.StreamObserver; * Implementation of FooService defined in proto. */ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { - @Override public void sayHello(FooRequest request, StreamObserver responseObserver) { - - String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId()); - - FooResponse response = FooResponse.newBuilder().setMessage(message).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); + int id = request.getId(); + switch (id) { + // Exception test + case -1: + responseObserver.onError(new IllegalAccessException("The id is error!")); + break; + // RT test + case -2: + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + responseObserver.onError(e); + break; + } + default: + String message = String.format("Hello %s! Your ID is %d.", request.getName(), id); + FooResponse response = FooResponse.newBuilder().setMessage(message).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + break; + } } @Override public void anotherHello(FooRequest request, StreamObserver responseObserver) { - - String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); - FooResponse response = FooResponse.newBuilder().setMessage(message).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - @Override - public void helloWithEx(FooRequest request, StreamObserver responseObserver) { - if (request.getId() == -1) { - responseObserver.onError(new IllegalAccessException("The id is error")); - return; - } - String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); - FooResponse response = FooResponse.newBuilder().setMessage(message).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - @Override - public void anotherHelloWithEx(FooRequest request, StreamObserver responseObserver) { - if (request.getId() == -1) { - responseObserver.onError(new IllegalAccessException("The id is error")); - return; + int id = request.getId(); + switch (id) { + // Exception test + case -1: + responseObserver.onError(new IllegalAccessException("The id is error!")); + break; + // RT test + case -2: + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + responseObserver.onError(e); + break; + } + default: + String message = String.format("Good day, %s (%d)", request.getName(), id); + FooResponse response = FooResponse.newBuilder().setMessage(message).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + break; } - String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); - FooResponse response = FooResponse.newBuilder().setMessage(message).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); } } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java deleted file mode 100644 index 3e59d3d8..00000000 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.adapter.grpc; - -import com.alibaba.csp.sentinel.EntryType; -import com.alibaba.csp.sentinel.node.ClusterNode; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; -import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; -import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; -import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; -import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - -import io.grpc.StatusRuntimeException; -import org.junit.After; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; - -import static org.junit.Assert.*; - -/** - * @author zhengzechao - */ -public class SentinelGrpcClientInterceptorDegradeTest { - - private final String resourceName = "com.alibaba.sentinel.examples.FooService/helloWithEx"; - private final GrpcTestServer server = new GrpcTestServer(); - private final int timeWindow = 10; - - private void configureDegradeRule(int count) { - DegradeRule rule = new DegradeRule() - .setCount(count) - .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) - .setResource(resourceName) - .setLimitApp("default") - .as(DegradeRule.class) - .setTimeWindow(timeWindow); - DegradeRuleManager.loadRules(Collections.singletonList(rule)); - } - - private boolean sendRequest(FooServiceClient client) { - try { - FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - - @Test - public void testGrpcClientInterceptor_degrade() throws IOException { - final int port = 19316; - - configureDegradeRule(1); - server.start(port, false); - - FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); - - assertFalse(sendErrorRequest(client)); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); - assertNotNull(clusterNode); - assertEquals(1, clusterNode.exceptionQps(), 0.01); - // The second request will be blocked. - assertFalse(sendRequest(client)); - assertEquals(1, clusterNode.blockRequest()); - - server.stop(); - } - - private boolean sendErrorRequest(FooServiceClient client) { - try { - FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - - @After - public void cleanUp() { - FlowRuleManager.loadRules(null); - ClusterBuilderSlot.getClusterNodeMap().clear(); - } -} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java index e61dd760..9b434168 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java @@ -15,8 +15,6 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import java.util.Collections; - import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; @@ -25,12 +23,17 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - import io.grpc.StatusRuntimeException; import org.junit.After; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Test cases for {@link SentinelGrpcClientInterceptor}. @@ -38,48 +41,52 @@ import static org.junit.Assert.*; * @author Eric Zhao */ public class SentinelGrpcClientInterceptorTest { - - private final String resourceName = "com.alibaba.sentinel.examples.FooService/sayHello"; - private final int threshold = 2; + private final String fullMethodName = "com.alibaba.sentinel.examples.FooService/sayHello"; private final GrpcTestServer server = new GrpcTestServer(); + private FooServiceClient client; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(fullMethodName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } @Test public void testGrpcClientInterceptor() throws Exception { final int port = 19328; - - configureFlowRule(threshold); server.start(port, false); + client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); - FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); - - assertTrue(sendRequest(client)); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); + configureFlowRule(Integer.MAX_VALUE); + assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build())); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(fullMethodName, EntryType.OUT); assertNotNull(clusterNode); - assertEquals(1, clusterNode.totalRequest() - clusterNode.blockRequest()); + assertEquals(1, clusterNode.totalPass()); // Not allowed to pass. configureFlowRule(0); - // The second request will be blocked. - assertFalse(sendRequest(client)); + assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build())); assertEquals(1, clusterNode.blockRequest()); + configureFlowRule(Integer.MAX_VALUE); + assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-1).build())); + assertEquals(1, clusterNode.totalException()); + + configureFlowRule(Integer.MAX_VALUE); + assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-2).build())); + assertTrue(clusterNode.avgRt() >= 1000); + server.stop(); } - private boolean sendRequest(FooServiceClient client) { + private boolean sendRequest(FooRequest request) { try { - FooResponse response = client.sayHello(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); + FooResponse response = client.sayHello(request); System.out.println("Response: " + response); return true; } catch (StatusRuntimeException ex) { @@ -88,9 +95,15 @@ public class SentinelGrpcClientInterceptorTest { } } + @Before + public void cleanUpBefore() { + FlowRuleManager.loadRules(null); + ClusterBuilderSlot.getClusterNodeMap().clear(); + } + @After - public void cleanUp() { + public void cleanUpAfter() { FlowRuleManager.loadRules(null); ClusterBuilderSlot.getClusterNodeMap().clear(); } -} \ No newline at end of file +} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java deleted file mode 100644 index 78bc32f1..00000000 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.adapter.grpc; - -import com.alibaba.csp.sentinel.EntryType; -import com.alibaba.csp.sentinel.node.ClusterNode; -import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; -import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; -import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; -import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - -import io.grpc.StatusRuntimeException; -import org.junit.After; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.*; - -/** - * @author zhengzechao - */ -public class SentinelGrpcServerInterceptorDegradeTest { - - private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHelloWithEx"; - private final GrpcTestServer server = new GrpcTestServer(); - private final int timeWindow = 10; - private FooServiceClient client; - - private void configureDegradeRule(int count) { - DegradeRule rule = new DegradeRule() - .setCount(count) - .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) - .setResource(resourceName) - .setLimitApp("default") - .as(DegradeRule.class) - .setTimeWindow(timeWindow); - DegradeRuleManager.loadRules(Collections.singletonList(rule)); - } - - private boolean sendRequest() { - try { - FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666) - .build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - - @Test - public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException { - final int port = 19349; - client = new FooServiceClient("localhost", port); - server.start(port, true); - // exception count = 1 - configureDegradeRule(20); - final CountDownLatch latch = new CountDownLatch(20); - - for (int i = 0; i < 20; i++) { - new Thread(new Runnable() { - @Override - public void run() { - assertFalse(sendErrorRequest()); - latch.countDown(); - } - }).start(); - } - latch.await(); - assertFalse(sendRequest()); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); - assertEquals(20, clusterNode.totalException()); - assertEquals(1, clusterNode.blockRequest()); - - } - - private boolean sendErrorRequest() { - try { - FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1) - .build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - - @After - public void cleanUp() { - FlowRuleManager.loadRules(null); - ClusterBuilderSlot.getClusterNodeMap().clear(); - } -} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java index 004a2563..ceef04fa 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java @@ -15,8 +15,6 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import java.util.Collections; - import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; @@ -25,12 +23,17 @@ import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - import io.grpc.StatusRuntimeException; import org.junit.After; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Test cases for {@link SentinelGrpcServerInterceptor}. @@ -38,49 +41,52 @@ import static org.junit.Assert.*; * @author Eric Zhao */ public class SentinelGrpcServerInterceptorTest { - private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHello"; - private final int threshold = 4; private final GrpcTestServer server = new GrpcTestServer(); - private FooServiceClient client; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(resourceName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } @Test public void testGrpcServerInterceptor() throws Exception { final int port = 19329; - client = new FooServiceClient("localhost", port); - - configureFlowRule(threshold); server.start(port, true); + client = new FooServiceClient("localhost", port); - assertTrue(sendRequest()); + configureFlowRule(Integer.MAX_VALUE); + assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build())); ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); assertNotNull(clusterNode); - assertEquals(1, clusterNode.totalRequest() - clusterNode.blockRequest()); + assertEquals(1, clusterNode.totalPass()); // Not allowed to pass. configureFlowRule(0); - // The second request will be blocked. - assertFalse(sendRequest()); + assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(666).build())); assertEquals(1, clusterNode.blockRequest()); + configureFlowRule(Integer.MAX_VALUE); + assertFalse(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-1).build())); + assertEquals(1, clusterNode.totalException()); + + configureFlowRule(Integer.MAX_VALUE); + assertTrue(sendRequest(FooRequest.newBuilder().setName("Sentinel").setId(-2).build())); + assertTrue(clusterNode.avgRt() >= 1000); + server.stop(); } - private boolean sendRequest() { + private boolean sendRequest(FooRequest request) { try { - FooResponse response = client.anotherHello(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); + FooResponse response = client.anotherHello(request); System.out.println("Response: " + response); return true; } catch (StatusRuntimeException ex) { @@ -89,10 +95,15 @@ public class SentinelGrpcServerInterceptorTest { } } + @Before + public void cleanUpBefore() { + FlowRuleManager.loadRules(null); + ClusterBuilderSlot.getClusterNodeMap().clear(); + } @After - public void cleanUp() { + public void cleanUpAfter() { FlowRuleManager.loadRules(null); ClusterBuilderSlot.getClusterNodeMap().clear(); } -} \ No newline at end of file +} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto b/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto index 2b25b1e1..daf70e3c 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto @@ -17,14 +17,6 @@ message FooResponse { // Example service definition. service FooService { - - rpc sayHello(FooRequest) returns (FooResponse) {} - rpc anotherHello(FooRequest) returns (FooResponse) {} - - rpc helloWithEx(FooRequest) returns (FooResponse) {} - rpc anotherHelloWithEx(FooRequest) returns (FooResponse) {} - - -} \ No newline at end of file +}