zookeeper实现分布式锁的原理,基于zookeeper实现分布式锁

  zookeeper实现分布式锁的原理,基于zookeeper实现分布式锁

  

原理简介:

动物园管理员实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁

  

zookeeper节点图分析

  进程间互斥实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的。

  进程间互斥实现的进程间锁接口,进程间锁主要规范了如下几个方法:

  //获取互斥锁public void acquire()引发异常;//在给定的时间内获取互斥锁公共布尔获取(长时间,时间单位单位)抛出异常;//释放锁处理公共无效释放()引发异常;//如果此虚拟机(Java虚拟机的缩写)中的线程获取了互斥锁,则返回true boolean isaacquiredinthisprocess();接下来我们看看进程间互斥中的实现,它究竟有哪些属性,以及实现细节

  公共类进程间互斥实现进程间锁,revokableinterprocessmutex {//锁内部是真正实现操作动物园管理员的类,它内部包含连接动物园管理员客户端的馆长框架//锁内部的具体实现后面我会讲到私人最终锁内部构件;//基本路径是锁的根结点,所有的临时有序的节点都是基本路径的子节点,私有最终字符串basePath//private final ConcurrentMapThread,LockData threadData=maps。newconcurrentmap();//LockData封装了请求对应的线程(owningThread),锁的重入的次数(锁计数),线程对应的临时节点(锁定路径)私有静态类锁定数据{最终线程拥有线程;最终字符串锁定路径//原子性的最终原子整数锁计数=新原子整数(1);私有锁数据(线程拥有线程,字符串锁路径){ this。拥有线程=拥有线程;这个。锁定路径=锁定路径;} }私有静态最终字符串LOCK _ NAME= LOCK-;//获取互斥锁,阻塞【进程间锁的实现】@覆盖public void acquire()抛出异常{ //获取锁,一直等待如果(!internalLock(-1,null) ) { throw new IOException(试图获取lock: 基本路径时失去连接);} } //获取互斥锁,指定时间时间【进程间锁的实现】@ Override public boolean acquire(长时间,时间单位单位)抛出异常{ return internalLock(时间,单位);} //当前线程是否占用锁中【进程间锁的实现】@ Override public boolean isaacquiredinthisprocess(){ return(threaddata。大小()

  gt; 0); } //如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如果线程已多次调用acquire,当此方法返回时,互斥锁仍将保留 【InterProcessLock的实现】 @Override public void release() throws Exception { Thread currentThread = Thread.currentThread(); //当前线程 LockData lockData = threadData.get(currentThread); //线程对应的锁信息 if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } // 因为获取到的锁是可重入的,对lockCount进行减1,lockCount=0时才是真正释放锁 int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { // 到这里时lockCount=0,具体释放锁的操作交给LockInternals中的releaseLock方法实现 internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } // 获取basePath根结点下的所有临时节点的有序集合 public Collection<String> getParticipantNodes() throws Exception { return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver()); } boolean isOwnedByCurrentThread() { LockData lockData = threadData.get(Thread.currentThread()); return (lockData != null) && (lockData.lockCount.get() > 0); } protected String getLockPath() { LockData lockData = threadData.get(Thread.currentThread()); return lockData != null ? lockData.lockPath : null; } // acquire()中调用的internalLock()方法 private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // 如果当前线程已经获取到了锁,那么将重入次数lockCount+1,返回true lockData.lockCount.incrementAndGet(); return true; } // attemptLock方法是获取锁的真正实现,lockPath是当前线程成功在basePath下创建的节点,若lockPath不为空代表成功获取到锁 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { // lockPath封装到当前线程对应的锁信息中 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }}接下来我们看看InterProcessMutex中使用的LockInternals类的实现细节

  

public class LockInternals { private final CuratorFramework client; // 连接zookeeper的客户端 private final String path;// 等于basePath,InterProcessMutex中传进来的 private final String basePath; // 根结点 private final LockInternalsDriver driver; // 操作zookeeper节点的driver private final String lockName; // "lock-" private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null); private final CuratorWatcher revocableWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) { checkRevocableWatcher(event.getPath()); } } }; // 监听节点的监听器,若被监听的节点有动静,则唤醒 notifyFromWatcher()=>notifyAll(); private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private volatile int maxLeases; // 获取basePath的子节点,排序后的 public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception { List<String> children = client.getChildren().forPath(basePath); List<String> sortedList = Lists.newArrayList(children); Collections.sort ( sortedList, new Comparator<String>() { @Override public int compare(String lhs, String rhs) { return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); } } ); return sortedList; } // 尝试获取锁【internalLock=>attemptLock】 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {// 开始时间 final long startMillis = System.currentTimeMillis(); // 记录等待时间 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; // 重试次数 int retryCount = 0; // 当前节点 String ourPath = null; // 是否获取到锁的标志 boolean hasTheLock = false; // 是否放弃获取到标志 boolean isDone = false; // 不停尝试获取 while ( !isDone ) { isDone = true; try {// 创建当前线程对应的节点 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); // internalLockLoop中获取 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) {// 是否可再次尝试 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } }// 获取到锁后,返回当前线程对应创建的节点路径 if ( hasTheLock ) { return ourPath; } return null; } // 循环获取【attemptLock=>internalLockLoop】 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; // 是否拥有分布式锁 boolean doDelete = false;// 是否需要删除当前节点 try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); }// 循环尝试获取锁 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {// 得到basePath下排序后的临时子节点 List<String> children = getSortedChildren(); // 获取之前创建的当前线程对应的子节点 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash// 判断是否获取到锁,没有就返回监听路径 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // 成功获取到 if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else {// 没有获取到锁,监听前一个临时顺序节点 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // 上一个临时顺序节点如果被删除,会唤醒当前线程继续竞争锁 client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); // 获取锁超时 if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { // 因为获取锁超时,所以删除之前创建的临时子节点 deleteOurPath(ourPath); } } return haveTheLock; } private void deleteOurPath(String ourPath) throws Exception { try { // 删除 client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { // ignore - already deleted (possibly expired session, etc.) } } }
StandardLockInternalsDriver implements LockInternalsDriver

  

// 前面internalLockLoop方法中driver.getsTheLock执行的方法@Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 获取子节点在临时顺序节点列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 检验子节点在临时顺序节点列表中是否有效 validateOurIndex(sequenceNodeName, ourIndex); // 若当前子节点的位置<maxLeases,代表可获取锁【maxLeases默认=1,若ourIndex=0,代笔自己位置最小】 boolean getsTheLock = ourIndex < maxLeases; // getsTheLock=true,则不需要监听前maxLeases的节点【maxLeases默认=1,代表监听前面最靠近自己的节点】 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
用InterProcessMutex在自己业务实现分布式锁,请点击此链接阅读点我

  到此这篇关于InterProcessMutex实现zookeeper分布式锁原理的文章就介绍到这了,更多相关InterProcessMutex实现zookeeper分布式锁内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: