作者:鄙人薛某 2021-02-19 10:42:58
开发
前端
分布式
Redis 上一篇文章写了Redis分布式锁的原理和缺陷,觉得有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年后暂时没什么事,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。
泉州网站制作公司哪家好,找创新互联公司!从网页设计、网站建设、微信开发、APP开发、响应式网站设计等网站项目制作,到程序开发,运营维护。创新互联公司公司2013年成立到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联公司。
前言
上一篇文章写了Redis分布式锁的原理和缺陷,觉得有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年后暂时没什么事,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。
虽说是一时兴起,但仔细研究之后发现Redisson的源码解读工作量还是挺大的,其中用到了大量的Java并发类,并且引用了Netty作为通信工具,实现与Redis组件的远程调用,这些知识点如果要全部讲解的话不太现实,本文的重点主要是关于Redisson分布式锁的实现原理,所以网络通信和并发原理这块的代码解读不会太仔细,有不足之处还望见谅!
Redis 发布订阅
之前说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是我们研究Redisson分布式锁原理的方向。
在学习之前,我们有必要先了解一个知识点,就是有关Redis的发布订阅功能。
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。
订阅的命令是SUBSCRIBE channel[channel ...],可以订阅一个或多个频道,当有新消息通过PUBLISH命令发送给频道时,订阅者就能收到消息,就好像这样:
开启两个客户端,一个订阅了频道channel1,另一个通过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不同客户端之间的通信。
关于这种通信模式有哪些妙用场景我们就不展开了,大家可以自己去网上查阅学习一下,我们的主角还是Redisson,热身完毕,该上主菜了。
Redisson源码
在使用Redisson加锁之前,需要先获取一个RLock实例对象,有了这个对象就可以调用lock、tryLock方法来完成加锁的功能
- Config config = new Config();
- config.useSingleServer()
- .setPassword("")
- .setAddress("redis://127.0.0.1:6379");
- RedissonClient redisson = Redisson.create(config);
- // RLock对象
- RLock lock = redisson.getLock("myLock");
配置好对应的host,然后就可以创建一个RLock对象。RLock是一个接口,具体的同步器需要实现该接口,当我们调用redisson.getLock()时,程序会初始化一个默认的同步执行器RedissonLock
这里面初始化了几个参数,
commandExecutor:异步的Executor执行器,Redisson中所有的命令都是通过...Executor 执行的 ;
id:唯一ID,初始化的时候是用UUID创建的;
internalLockLeaseTime:等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;
同时,图片里我还标注了一个方法getEntryName,返回的是 “ID :锁名称” 的字符串,代表的是当前线程持有对应锁的一个标识,这些参数有必要留个印象,后面的源码解析中经常会出现。
说完了初始化的东西,我们就可以开始学习加锁和解锁的源码了。
加锁
Redisson的加锁方法有两个,tryLock和lock,使用上的区别在于tryLock可以设置锁的过期时长leaseTime和等待时长waitTime,核心处理的逻辑都差不多,我们先从tryLock讲起。
tryLock
代码有点长啊。整成图片不太方便,直接贴上来吧,
- /**
- * @param waitTime 等待锁的时长
- * @param leaseTime 锁的持有时间
- * @param unit 时间单位
- * @return
- * @throws InterruptedException
- */
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 剩余的等待锁的时间
- long time = unit.toMillis(waitTime);
- long current = System.currentTimeMillis();
- final long threadId = Thread.currentThread().getId();
- // 尝试获取锁,如果没取到锁,则返回锁的剩余超时时间
- Long ttl = tryAcquire(leaseTime, unit, threadId);
- // ttl为null,说明可以抢到锁了,返回true
- if (ttl == null) {
- return true;
- }
- // 如果waitTime已经超时了,就返回false,代表申请锁失败
- time -= (System.currentTimeMillis() - current);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- current = System.currentTimeMillis();
- // 订阅分布式锁, 解锁时进行通知,看,这里就用到了我们上面说的发布-订阅了吧
- final RFuture
subscribeFuture = subscribe(threadId); - // 阻塞等待锁释放,await()返回false,说明等待超时了
- if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.addListener(new FutureListener
() { - @Override
- public void operationComplete(Future
future) throws Exception { - if (subscribeFuture.isSuccess()) {
- // 等待都超时了,直接取消订阅
- unsubscribe(subscribeFuture, threadId);
- }
- }
- });
- }
- acquireFailed(threadId);
- return false;
- }
- try {
- time -= (System.currentTimeMillis() - current);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- // 进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑一样
- while (true) {
- long currentTime = System.currentTimeMillis();
- ttl = tryAcquire(leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
- time -= (System.currentTimeMillis() - currentTime);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- // waiting for message
- currentTime = System.currentTimeMillis();
- if (ttl >= 0 && ttl < time) {
- getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- time -= (System.currentTimeMillis() - currentTime);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- }
- } finally {
- unsubscribe(subscribeFuture, threadId);
- }
- // return get(tryLockAsync(waitTime, leaseTime, unit));
- }
代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
拿锁的方法是tryAcquire,传入的参数分别是锁的持有时间,时间单位以及代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync的方法:
- private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
- return get(tryAcquireAsync(leaseTime, unit, threadId));
- }
- private
RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { - // 如果有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁
- if (leaseTime != -1) {
- return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- }
- // 没有设置等待锁的时长的话,加多一个监听器,也就是调用lock.lock()会跑的逻辑,后面会说
- RFuture
ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); - ttlRemainingFuture.addListener(new FutureListener
() { - @Override
- public void operationComplete(Future
future) throws Exception { - if (!future.isSuccess()) {
- return;
- }
- Long ttlRemaining = future.getNow();
- // lock acquired
- if (ttlRemaining == null) {
- scheduleExpirationRenewal(threadId);
- }
- }
- });
- return ttlRemainingFuture;
- }
我们继续跟,看看tryLockInnerAsync方法的源码:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - internalLockLeaseTime = unit.toMillis(leaseTime);
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hset', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.