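I recently ran into an awkward requirement: a single microservice needs to access two different Redis clusters at the same time. We would not normally use Redis this way, but the two clusters originally belonged to different business domains, and now one microservice has to talk to both.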
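Similar situations come up in everyday business development. Redis read/write splitting is one example, and spring-data-redis does not provide it either. The underlying clients, such as Lettuce and Jedis, do offer APIs for obtaining read-only connections, but this approach has two drawbacks:
1. The spring-data-redis layer on top does not wrap these APIs.
2. They are built on the Redis deployment architecture: sentinel mode requires the sentinel addresses to be configured, and cluster mode requires awareness of the cluster topology. In cloud-native environments the cloud provider hides all of this by default, and the only thing exposed is a set of dynamic VIP domain names.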
So we need a mechanism, built on top of spring-data-redis, for switching Redis connections dynamically.
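The configuration class that Spring Boot provides for spring-data-redis is org.springframework.boot.autoconfigure.data.redis.RedisProperties. It holds the connection settings for either a single Redis instance or a Redis cluster. From these settings the framework builds one unified Redis connection factory, RedisConnectionFactory.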
The relationship between the core spring-data-redis interfaces and the connection abstractions behind them is as follows:
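This diagram tells us that all we have to do is implement a RedisConnectionFactory that can dynamically return connections to different Redis instances. The auto-configuration source of spring-data-redis also shows that every RedisConnectionFactory the framework registers is marked @ConditionalOnMissingBean, which means we can replace it with our own RedisConnectionFactory implementation.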
Project repository: https://github.com/JoJoTec/spring-boot-starter-redis-related
We can wrap RedisProperties in an outer configuration class that holds several Redis connections, namely MultiRedisProperties:
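    import java.util.Map;

    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;

    @Data
    @NoArgsConstructor
    @ConfigurationProperties(prefix = "spring.redis")
    public class MultiRedisProperties {
        /**
         * The default connection is mandatory; its configuration key is "default".
         */
        public static final String DEFAULT = "default";

        private boolean enableMulti = false;
        private Map<String, RedisProperties> multi;
    }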
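This configuration sits on top of the existing one, so users can keep the original single-Redis settings or switch to the multi-Redis settings; the latter requires spring.redis.enable-multi=true. The keys of the multi Map are data source names. Before using RedisTemplate or ReactiveRedisTemplate, the caller picks which Redis to use by specifying one of these names.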
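To make the key layout concrete, here is a minimal sketch of how flat keys of the form spring.redis.multi.<name>.<field> group by data source name. It only illustrates the naming scheme and is not how Spring Boot's binder works internally; the class MultiRedisKeyGrouping and its method are hypothetical names introduced for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring Boot or the project.
final class MultiRedisKeyGrouping {
    static final String PREFIX = "spring.redis.multi.";

    // Groups "spring.redis.multi.<name>.<field>" entries into <name> -> (<field> -> value).
    static Map<String, Map<String, String>> groupByDataSource(Map<String, String> flat) {
        Map<String, Map<String, String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX)) {
                continue; // e.g. spring.redis.enable-multi is not a per-data-source key
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0 || dot == rest.length() - 1) {
                continue; // no data source name or no field name
            }
            String name = rest.substring(0, dot);
            String field = rest.substring(dot + 1);
            grouped.computeIfAbsent(name, n -> new LinkedHashMap<>()).put(field, entry.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("spring.redis.enable-multi", "true");
        flat.put("spring.redis.multi.default.host", "127.0.0.1");
        flat.put("spring.redis.multi.default.port", "6379");
        flat.put("spring.redis.multi.test.host", "127.0.0.1");
        flat.put("spring.redis.multi.test.port", "6380");
        // Two data sources result: "default" and "test".
        System.out.println(groupByDataSource(flat));
    }
}
```

The "default" entry plays the role of MultiRedisProperties.DEFAULT: it is the data source used whenever the caller has not selected another one.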
Next we implement MultiRedisLettuceConnectionFactory, a RedisConnectionFactory that can switch Redis connections dynamically. Our project uses Lettuce as its Redis client:
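    import java.util.Map;

    // Assumed to be Apache Commons Lang; RedisRelatedException is the project's own exception type.
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
    import org.springframework.data.redis.connection.ReactiveRedisConnection;
    import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisClusterConnection;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisSentinelConnection;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

    public class MultiRedisLettuceConnectionFactory
            implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
        private final Map<String, LettuceConnectionFactory> connectionFactoryMap;

        // Name of the data source selected for the next operation on the current thread
        private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

        public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
            this.connectionFactoryMap = connectionFactoryMap;
        }

        public void setCurrentRedis(String currentRedis) {
            if (!connectionFactoryMap.containsKey(currentRedis)) {
                throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exist in configuration");
            }
            MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
        }

        @Override
        public void destroy() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
        }

        private LettuceConnectionFactory currentLettuceConnectionFactory() {
            String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
            if (StringUtils.isNotBlank(currentRedis)) {
                // The selection is one-shot: clear it as soon as it has been read
                MultiRedisLettuceConnectionFactory.currentRedis.remove();
                return connectionFactoryMap.get(currentRedis);
            }
            return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
        }

        @Override
        public ReactiveRedisConnection getReactiveConnection() {
            return currentLettuceConnectionFactory().getReactiveConnection();
        }

        @Override
        public ReactiveRedisClusterConnection getReactiveClusterConnection() {
            return currentLettuceConnectionFactory().getReactiveClusterConnection();
        }

        @Override
        public RedisConnection getConnection() {
            return currentLettuceConnectionFactory().getConnection();
        }

        @Override
        public RedisClusterConnection getClusterConnection() {
            return currentLettuceConnectionFactory().getClusterConnection();
        }

        @Override
        public boolean getConvertPipelineAndTxResults() {
            return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
        }

        @Override
        public RedisSentinelConnection getSentinelConnection() {
            return currentLettuceConnectionFactory().getSentinelConnection();
        }

        @Override
        public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
            return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
        }
    }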
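The logic is simple. The class exposes a method for selecting the Redis data source and stores the selection in a ThreadLocal. The selection is valid for one use only: it is cleared as soon as it is read.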
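The one-shot behaviour can be shown in isolation with a minimal sketch that has no Spring or Lettuce dependency. OneShotSelector and its methods are hypothetical names for this example. It uses an instance-level ThreadLocal rather than the static one in the class above, and it throws IllegalArgumentException where the project throws its own RedisRelatedException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the selection logic; T plays the role of LettuceConnectionFactory.
final class OneShotSelector<T> {
    static final String DEFAULT = "default";

    private final Map<String, T> targets;
    private final ThreadLocal<String> current = new ThreadLocal<>();

    OneShotSelector(Map<String, T> targets) {
        if (!targets.containsKey(DEFAULT)) {
            throw new IllegalArgumentException("a \"" + DEFAULT + "\" target is required");
        }
        this.targets = new HashMap<>(targets);
    }

    // Selects the target for the next resolve() call on the calling thread.
    void select(String name) {
        if (!targets.containsKey(name)) {
            throw new IllegalArgumentException("unknown target: " + name);
        }
        current.set(name);
    }

    // Returns the selected target and clears the selection; falls back to the default.
    T resolve() {
        String name = current.get();
        if (name == null) {
            return targets.get(DEFAULT);
        }
        current.remove();
        return targets.get(name);
    }
}

final class OneShotSelectorDemo {
    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("default", "redis-a");
        targets.put("test", "redis-b");
        OneShotSelector<String> selector = new OneShotSelector<>(targets);

        selector.select("test");
        System.out.println(selector.resolve()); // the selected target
        System.out.println(selector.resolve()); // back to the default target
    }
}
```

Because the selection is consumed by the first lookup, every operation that should go to a non-default Redis needs its own preceding setCurrentRedis call, which is why the test further down calls it before each hasKey on the "test" data source.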
Then we register MultiRedisLettuceConnectionFactory as a Bean in our ApplicationContext:
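    import java.util.Map;

    import com.google.common.collect.Maps;
    import io.lettuce.core.resource.ClientResources;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisClusterConfiguration;
    import org.springframework.data.redis.connection.RedisSentinelConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    // LettuceConnectionConfiguration is not imported here: the Spring Boot class of that name is
    // package-private, so the project is assumed to supply an accessible equivalent.

    @ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
    @Configuration(proxyBeanMethods = false)
    public class RedisCustomizedConfiguration {

        /**
         * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
         */
        @Bean
        public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
                ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
                ClientResources clientResources,
                MultiRedisProperties multiRedisProperties,
                ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
                ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
        ) {
            // Read the configuration
            Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
            Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
            multi.forEach((k, v) -> {
                // This is how the framework's own source uses RedisProperties;
                // all we do is add one more layer of wrapping around RedisProperties
                LettuceConnectionConfiguration lettuceConnectionConfiguration =
                        new LettuceConnectionConfiguration(v, sentinelConfigurationProvider, clusterConfigurationProvider);
                LettuceConnectionFactory lettuceConnectionFactory =
                        lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
                connectionFactoryMap.put(k, lettuceConnectionFactory);
            });
            return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
        }
    }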
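Let's test it. We use embedded-redis to start local Redis servers so that this can run as a unit test. We start two Redis instances, put different keys into each, and verify which keys exist where. The test covers three cases: the synchronous API, the synchronous API called from multiple threads, and the asynchronous API subscribed to many times without waiting.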
    import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.test.context.junit.jupiter.SpringExtension;
    import reactor.core.publisher.Mono;
    import redis.embedded.RedisServer;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    @ExtendWith(SpringExtension.class)
    @SpringBootTest(properties = {
            "spring.redis.enable-multi=true",
            "spring.redis.multi.default.host=127.0.0.1",
            "spring.redis.multi.default.port=6379",
            "spring.redis.multi.test.host=127.0.0.1",
            "spring.redis.multi.test.port=6380",
    })
    public class MultiRedisTest {
        // Start two Redis instances
        private static RedisServer redisServer;
        private static RedisServer redisServer2;

        @BeforeAll
        public static void setUp() throws Exception {
            System.out.println("start redis");
            redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
            redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
            redisServer.start();
            redisServer2.start();
            System.out.println("redis started");
        }

        @AfterAll
        public static void tearDown() throws Exception {
            System.out.println("stop redis");
            redisServer.stop();
            redisServer2.stop();
            System.out.println("redis stopped");
        }

        @EnableAutoConfiguration
        @Configuration
        public static class App {
        }

        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private ReactiveStringRedisTemplate reactiveRedisTemplate;
        @Autowired
        private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

        private void testMulti(String suffix) {
            // Using the default connection, set the pair ("testDefault" + suffix, "testDefault")
            redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
            // Using the "test" connection, set the pair ("testSecond" + suffix, "testSecond")
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
            // Using the default connection, verify that "testDefault" + suffix exists
            // and "testSecond" + suffix does not
            Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
            Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
            // Using the "test" connection, verify that "testDefault" + suffix does not exist
            // and "testSecond" + suffix does
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
            multiRedisLettuceConnectionFactory.setCurrentRedis("test");
            Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
        }

        // Single-call verification
        @Test
        public void testMultiBlock() {
            testMulti("");
        }

        // Multi-threaded verification
        @Test
        public void testMultiBlockMultiThread() throws InterruptedException {
            Thread[] thread = new Thread[50];
            AtomicBoolean result = new AtomicBoolean(true);
            for (int i = 0; i < thread.length; i++) {
                int finalI = i;
                thread[i] = new Thread(() -> {
                    try {
                        testMulti("" + finalI);
                    } catch (Exception e) {
                        e.printStackTrace();
                        result.set(false);
                    }
                });
            }
            for (int i = 0; i < thread.length; i++) {
                thread[i].start();
            }
            for (int i = 0; i < thread.length; i++) {
                thread[i].join();
            }
            Assertions.assertTrue(result.get());
        }

        // Verification of the reactive API
        private Mono<Boolean> reactiveMulti(String suffix) {
            return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                    .flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                    }).map(b -> {
                        Assertions.assertFalse(b);
                        System.out.println(Thread.currentThread().getName());
                        return b;
                    }).flatMap(b -> {
                        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                        return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                    }).map(b -> {
                        Assertions.assertTrue(b);
                        return b;
                    });
        }

        // Call the reactive chain many times and subscribe to each; this is inherently multi-threaded
        @Test
        public void testMultiReactive() throws InterruptedException {
            for (int i = 0; i < 10000; i++) {
                reactiveMulti("" + i).subscribe(System.out::println);
            }
            TimeUnit.SECONDS.sleep(10);
        }
    }
Run the tests: they pass.