Lettuce一定要打开redis集群拓扑刷新功能

在使用Lettuce访问Redis的时候,一定要记得打开它的Redis 集群拓扑刷新功能,否则他压根就不存在高可用。因为他的集群拓扑刷新功能是默认没开启的。

上面的3个文章其实说的就是这个事情,在redis集群拓扑结构发生变化,比如Redis的master挂掉了后,lettuce的client端就会长时间不能恢复。因此可以通过下面的配置打开拓扑刷新功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//默认超时时间, lettuce默认超时时间为60s太长了,此处默认设置为15s
private Long timeoutInMillis = Duration.ofSeconds(15).toMillis();

static ClusterClientOptions.Builder initDefaultClusterClientOptions(ClusterClientOptions.Builder builder) {
ClusterTopologyRefreshOptions defaultClusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
//开启集群拓扑结构周期性刷新,和默认参数保持一致
.enablePeriodicRefresh(60, TimeUnit.SECONDS)
//开启针对{@link RefreshTrigger}中所有类型的事件的触发器
.enableAllAdaptiveRefreshTriggers()
//和默认一样,30s超时,避免短时间大量出现刷新拓扑的事件
.adaptiveRefreshTriggersTimeout(30, TimeUnit.SECONDS)
//和默认一样重连5次先,然后在刷新集群拓扑
.refreshTriggersReconnectAttempts(5)
.build();

return builder
// 配置用于开启自适应刷新和定时刷新。如自适应刷新不开启,Redis集群变更时将会导致连接异常
.topologyRefreshOptions(defaultClusterTopologyRefreshOptions)
//默认就是重连的,显示定义一下
.autoReconnect(true)
//和默认一样最大重定向5次,避免极端情况无止境的重定向
.maxRedirects(5)
//Accept commands when auto-reconnect is enabled, reject commands when auto-reconnect is disabled.
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.DEFAULT)
.socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build())
//取消校验集群节点的成员关系
.validateClusterNodeMembership(false);
}

public static ClusterClientOptions.Builder getDefaultClusterClientOptionBuilder() {
return initDefaultClusterClientOptions(ClusterClientOptions.builder());
}

上面的配置其实就是修改默认的连接参数,打开集群拓扑刷新功能。其中有几个比较重要的地方,redis的默认超时时间是1分钟,其实这个时间太长了,很多的时候几秒钟就可以了,我这里是改为了15秒。
另外一个比较重要的参数就是validateClusterNodeMembership,这个大家一定要注意,默认这个属性是true的,也就是你的redis cluster集群增加一个redis节点,Lettuce默认是不信任这个节点的,
因此在内网的情况下,我们基本上都要关闭这个功能。也就是:validateClusterNodeMembership(false);

redis Pipeline

在基于request-response的请求模型中,一般都会涉及下面几个阶段:

  • client发送命令
  • 命令在网络上传输
  • server收到命令并开始执行
  • server返回结果

在这个过程中我们可以看到有2次网络传输,而这两次网络传输的耗时成为:RTT (Round Trip Time)。例如,如果 RTT 时间是250毫秒(网络连接很慢的情况下),
即使服务端每秒能处理100k的请求量,那我们每秒最多也只能处理4个请求。如果使用的是本地环回接口,RTT 就短得多,但如如果需要连续执行多次写入,这也是一笔很大的开销。下面的图是传统的
N次request-response的交互图:

传统的N次request-response的交互图

一般情况下我们为了解决rtt耗时太长的问题,会采样批处理的解决方案,也就是将请求参数批量发给server端,server端处理完这些请求以后,在一次性返回结果。

在redis中,已经提供了一些批量操作命令,比如mget,mset等命令。但是也有不少命令是没有批量操作命令的,但是为了解决这个问题,redis支持Pipeline。

Pipeline 并不是一种新的技术或机制,很多技术上都使用过。RTT 在不同网络环境下会不同,例如同机房和同机房会比较快,跨机房跨地区会比较慢。Redis 很早就支持 Pipeline 技术,因此无论你运行的是什么版本,你都可以使用 Pipeline 操作 Redis。如果客户端和服务端的网络延时越大,那么Pipeline的效果越明显。

Pipeline 能将一组 Redis 命令进行组装,通过一次 RTT 传输给 Redis,再将这组 Redis 命令按照顺序执行并将结果返回给客户端。上图没有使用 Pipeline 执行了 N 条命令,整个过程需要 N 次 RTT。下图为使用 Pipeline 执行 N 条命令,整个过程仅需要 1 次 RTT:

Pipeline 示意图

Pipeline 基本使用

我比较喜欢用的lettuce中对pipeline的使用方式(Asynchronous Pipelining)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> commands = connection.async();

// disable auto-flushing
commands.setAutoFlushCommands(false);

// perform a series of independent calls
List<RedisFuture<?>> futures = Lists.newArrayList();
for (int i = 0; i < iterations; i++) {
futures.add(commands.set("key-" + i, "value-" + i));
futures.add(commands.expire("key-" + i, 3600));
}

// write all commands to the transport layer
commands.flushCommands();

// synchronization example: Wait until all futures complete
boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
futures.toArray(new RedisFuture[futures.size()]));

// later
connection.close();

Jedis 也提供了对 Pipeline 特性的支持。我们可以借助 Pipeline 来模拟批量删除,虽然不会像 mget 和 mset 那样是一个原子命令,但是在绝大数情况下可以使用:

1
2
3
4
5
6
7
8
9
10
11
public void mdel(List<String> keys){
Jedis jedis = new Jedis("127.0.0.1");
// 创建Pipeline对象
Pipeline pipeline = jedis.pipelined();
for (String key : keys){
// 组装命令
pipeline.del(key);
}
// 执行命令
pipeline.sync();
}

批量命令与Pipeline对比

  • redis的原生批量命令(如mget)是原子的,Pipeline 是非原子的。
  • redis原生批量命令是一个命令对应多个key,Pipeline 支持多个命令。
  • redis原生批量命令是Redis服务端支持实现的,而Pipeline需要服务端和客户端的共同实现。

参考资料

redisson redlock代码阅读

本文章未完,待续

redisson redlock基本使用

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

1
2
3
4
5
6
7
8
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);

// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

源码阅读

RedissonRedLock类继承了RedissonMultiLock,基于redlock算法,这个类重写了RedissonMultiLock的failedLocksLimitcalcLockWaitTime方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RedissonRedLock extends RedissonMultiLock {

/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks - array of locks
*/
public RedissonRedLock(RLock... locks) {
super(locks);
}

@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}

protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}

@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}

@Override
public void unlock() {
unlockInner(locks);
}

}

参考资料

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×