又长又细,万字长文带你解读Redisson分布式锁的源码

又长又细,万字长文带你解读Redisson分布式锁的源码

作者:鄙人薛某 2021-02-19 10:42:58

开发

前端

分布式

Redis 上一篇文章写了Redis分布式锁的原理和缺陷,觉得有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年后暂时没什么事,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。

泉州网站制作公司哪家好,找创新互联公司!从网页设计、网站建设、微信开发、APP开发、响应式网站设计等网站项目制作,到程序开发,运营维护。创新互联公司公司2013年成立到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联公司

 前言

上一篇文章写了Redis分布式锁的原理和缺陷,觉得有些不过瘾,只是简单的介绍了下Redisson这个框架,具体的原理什么的还没说过呢。趁年后暂时没什么事,反正闲着也是闲着,不如把Redisson的源码也学习一遍好了。

虽说是一时兴起,但仔细研究之后发现Redisson的源码解读工作量还是挺大的,其中用到了大量的Java并发类,并且引用了Netty作为通信工具,实现与Redis组件的远程调用,这些知识点如果要全部讲解的话不太现实,本文的重点主要是关于Redisson分布式锁的实现原理,所以网络通信和并发原理这块的代码解读不会太仔细,有不足之处还望见谅!

Redis 发布订阅

之前说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是我们研究Redisson分布式锁原理的方向。

在学习之前,我们有必要先了解一个知识点,就是有关Redis的发布订阅功能。

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。

订阅的命令是SUBSCRIBE channel[channel ...],可以订阅一个或多个频道,当有新消息通过PUBLISH命令发送给频道时,订阅者就能收到消息,就好像这样:

开启两个客户端,一个订阅了频道channel1,另一个通过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不同客户端之间的通信。

关于这种通信模式有哪些妙用场景我们就不展开了,大家可以自己去网上查阅学习一下,我们的主角还是Redisson,热身完毕,该上主菜了。

Redisson源码

在使用Redisson加锁之前,需要先获取一个RLock实例对象,有了这个对象就可以调用lock、tryLock方法来完成加锁的功能

  
 
 
 
  1. Config config = new Config(); 
  2. config.useSingleServer() 
  3.   .setPassword("") 
  4.   .setAddress("redis://127.0.0.1:6379"); 
  5. RedissonClient redisson = Redisson.create(config); 
  6. // RLock对象 
  7. RLock lock = redisson.getLock("myLock"); 

配置好对应的host,然后就可以创建一个RLock对象。RLock是一个接口,具体的同步器需要实现该接口,当我们调用redisson.getLock()时,程序会初始化一个默认的同步执行器RedissonLock

这里面初始化了几个参数,

commandExecutor:异步的Executor执行器,Redisson中所有的命令都是通过...Executor 执行的 ;

id:唯一ID,初始化的时候是用UUID创建的;

internalLockLeaseTime:等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;

同时,图片里我还标注了一个方法getEntryName,返回的是 “ID :锁名称” 的字符串,代表的是当前线程持有对应锁的一个标识,这些参数有必要留个印象,后面的源码解析中经常会出现。

说完了初始化的东西,我们就可以开始学习加锁和解锁的源码了。

加锁

Redisson的加锁方法有两个,tryLock和lock,使用上的区别在于tryLock可以设置锁的过期时长leaseTime和等待时长waitTime,核心处理的逻辑都差不多,我们先从tryLock讲起。

tryLock

代码有点长啊。整成图片不太方便,直接贴上来吧,

  
 
 
 
  1. /** 
  2.  * @param waitTime 等待锁的时长  
  3.  * @param leaseTime 锁的持有时间  
  4.  * @param unit 时间单位 
  5.  * @return 
  6.  * @throws InterruptedException 
  7.  */ 
  8. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {    // 剩余的等待锁的时间 
  9.         long time = unit.toMillis(waitTime); 
  10.         long current = System.currentTimeMillis(); 
  11.          
  12.         final long threadId = Thread.currentThread().getId(); 
  13.         // 尝试获取锁,如果没取到锁,则返回锁的剩余超时时间 
  14.         Long ttl = tryAcquire(leaseTime, unit, threadId); 
  15.         // ttl为null,说明可以抢到锁了,返回true 
  16.         if (ttl == null) { 
  17.             return true; 
  18.         } 
  19.          
  20.         // 如果waitTime已经超时了,就返回false,代表申请锁失败 
  21.         time -= (System.currentTimeMillis() - current); 
  22.         if (time <= 0) { 
  23.             acquireFailed(threadId); 
  24.             return false; 
  25.         } 
  26.          
  27.         current = System.currentTimeMillis(); 
  28.         // 订阅分布式锁, 解锁时进行通知,看,这里就用到了我们上面说的发布-订阅了吧 
  29.         final RFuture subscribeFuture = subscribe(threadId); 
  30.         // 阻塞等待锁释放,await()返回false,说明等待超时了 
  31.         if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { 
  32.             if (!subscribeFuture.cancel(false)) { 
  33.                 subscribeFuture.addListener(new FutureListener() { 
  34.                     @Override 
  35.                     public void operationComplete(Future future) throws Exception { 
  36.                         if (subscribeFuture.isSuccess()) { 
  37.                          // 等待都超时了,直接取消订阅 
  38.                             unsubscribe(subscribeFuture, threadId); 
  39.                         } 
  40.                     } 
  41.                 }); 
  42.             } 
  43.             acquireFailed(threadId); 
  44.             return false; 
  45.         } 
  46.  
  47.         try { 
  48.             time -= (System.currentTimeMillis() - current); 
  49.             if (time <= 0) { 
  50.                 acquireFailed(threadId); 
  51.                 return false; 
  52.             } 
  53.          // 进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑一样 
  54.             while (true) { 
  55.                 long currentTime = System.currentTimeMillis(); 
  56.                 ttl = tryAcquire(leaseTime, unit, threadId); 
  57.                 // lock acquired 
  58.                 if (ttl == null) { 
  59.                     return true; 
  60.                 } 
  61.  
  62.                 time -= (System.currentTimeMillis() - currentTime); 
  63.                 if (time <= 0) { 
  64.                     acquireFailed(threadId); 
  65.                     return false; 
  66.                 } 
  67.  
  68.                 // waiting for message 
  69.                 currentTime = System.currentTimeMillis(); 
  70.                 if (ttl >= 0 && ttl < time) { 
  71.                     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); 
  72.                 } else { 
  73.                     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); 
  74.                 } 
  75.  
  76.                 time -= (System.currentTimeMillis() - currentTime); 
  77.                 if (time <= 0) { 
  78.                     acquireFailed(threadId); 
  79.                     return false; 
  80.                 } 
  81.             } 
  82.         } finally { 
  83.             unsubscribe(subscribeFuture, threadId); 
  84.         } 
  85. //        return get(tryLockAsync(waitTime, leaseTime, unit)); 
  86.     } 

代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。

拿锁的方法是tryAcquire,传入的参数分别是锁的持有时间,时间单位以及代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync的方法:

  
 
 
 
  1. private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { 
  2.     return get(tryAcquireAsync(leaseTime, unit, threadId)); 
  3.  
  4. private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { 
  5.         // 如果有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁 
  6.         if (leaseTime != -1) { 
  7.             return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); 
  8.         } 
  9.         // 没有设置等待锁的时长的话,加多一个监听器,也就是调用lock.lock()会跑的逻辑,后面会说 
  10.         RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 
  11.         ttlRemainingFuture.addListener(new FutureListener() { 
  12.             @Override 
  13.             public void operationComplete(Future future) throws Exception { 
  14.                 if (!future.isSuccess()) { 
  15.                     return; 
  16.                 } 
  17.  
  18.                 Long ttlRemaining = future.getNow(); 
  19.                 // lock acquired 
  20.                 if (ttlRemaining == null) { 
  21.                     scheduleExpirationRenewal(threadId); 
  22.                 } 
  23.             } 
  24.         }); 
  25.         return ttlRemainingFuture; 
  26.     } 

我们继续跟,看看tryLockInnerAsync方法的源码:

  
 
 
 
  1.  RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { 
  2.     internalLockLeaseTime = unit.toMillis(leaseTime); 
  3.  
  4.     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, 
  5.               "if (redis.call('exists', KEYS[1]) == 0) then " + 
  6.                   "redis.call('hset', KEYS[1], ARGV[2], 1); " + 
  7.                   "redis.call('pexpire', KEYS[1], ARGV[1]); " + 
  8.                   "return nil; " + 
  9.               "end; " + 
  10.               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 
  11.                   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
  12.                   "redis.call('pexpire', KEYS[1], ARGV[1]); " + 
  13.                   "return nil; " + 
  14.               "end; " + 
  15.               "return redis.call('pttl', KEYS[1]);", 
  16.                 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); 
  17. String getLockName(long threadId) { 
  18.     return id + ":" + threadId; 
  19. 这里就是底层的调用栈了,直接操作命令,整合成lua脚本后,调用netty的工具类跟redis进行通信,从而实现获取锁的功能。

    这段脚本命令还是有点意思的,简单解读一下:

    • 先用exists key命令判断是否锁是否被占据了,没有的话就用hset命令写入,key为锁的名称,field为“客户端唯一ID:线程ID”,value为1;
    • 锁被占据了,判断是否是当前线程占据的,是的话value值加1;
    • 锁不是被当前线程占据,返回锁剩下的过期时长;

    命令的逻辑并不复杂,但不得不说,作者的设计还是很有心的,用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。

    每一步命令对应的逻辑我都在下面的图中标注了,大家可以读一下:

    我们继续跟代码吧,根据上面的命令可以看出,如果线程拿到锁的话,tryLock方法会直接返回true,万事大吉。

    拿不到的话,就会返回锁的剩余过期时长,这个时长有什么作用呢?我们回到tryLock方法中死循环的那个地方:

    这里有一个针对waitTime和key的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。

    那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,各位看官应该还记得,我们上面贴的tryLock代码中还有这一段:

      
     
     
     
    1. current = System.currentTimeMillis(); 
    2. // 订阅分布式锁, 解锁时进行通知 
    3. final RFuture subscribeFuture = subscribe(threadId); 

    订阅的逻辑显然是在subscribe方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:

    这段代码的作用在于将当前线程的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。

    一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。

    释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。

    这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。

    这就是tryLock获取锁的整个过程了,画一张流程图的话表示大概是这样:

    lock

    除了tryLock,一般我们还经常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire方法来获取锁的,不同的是,它会跑到这部分的逻辑:

    这段代码做了两件事:

    1、预设30秒的过期时长,然后去获取锁

    2、开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长

    刷新过期时长的方法是scheduleExpirationRenewal,贴一下源码吧:

      
     
     
     
    1. private void scheduleExpirationRenewal(final long threadId) { 
    2.  // expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务 
    3.         if (expirationRenewalMap.containsKey(getEntryName())) { 
    4.             return; 
    5.         } 
    6.  
    7.         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { 
    8.             @Override 
    9.             public void run(Timeout timeout) throws Exception { 
    10.                 // 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长 
    11.                 RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
    12.                         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 
    13.                             "redis.call('pexpire', KEYS[1], ARGV[1]); " + 
    14.                             "return 1; " + 
    15.                         "end; " + 
    16.                         "return 0;", 
    17.                           Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); 
    18.                  
    19.                 future.addListener(new FutureListener() { 
    20.                     @Override 
    21.                     public void operationComplete(Future future) throws Exception { 
    22.                         expirationRenewalMap.remove(getEntryName()); 
    23.                         if (!future.isSuccess()) { 
    24.                             log.error("Can't update lock " + getName() + " expiration", future.cause()); 
    25.                             return; 
    26.                         } 
    27.                          
    28.                         if (future.getNow()) { 
    29.                             // reschedule itself 
    30.                             scheduleExpirationRenewal(threadId); 
    31.                         } 
    32.                     } 
    33.                 }); 
    34.             } 
    35.         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
    36.  
    37.         if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { 
    38.             task.cancel(); 
    39.         } 
    40.     } 
    41. 代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime,也就是30秒的时间。

      而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key就为“线程ID:key名称”,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。

      上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。

      其他的逻辑就跟tryLock()基本没什么两样啦,大家看一下就知道了

      解锁

      有拿锁的方法,自然也就有解锁。Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数

        
       
       
       
      1. @Override 
      2.     public void unlock() { 
      3.      // 发起释放锁的命令请求 
      4.         Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); 
      5.         if (opStatus == null) { 
      6.             throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " 
      7.                     + id + " thread-id: " + Thread.currentThread().getId()); 
      8.         } 
      9.         if (opStatus) { 
      10.          // 成功释放锁,取消"看门狗"的续时线程 
      11.             cancelExpirationRenewal(); 
      12.         } 
      13.     } 

      解锁相关的命令操作在unlockInnerAsync方法中定义,

       又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点,不过没关系,我们简单梳理一下,命令的逻辑大概是这么几步:

      1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;

      2、锁存在但不是当前线程持有,返回空置nil;

      3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;

       当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程

        
       
       
       
      1. void cancelExpirationRenewal() { 
      2.  // expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了 
      3.     Timeout task = expirationRenewalMap.remove(getEntryName()); 
      4.     if (task != null) { 
      5.         task.cancel(); 
      6.     } 

      这就是释放锁的过程了,怎么样,是不是还是比较简单的,阅读起来比加锁那份代码舒服多了,当然啦,简单归简单,为了方便你们理清整个分布式锁的过程,我当然还是费心费力的给你们画流程图展示下啦(就冲这点,是不是该给我来个三连啊,哈哈):

      RedLock

      以上就是Redisson分布式锁的原理讲解,总的来说,就是简单的用lua脚本整合基本的set命令实现锁的功能,这也是很多Redis分布式锁工具的设计原理。除此之外,Redisson还支持用"RedLock算法"来实现锁的效果,这个工具类就是RedissonRedLock。

      用法也很简单,创建多个Redisson Node, 由这些无关联的Node就可以组成一个完整的分布式锁

        
       
       
       
      1. RLock lock1 = Redisson.create(config1).getLock(lockKey); 
      2. RLock lock2 = Redisson.create(config2).getLock(lockKey); 
      3. RLock lock3 = Redisson.create(config3).getLock(lockKey); 
      4.  
      5. RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); 
      6. try { 
      7.    redLock.lock(); 
      8. } finally { 
      9.    redLock.unlock(); 

      RedLock算法原理方面我就不细说了,大家有兴趣可以看我之前的文章,或者是网上搜一下,简单的说就是能一定程度上能有效防止Redis实例单点故障的问题,但并不完全可靠,不管是哪种设计,光靠Redis本身都是无法保证锁的强一致性的。

      还是那句话,鱼和熊掌不可兼得,性能和安全方面也往往如此,Redis强大的性能和使用的方便足以满足日常的分布式锁需求,如果业务场景对锁的安全隐患无法忍受的话,最保底的方式就是在业务层做幂等处理。

      总结

      看了本文的源码解析,相信各位看官对Redisson分布式锁的设计也有了足够的了解,当然啦,虽然是讲解源码,我们的主要精力还是放在分布式锁的原理上,一些无关流程的代码就没有带大家字斟酌句的解读了,大家有兴趣的话可以自己去阅读看看,源码中很多地方都展示了一些基础并发工具和网络通信的妙用之处,学习一下还是挺有收获的。

      最后我还是想吐槽一下,Redisson的注释是真的少啊。

      本文名称:又长又细,万字长文带你解读Redisson分布式锁的源码
      文章网址:http://www.csdahua.cn/qtweb/news35/241535.html

      网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

      广告

      声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网