From 3e438b3dba17641603099524aabbe457f8bc55fd Mon Sep 17 00:00:00 2001 From: liqiangz Date: Tue, 9 Mar 2021 09:59:33 +0800 Subject: [PATCH] Support Redis cluster mode in Redis data-source extension (#1751) --- .../sentinel-datasource-redis/README.md | 26 ++++ .../datasource/redis/RedisDataSource.java | 71 ++++++++-- .../redis/config/RedisConnectionConfig.java | 89 ++++++++++++- .../redis/ClusterModeRedisDataSourceTest.java | 124 ++++++++++++++++++ .../redis/RedisConnectionConfigTest.java | 39 ++++++ 5 files changed, 337 insertions(+), 12 deletions(-) create mode 100644 sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/ClusterModeRedisDataSourceTest.java diff --git a/sentinel-extension/sentinel-datasource-redis/README.md b/sentinel-extension/sentinel-datasource-redis/README.md index d04a8e42..00ad3c9b 100644 --- a/sentinel-extension/sentinel-datasource-redis/README.md +++ b/sentinel-extension/sentinel-datasource-redis/README.md @@ -57,6 +57,24 @@ public void pushRules(List rules, Converter, String> encoder) { } ``` +Transaction can be handled in Redis Cluster when just using the same key. + +An example using Lettuce Redis Cluster client: + +```java +public void pushRules(List rules, Converter, String> encoder) { + RedisAdvancedClusterCommands subCommands = client.connect().sync(); + int slot = SlotHash.getSlot(ruleKey); + NodeSelection nodes = subCommands.nodes((n)->n.hasSlot(slot)); + RedisCommands commands = nodes.commands(0); + String value = encoder.convert(rules); + commands.multi(); + commands.set(ruleKey, value); + commands.publish(channel, value); + commands.exec(); +} +``` + ## How to build RedisConnectionConfig ### Build with Redis standalone mode @@ -79,3 +97,11 @@ RedisConnectionConfig config = RedisConnectionConfig.builder() .withRedisSentinel("redisSentinelServer2",5001) .withRedisSentinelMasterId("redisSentinelMasterId").build(); ``` + +### Build with Redis Cluster mode + +```java +RedisConnectionConfig config = RedisConnectionConfig.builder() + .withRedisCluster("redisSentinelServer1",5000) + .withRedisCluster("redisSentinelServer2",5001).build(); +``` diff --git a/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/RedisDataSource.java b/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/RedisDataSource.java index c0e3e45b..d4e296b1 100644 --- a/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/RedisDataSource.java +++ b/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/RedisDataSource.java @@ -26,11 +26,16 @@ import com.alibaba.csp.sentinel.util.StringUtil; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.pubsub.RedisPubSubAdapter; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -59,6 +64,8 @@ public class RedisDataSource extends AbstractDataSource { private final RedisClient redisClient; + private final RedisClusterClient redisClusterClient; + private final String ruleKey; /** @@ -75,7 +82,13 @@ public class RedisDataSource extends AbstractDataSource { AssertUtil.notNull(connectionConfig, "Redis connection config can not be null"); AssertUtil.notEmpty(ruleKey, "Redis ruleKey can not be empty"); AssertUtil.notEmpty(channel, "Redis subscribe channel can not be empty"); - this.redisClient = getRedisClient(connectionConfig); + if (connectionConfig.getRedisClusters().size() == 0) { + this.redisClient = getRedisClient(connectionConfig); + this.redisClusterClient = null; + } else { + this.redisClusterClient = getRedisClusterClient(connectionConfig); + this.redisClient = null; + } this.ruleKey = ruleKey; loadInitialConfig(); subscribeFromChannel(channel); @@ -96,6 +109,26 @@ public class RedisDataSource extends AbstractDataSource { } } + private RedisClusterClient getRedisClusterClient(RedisConnectionConfig connectionConfig) { + char[] password = connectionConfig.getPassword(); + String clientName = connectionConfig.getClientName(); + + //If any uri is successful for connection, the others are not tried anymore + List redisUris = new ArrayList<>(); + for (RedisConnectionConfig config : connectionConfig.getRedisClusters()) { + RedisURI.Builder clusterRedisUriBuilder = RedisURI.builder(); + clusterRedisUriBuilder.withHost(config.getHost()) + .withPort(config.getPort()) + .withTimeout(Duration.ofMillis(connectionConfig.getTimeout())); + //All redis nodes must have same password + if (password != null) { + clusterRedisUriBuilder.withPassword(connectionConfig.getPassword()); + } + redisUris.add(clusterRedisUriBuilder.build()); + } + return RedisClusterClient.create(redisUris); + } + private RedisClient getRedisStandaloneClient(RedisConnectionConfig connectionConfig) { char[] password = connectionConfig.getPassword(); String clientName = connectionConfig.getClientName(); @@ -132,11 +165,18 @@ public class RedisDataSource extends AbstractDataSource { } private void subscribeFromChannel(String channel) { - StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(); RedisPubSubAdapter adapterListener = new DelegatingRedisPubSubListener(); - pubSubConnection.addListener(adapterListener); - RedisPubSubCommands sync = pubSubConnection.sync(); - sync.subscribe(channel); + if (redisClient != null) { + StatefulRedisPubSubConnection pubSubConnection = redisClient.connectPubSub(); + pubSubConnection.addListener(adapterListener); + RedisPubSubCommands sync = pubSubConnection.sync(); + sync.subscribe(channel); + } else { + StatefulRedisClusterPubSubConnection pubSubConnection = redisClusterClient.connectPubSub(); + pubSubConnection.addListener(adapterListener); + RedisPubSubCommands sync = pubSubConnection.sync(); + sync.subscribe(channel); + } } private void loadInitialConfig() { @@ -153,16 +193,27 @@ public class RedisDataSource extends AbstractDataSource { @Override public String readSource() { - if (this.redisClient == null) { - throw new IllegalStateException("Redis client has not been initialized or error occurred"); + if (this.redisClient == null && this.redisClusterClient == null) { + throw new IllegalStateException("Redis client or Redis Cluster client has not been initialized or error occurred"); + } + + if (redisClient != null) { + RedisCommands stringRedisCommands = redisClient.connect().sync(); + return stringRedisCommands.get(ruleKey); + } else { + RedisAdvancedClusterCommands stringRedisCommands = redisClusterClient.connect().sync(); + return stringRedisCommands.get(ruleKey); } - RedisCommands stringRedisCommands = redisClient.connect().sync(); - return stringRedisCommands.get(ruleKey); } @Override public void close() { - redisClient.shutdown(); + if (redisClient != null) { + redisClient.shutdown(); + } else { + redisClusterClient.shutdown(); + } + } private class DelegatingRedisPubSubListener extends RedisPubSubAdapter { diff --git a/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/config/RedisConnectionConfig.java b/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/config/RedisConnectionConfig.java index 64fa0d7b..98aa3cd7 100644 --- a/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/config/RedisConnectionConfig.java +++ b/sentinel-extension/sentinel-datasource-redis/src/main/java/com/alibaba/csp/sentinel/datasource/redis/config/RedisConnectionConfig.java @@ -33,6 +33,11 @@ public class RedisConnectionConfig { */ public static final int DEFAULT_SENTINEL_PORT = 26379; + /** + * The default redisCluster port. + */ + public static final int DEFAULT_CLUSTER_PORT = 6379; + /** * The default redis port. */ @@ -51,6 +56,7 @@ public class RedisConnectionConfig { private char[] password; private long timeout = DEFAULT_TIMEOUT_MILLISECONDS; private final List redisSentinels = new ArrayList(); + private final List redisClusters = new ArrayList(); /** * Default empty constructor. @@ -238,6 +244,13 @@ public class RedisConnectionConfig { return redisSentinels; } + /** + * @return the list of {@link RedisConnectionConfig Redis Cluster URIs}. + */ + public List getRedisClusters() { + return redisClusters; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -254,6 +267,10 @@ public class RedisConnectionConfig { sb.append(", redisSentinelMasterId=").append(redisSentinelMasterId); } + if (redisClusters.size() > 0) { + sb.append("redisClusters=").append(getRedisClusters()); + } + sb.append(']'); return sb.toString(); } @@ -281,6 +298,10 @@ public class RedisConnectionConfig { : redisURI.redisSentinelMasterId != null) { return false; } + if (redisClusters != null ? !redisClusters.equals(redisURI.redisClusters) + : redisURI.redisClusters != null) { + return false; + } return !(redisSentinels != null ? !redisSentinels.equals(redisURI.redisSentinels) : redisURI.redisSentinels != null); @@ -293,6 +314,7 @@ public class RedisConnectionConfig { result = 31 * result + port; result = 31 * result + database; result = 31 * result + (redisSentinels != null ? redisSentinels.hashCode() : 0); + result = 31 * result + (redisClusters != null ? redisClusters.hashCode() : 0); return result; } @@ -309,6 +331,7 @@ public class RedisConnectionConfig { private char[] password; private long timeout = DEFAULT_TIMEOUT_MILLISECONDS; private final List redisSentinels = new ArrayList(); + private final List redisClusters = new ArrayList(); private Builder() { } @@ -424,6 +447,63 @@ public class RedisConnectionConfig { return this; } + /** + * Set Cluster host. Creates a new builder. + * + * @param host the host name + * @return New builder with Cluster host/port. + */ + public static RedisConnectionConfig.Builder redisCluster(String host) { + + AssertUtil.notEmpty(host, "Host must not be empty"); + + RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder(); + return builder.withRedisCluster(host); + } + + /** + * Set Cluster host and port. Creates a new builder. + * + * @param host the host name + * @param port the port + * @return New builder with Cluster host/port. + */ + public static RedisConnectionConfig.Builder redisCluster(String host, int port) { + + AssertUtil.notEmpty(host, "Host must not be empty"); + AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port)); + + RedisConnectionConfig.Builder builder = RedisConnectionConfig.builder(); + return builder.withRedisCluster(host, port); + } + + /** + * Add a withRedisCluster host to the existing builder. + * + * @param host the host name + * @return the builder + */ + public RedisConnectionConfig.Builder withRedisCluster(String host) { + return withRedisCluster(host, DEFAULT_CLUSTER_PORT); + } + + /** + * Add a withRedisCluster host/port to the existing builder. + * + * @param host the host name + * @param port the port + * @return the builder + */ + public RedisConnectionConfig.Builder withRedisCluster(String host, int port) { + + AssertUtil.assertState(this.host == null, "Cannot use with Redis mode."); + AssertUtil.notEmpty(host, "Host must not be empty"); + AssertUtil.isTrue(isValidPort(port), String.format("Port out of range: %s", port)); + + redisClusters.add(RedisHostAndPort.of(host, port)); + return this; + } + /** * Adds host information to the builder. Does only affect Redis URI, cannot be used with Sentinel connections. * @@ -544,9 +624,9 @@ public class RedisConnectionConfig { */ public RedisConnectionConfig build() { - if (redisSentinels.isEmpty() && StringUtil.isEmpty(host)) { + if (redisSentinels.isEmpty() && redisClusters.isEmpty() && StringUtil.isEmpty(host)) { throw new IllegalStateException( - "Cannot build a RedisConnectionConfig. One of the following must be provided Host, Socket or " + "Cannot build a RedisConnectionConfig. One of the following must be provided Host, Socket, Cluster or " + "Sentinel"); } @@ -568,6 +648,11 @@ public class RedisConnectionConfig { new RedisConnectionConfig(sentinel.getHost(), sentinel.getPort(), timeout)); } + for (RedisHostAndPort sentinel : redisClusters) { + redisURI.getRedisClusters().add( + new RedisConnectionConfig(sentinel.getHost(), sentinel.getPort(), timeout)); + } + redisURI.setTimeout(timeout); return redisURI; diff --git a/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/ClusterModeRedisDataSourceTest.java b/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/ClusterModeRedisDataSourceTest.java new file mode 100644 index 00000000..68f9e500 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/ClusterModeRedisDataSourceTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 1999-2020 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.csp.sentinel.datasource.redis; + +import com.alibaba.csp.sentinel.datasource.Converter; +import com.alibaba.csp.sentinel.datasource.ReadableDataSource; +import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.SlotHash; +import io.lettuce.core.cluster.api.sync.NodeSelection; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import org.hamcrest.Matchers; +import org.junit.*; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +/** + * Redis redisCluster mode test cases for {@link RedisDataSource}. + * + * @author liqiangz + */ +@Ignore(value = "Before run this test, you need to set up your Redis Cluster.") +public class ClusterModeRedisDataSourceTest { + + private String host = "localhost"; + + private int redisSentinelPort = 7000; + private final RedisClusterClient client = RedisClusterClient.create(RedisURI.Builder.redis(host, redisSentinelPort).build()); + private String ruleKey = "sentinel.rules.flow.ruleKey"; + private String channel = "sentinel.rules.flow.channel"; + + @Before + public void initData() { + Converter> flowConfigParser = buildFlowConfigParser(); + RedisConnectionConfig config = RedisConnectionConfig.builder() + .withRedisCluster(host, redisSentinelPort).build(); + initRedisRuleData(); + ReadableDataSource> redisDataSource = new RedisDataSource<>(config, + ruleKey, channel, flowConfigParser); + FlowRuleManager.register2Property(redisDataSource.getProperty()); + } + + @Test + public void testConnectToSentinelAndPubMsgSuccess() { + int maxQueueingTimeMs = new Random().nextInt(); + String flowRulesJson = + "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, " + + "\"refResource\":null, " + + + "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":" + maxQueueingTimeMs + + ", \"controller\":null}]"; + RedisAdvancedClusterCommands subCommands = client.connect().sync(); + int slot = SlotHash.getSlot(ruleKey); + NodeSelection nodes = subCommands.nodes((n) -> n.hasSlot(slot)); + RedisCommands commands = nodes.commands(0); + commands.multi(); + commands.set(ruleKey, flowRulesJson); + commands.publish(channel, flowRulesJson); + commands.exec(); + + await().timeout(2, TimeUnit.SECONDS) + .until(new Callable>() { + @Override + public List call() throws Exception { + return FlowRuleManager.getRules(); + } + }, Matchers.hasSize(1)); + + List rules = FlowRuleManager.getRules(); + Assert.assertEquals(rules.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs); + String value = subCommands.get(ruleKey); + List flowRulesValuesInRedis = buildFlowConfigParser().convert(value); + Assert.assertEquals(flowRulesValuesInRedis.size(), 1); + Assert.assertEquals(flowRulesValuesInRedis.get(0).getMaxQueueingTimeMs(), maxQueueingTimeMs); + } + + @After + public void clearResource() { + RedisAdvancedClusterCommands stringRedisCommands = client.connect().sync(); + stringRedisCommands.del(ruleKey); + client.shutdown(); + } + + private Converter> buildFlowConfigParser() { + return source -> JSON.parseObject(source, new TypeReference>() { + }); + } + + private void initRedisRuleData() { + String flowRulesJson = + "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, " + + "\"refResource\":null, " + + + "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]"; + RedisAdvancedClusterCommands stringRedisCommands = client.connect().sync(); + String ok = stringRedisCommands.set(ruleKey, flowRulesJson); + Assert.assertEquals("OK", ok); + } +} diff --git a/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/RedisConnectionConfigTest.java b/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/RedisConnectionConfigTest.java index 15a95762..b9ffb9b4 100644 --- a/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/RedisConnectionConfigTest.java +++ b/sentinel-extension/sentinel-datasource-redis/src/test/java/com/alibaba/csp/sentinel/datasource/redis/RedisConnectionConfigTest.java @@ -94,4 +94,43 @@ public class RedisConnectionConfigTest { Assert.assertNull(redisConnectionConfig.getHost()); Assert.assertEquals(3, redisConnectionConfig.getRedisSentinels().size()); } + + @Test + public void testRedisClusterDefaultPortSuccess() { + String host = "localhost"; + RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host) + .withPassword("211233") + .build(); + Assert.assertNull(redisConnectionConfig.getHost()); + Assert.assertEquals(1, redisConnectionConfig.getRedisClusters().size()); + Assert.assertEquals(RedisConnectionConfig.DEFAULT_CLUSTER_PORT, + redisConnectionConfig.getRedisClusters().get(0).getPort()); + } + + @Test + public void testRedisClusterMoreThanOneServerSuccess() { + String host = "localhost"; + String host2 = "server2"; + int port1 = 1879; + int port2 = 1880; + RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host, port1) + .withRedisCluster(host2, port2) + .build(); + Assert.assertNull(redisConnectionConfig.getHost()); + Assert.assertEquals(2, redisConnectionConfig.getRedisClusters().size()); + } + + @Test + public void testRedisClusterMoreThanOneDuplicateServerSuccess() { + String host = "localhost"; + String host2 = "server2"; + int port2 = 1879; + RedisConnectionConfig redisConnectionConfig = RedisConnectionConfig.Builder.redisCluster(host) + .withRedisCluster(host2, port2) + .withRedisCluster(host2, port2) + .withPassword("211233") + .build(); + Assert.assertNull(redisConnectionConfig.getHost()); + Assert.assertEquals(3, redisConnectionConfig.getRedisClusters().size()); + } }