redis分布式锁实现代码,redisson分布式锁实现原理
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }其实它的本质是调用一段 LUA 脚本进行加锁。
锁续期设计
锁的续期是在org.redisson.RedissonLock#tryAcquireAsync
方法中调用 scheduleExpirationRenewal
实现的。续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。
tryAcquireAsync
代码如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 锁过期时间续期 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f);}锁续期
scheduleExpirationRenewal
代码如下:
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } }}然后在调用
renewExpiration();
执行续期逻辑
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 创建延迟任务 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正的续期,调用 LUA 脚本续期 RFuture<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Cant update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 如果续期成功 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task);}
renewExpirationAsync
方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call(hexists, KEYS[1], ARGV[2]) == 1) then " + "redis.call(pexpire, KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
锁的自旋重试
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true)
逻辑
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // 阻塞锁的超时时间,等锁过期后再尝试加锁 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); }// get(lockAsync(leaseTime, unit));}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
其实这里就是一个间歇性自旋。 等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();
还有一个逻辑就是
CompletableFuture future = subscribe(threadId);这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。
解锁设计
rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。 核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call(hexists, KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call(hincrby, KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call(pexpire, KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call(del, KEYS[1]); " + // 发布解锁成功消息 "redis.call(publish, KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }还是 LUA 的执行方式。
撤销锁续期
核心方法org.redisson.RedissonBaseLock#unlockAsync(long)
@Overridepublic RFuture<Void> unlockAsync(long threadId) { // 解锁 RFuture<Boolean> future = unlockInnerAsync(threadId); // 撤销续期 CompletionStage<Void> f = future.handle((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f);}
解锁成功唤排队线程
在org.redisson.pubsub.LockPubSub#onMessage
中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:
@Overrideprotected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); }}到此这篇关于redisson 实现分布式锁的文章就介绍到这了,更多相关redisson 分布式锁内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。