Redisson源码解读(redis源码是什么语言)

  本篇文章为你整理了Redisson源码解读(redis源码是什么语言)的详细内容,包含有redis源码解析 redis源码是什么语言 redis源码github redis cluster 源码 Redisson源码解读,希望能帮助你了解 Redisson源码解读。

  我在上一篇文章聊了Redisson的可重入锁,这次继续来聊聊Redisson的公平锁。下面是官方原话:

  它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

  源码版本:3.17.7

  这是我 fork 的分支,添加了自己理解的中文注释:https://github.com/xiaoguyu/redisson

  先上官方例子:

  

RLock fairLock = redisson.getFairLock("anyLock");

 

  // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁

  boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);

  fairLock.unlock();

  

 

  因为在Redisson中,公平锁和普通可重入锁的逻辑大体上一样,我在上一篇文章都介绍了,这里就不再赘述。下面开始介绍合理逻辑。

  加锁的 lua 脚本在 RedissonFairLock#tryLockInnerAsync方法中

  

 T RFuture T tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand T command) {

 

   long wait = threadWaitTime;

   if (waitTime 0) {

   wait = unit.toMillis(waitTime);

   long currentTime = System.currentTimeMillis();

   if (command == RedisCommands.EVAL_NULL_BOOLEAN) {

   ......

   if (command == RedisCommands.EVAL_LONG) {

   return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,

   // remove stale threads

   "while true do " + // list为空,证明没有人排队,退出循环

   "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);" +

   "if firstThreadId2 == false then " +

   "break;" +

   "end;" +

   // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据

   "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));" +

   "if timeout = tonumber(ARGV[4]) then " +

   // remove the item from the queue and timeout set

   // NOTE we do not alter any other timeout

   "redis.call(zrem, KEYS[3], firstThreadId2);" +

   "redis.call(lpop, KEYS[2]);" +

   "else " +

   "break;" +

   "end;" +

   "end;" +

   // check if the lock can be acquired now

   // 检查是否可以获取锁。如果hash和list都不存在,或者线程队列的第一个是当前线程,则可以获取锁

   "if (redis.call(exists, KEYS[1]) == 0) " +

   "and ((redis.call(exists, KEYS[2]) == 0) " +

   "or (redis.call(lindex, KEYS[2], 0) == ARGV[2])) then " +

   // remove this thread from the queue and timeout set

   // 都获取锁了,当然要从线程队列和时间队列中移除

   "redis.call(lpop, KEYS[2]);" +

   "redis.call(zrem, KEYS[3], ARGV[2]);" +

   // decrease timeouts for all waiting in the queue

   // 刷新时间集合中的时间

   "local keys = redis.call(zrange, KEYS[3], 0, -1);" +

   "for i = 1, #keys, 1 do " +

   "redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]);" +

   "end;" +

   // acquire the lock and set the TTL for the lease

   // 和公平锁的设置一样,值加1并且设置过期时间

   "redis.call(hset, KEYS[1], ARGV[2], 1);" +

   "redis.call(pexpire, KEYS[1], ARGV[1]);" +

   "return nil;" +

   "end;" +

   // check if the lock is already held, and this is a re-entry

   // 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理

   "if redis.call(hexists, KEYS[1], ARGV[2]) == 1 then " +

   "redis.call(hincrby, KEYS[1], ARGV[2],1);" +

   "redis.call(pexpire, KEYS[1], ARGV[1]);" +

   "return nil;" +

   "end;" +

   // the lock cannot be acquired

   // check if the thread is already in the queue

   // 时间集合中有值,证明线程已经在队列中,不需要往后执行逻辑了

   "local timeout = redis.call(zscore, KEYS[3], ARGV[2]);" +

   "if timeout ~= false then " +

   // the real timeout is the timeout of the prior thread

   // in the queue, but this is approximately correct, and

   // avoids having to traverse the queue

   // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])

   // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])

   "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +

   "end;" +

   // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of

   // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the

   // threadWaitTime

   "local lastThreadId = redis.call(lindex, KEYS[2], -1);" +

   "local ttl;" +

   // 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)

   "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +

   "ttl = tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +

   "else " +

   // 否则直接获取锁的存活时间

   "ttl = redis.call(pttl, KEYS[1]);" +

   "end;" +

   // 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳

   "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +

   // 如果添加到时间集合成功,则同时添加线程集合

   "if redis.call(zadd, KEYS[3], timeout, ARGV[2]) == 1 then " +

   "redis.call(rpush, KEYS[2], ARGV[2]);" +

   "end;" +

   "return ttl;",

   Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),

   unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);

   throw new IllegalArgumentException();

  

 

  公平锁总共用了Redis的三种数据类型,对应着 lua 脚本里面的keys1、2、3的参数:

  
KEYS[1]

  锁的名字,使用 Hash 数据类型,是可重入锁的基础,结构为 {”threadId1”: 1, “thread2”: 1},key为线程id,value是锁的次数

  
KEYS[2]

  线程队列的名字,使用 List 数据类型,结构为 [ “threadId1”, “threadId2” ],按顺序存放需要获取锁的线程的id

  
KEYS[3]

  时间队列的名字,使用 sorted set 数据类型,结构为 {”threadId2”:123, “threadId1”:190},key为线程id,value为获取锁的超时时间戳

  
我下面会用 锁、线程队列、时间队列 来表示这3个数据结构,需要注意下我的表述。

  同样的,介绍下参数:

  ARGV[1]:leaseTime 锁的持有时间

  ARGV[2]:线程id(描述不太准确,暂时按这样理解)

  ARGV[3]:waitTime 尝试获取锁的最大等待时间

  ARGV[4]:currentTime 当前时间戳

  接下来,我们一段一段分析 lua 脚本,首先看最开始的 while 循环

  

"while true do " + // list为空,证明没有人排队,退出循环

 

   "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);" +

   "if firstThreadId2 == false then " +

   "break;" +

   "end;" +

   // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据

   "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));" +

   "if timeout = tonumber(ARGV[4]) then " +

   // 从时间队列和线程队列中移除

   "redis.call(zrem, KEYS[3], firstThreadId2);" +

   "redis.call(lpop, KEYS[2]);" +

   "else " +

   "break;" +

   "end;" +

  "end;" +

  

 

  具体的逻辑我在注释中写的很清楚了,看的时候记住 KEYS[2]、KEYS[3] 对应着线程队列和时间队列接口。主要注意的是,线程队列只有当一个线程持有锁,另一个线程获取不到锁时,才会有值(前面有人才排队,没人排什么队)。接着看第二段

  

// 检查是否可以获取锁。当锁不存在,并且线程队列不存在或者线程队列第一位是当前线程,则可以获取锁

 

  "if (redis.call(exists, KEYS[1]) == 0) " +

   "and ((redis.call(exists, KEYS[2]) == 0) or (redis.call(lindex, KEYS[2], 0) == ARGV[2])) then " +

   // remove this thread from the queue and timeout set

   // 都获取锁了,当然要从线程队列和时间队列中移除

   "redis.call(lpop, KEYS[2]);" +

   "redis.call(zrem, KEYS[3], ARGV[2]);" +

   // decrease timeouts for all waiting in the queue

   // 刷新时间队列中的时间

   "local keys = redis.call(zrange, KEYS[3], 0, -1);" +

   "for i = 1, #keys, 1 do " +

   "redis.call(zincrby, KEYS[3], -tonumber(ARGV[3]), keys[i]);" +

   "end;" +

   // acquire the lock and set the TTL for the lease

   // 和公平锁的设置一样,值加1并且设置过期时间

   "redis.call(hset, KEYS[1], ARGV[2], 1);" +

   "redis.call(pexpire, KEYS[1], ARGV[1]);" +

   "return nil;" +

  "end;" +

  

 

  翻译翻译就是,锁不存在(别人没有持有锁)并且线程队列不存在或者线程队列第一位是当前线程(不用排队或者自己排第一)才能获得锁。因为时间队列中存放的是各个线程等待锁的超时时间戳,所以每次都需要刷新下。继续下一段逻辑

  

// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理

 

  "if redis.call(hexists, KEYS[1], ARGV[2]) == 1 then " +

   "redis.call(hincrby, KEYS[1], ARGV[2],1);" +

   "redis.call(pexpire, KEYS[1], ARGV[1]);" +

   "return nil;" +

  "end;" +

  

 

  这是可重入锁的处理,继续下一段

  

// 时间队列中有值,证明线程已经在队列中,不需要往后执行逻辑了

 

  "local timeout = redis.call(zscore, KEYS[3], ARGV[2]);" +

  "if timeout ~= false then " +

   // the real timeout is the timeout of the prior thread

   // in the queue, but this is approximately correct, and

   // avoids having to traverse the queue

   // 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])

   // 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])

   "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +

  "end;" +

  

 

  举例子:线程1持有锁,线程2尝试第一次获取锁(不进入这段if),线程2第二次获取锁(进入了这段if)。继续下一段

  

"local lastThreadId = redis.call(lindex, KEYS[2], -1);" +

 

  "local ttl;" +

  // 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)

  "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +

   "ttl = tonumber(redis.call(zscore, KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +

  "else " +

   // 否则直接获取锁的存活时间

   "ttl = redis.call(pttl, KEYS[1]);" +

  "end;" +

  // 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳

  "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +

  // 如果添加到时间集合成功,则同时添加线程集合

  "if redis.call(zadd, KEYS[3], timeout, ARGV[2]) == 1 then " +

   "redis.call(rpush, KEYS[2], ARGV[2]);" +

  "end;" +

  "return ttl;",

  

 

  ttl 这段的获取逻辑,翻译翻译就是,如果前面有人排队,就以前面的超时时间为准,如果没人排队,就拿锁的超时时间。获取到 ttl ,就对添加到线程集合和时间集合。

  以上就是公平锁的加锁 lua 脚本的全部逻辑。讲的有点乱,但是只要能搞清楚keys1、2、3对应着哪种数据类型,理解整个逻辑应该问题不大。

  解锁的核心 lua 脚本是下面这段RedissonFairLock#unlockInnerAsync

  

protected RFuture Boolean unlockInnerAsync(long threadId) {

 

   return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

   // remove stale threads

   "while true do " // 线程队列为空,证明没有人排队,退出循环

   + "local firstThreadId2 = redis.call(lindex, KEYS[2], 0);"

   + "if firstThreadId2 == false then "

   + "break;"

   + "end; "

   // 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据

   + "local timeout = tonumber(redis.call(zscore, KEYS[3], firstThreadId2));"

   + "if timeout = tonumber(ARGV[4]) then "

   + "redis.call(zrem, KEYS[3], firstThreadId2); "

   + "redis.call(lpop, KEYS[2]); "

   + "else "

   + "break;"

   + "end; "

   + "end;"

   // 如果锁不存在,则通过订阅发布机制通知下一个等待中的线程

   + "if (redis.call(exists, KEYS[1]) == 0) then " +

   "local nextThreadId = redis.call(lindex, KEYS[2], 0); " +

   "if nextThreadId ~= false then " +

   "redis.call(publish, KEYS[4] .. : .. nextThreadId, ARGV[1]); " +

   "end; " +

   "return 1; " +

   "end;" +

   // 如果当前线程已经不存在锁里面,直接返回null

   "if (redis.call(hexists, KEYS[1], ARGV[3]) == 0) then " +

   "return nil;" +

   "end; " +

   // 可重入锁处理逻辑,对当前线程的锁次数减1

   "local counter = redis.call(hincrby, KEYS[1], ARGV[3], -1); " +

   "if (counter 0) then " +

   // 锁次数仍然大于0,则刷新锁的存活时间

   "redis.call(pexpire, KEYS[1], ARGV[2]); " +

   "return 0; " +

   "end; " +

   // 删除锁

   "redis.call(del, KEYS[1]); " +

   // 订阅发布机制通知下一个等待中的线程

   "local nextThreadId = redis.call(lindex, KEYS[2], 0); " +

   "if nextThreadId ~= false then " +

   "redis.call(publish, KEYS[4] .. : .. nextThreadId, ARGV[1]); " +

   "end; " +

   "return 1; ",

   Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),

   LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());

  

 

  算了,不想写了,看注释吧。

  本文介绍了Redisson的公平锁,逻辑大体上和普通可重入锁一致,核心在于 lua 脚本,运用了Redis的3种数据类型。

  以上就是Redisson源码解读(redis源码是什么语言)的详细内容,想要了解更多 Redisson源码解读的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: