








  //获取互斥锁public void acquire()引发异常;//在给定的时间内获取互斥锁公共布尔获取(长时间,时间单位单位)抛出异常;//释放锁处理公共无效释放()引发异常;//如果此虚拟机(Java虚拟机的缩写)中的线程获取了互斥锁,则返回true boolean isaacquiredinthisprocess();接下来我们看看进程间互斥中的实现,它究竟有哪些属性,以及实现细节

  公共类进程间互斥实现进程间锁,revokableinterprocessmutex {//锁内部是真正实现操作动物园管理员的类,它内部包含连接动物园管理员客户端的馆长框架//锁内部的具体实现后面我会讲到私人最终锁内部构件;//基本路径是锁的根结点,所有的临时有序的节点都是基本路径的子节点,私有最终字符串basePath//private final ConcurrentMapThread,LockData threadData=maps。newconcurrentmap();//LockData封装了请求对应的线程(owningThread),锁的重入的次数(锁计数),线程对应的临时节点(锁定路径)私有静态类锁定数据{最终线程拥有线程;最终字符串锁定路径//原子性的最终原子整数锁计数=新原子整数(1);私有锁数据(线程拥有线程,字符串锁路径){ this。拥有线程=拥有线程;这个。锁定路径=锁定路径;} }私有静态最终字符串LOCK _ NAME= LOCK-;//获取互斥锁,阻塞【进程间锁的实现】@覆盖public void acquire()抛出异常{ //获取锁,一直等待如果(!internalLock(-1,null) ) { throw new IOException(试图获取lock: 基本路径时失去连接);} } //获取互斥锁,指定时间时间【进程间锁的实现】@ Override public boolean acquire(长时间,时间单位单位)抛出异常{ return internalLock(时间,单位);} //当前线程是否占用锁中【进程间锁的实现】@ Override public boolean isaacquiredinthisprocess(){ return(threaddata。大小()

  gt; 0); } //如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如果线程已多次调用acquire,当此方法返回时,互斥锁仍将保留 【InterProcessLock的实现】 @Override public void release() throws Exception { Thread currentThread = Thread.currentThread(); //当前线程 LockData lockData = threadData.get(currentThread); //线程对应的锁信息 if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } // 因为获取到的锁是可重入的,对lockCount进行减1,lockCount=0时才是真正释放锁 int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { // 到这里时lockCount=0,具体释放锁的操作交给LockInternals中的releaseLock方法实现 internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } // 获取basePath根结点下的所有临时节点的有序集合 public Collection<String> getParticipantNodes() throws Exception { return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver()); } boolean isOwnedByCurrentThread() { LockData lockData = threadData.get(Thread.currentThread()); return (lockData != null) && (lockData.lockCount.get() > 0); } protected String getLockPath() { LockData lockData = threadData.get(Thread.currentThread()); return lockData != null ? lockData.lockPath : null; } // acquire()中调用的internalLock()方法 private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // 如果当前线程已经获取到了锁,那么将重入次数lockCount+1,返回true lockData.lockCount.incrementAndGet(); return true; } // attemptLock方法是获取锁的真正实现,lockPath是当前线程成功在basePath下创建的节点,若lockPath不为空代表成功获取到锁 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { // lockPath封装到当前线程对应的锁信息中 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }}接下来我们看看InterProcessMutex中使用的LockInternals类的实现细节


public class LockInternals { private final CuratorFramework client; // 连接zookeeper的客户端 private final String path;// 等于basePath,InterProcessMutex中传进来的 private final String basePath; // 根结点 private final LockInternalsDriver driver; // 操作zookeeper节点的driver private final String lockName; // "lock-" private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null); private final CuratorWatcher revocableWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) { checkRevocableWatcher(event.getPath()); } } }; // 监听节点的监听器,若被监听的节点有动静,则唤醒 notifyFromWatcher()=>notifyAll(); private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private volatile int maxLeases; // 获取basePath的子节点,排序后的 public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception { List<String> children = client.getChildren().forPath(basePath); List<String> sortedList = Lists.newArrayList(children); Collections.sort ( sortedList, new Comparator<String>() { @Override public int compare(String lhs, String rhs) { return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); } } ); return sortedList; } // 尝试获取锁【internalLock=>attemptLock】 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {// 开始时间 final long startMillis = System.currentTimeMillis(); // 记录等待时间 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; // 重试次数 int retryCount = 0; // 当前节点 String ourPath = null; // 是否获取到锁的标志 boolean hasTheLock = false; // 是否放弃获取到标志 boolean isDone = false; // 不停尝试获取 while ( !isDone ) { isDone = true; try {// 创建当前线程对应的节点 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); // internalLockLoop中获取 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) {// 是否可再次尝试 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } }// 获取到锁后,返回当前线程对应创建的节点路径 if ( hasTheLock ) { return ourPath; } return null; } // 循环获取【attemptLock=>internalLockLoop】 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; // 是否拥有分布式锁 boolean doDelete = false;// 是否需要删除当前节点 try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); }// 循环尝试获取锁 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {// 得到basePath下排序后的临时子节点 List<String> children = getSortedChildren(); // 获取之前创建的当前线程对应的子节点 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash// 判断是否获取到锁,没有就返回监听路径 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // 成功获取到 if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else {// 没有获取到锁,监听前一个临时顺序节点 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // 上一个临时顺序节点如果被删除,会唤醒当前线程继续竞争锁 client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); // 获取锁超时 if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { // 因为获取锁超时,所以删除之前创建的临时子节点 deleteOurPath(ourPath); } } return haveTheLock; } private void deleteOurPath(String ourPath) throws Exception { try { // 删除 client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { // ignore - already deleted (possibly expired session, etc.) } } }
StandardLockInternalsDriver implements LockInternalsDriver


// 前面internalLockLoop方法中driver.getsTheLock执行的方法@Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 获取子节点在临时顺序节点列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 检验子节点在临时顺序节点列表中是否有效 validateOurIndex(sequenceNodeName, ourIndex); // 若当前子节点的位置<maxLeases,代表可获取锁【maxLeases默认=1,若ourIndex=0,代笔自己位置最小】 boolean getsTheLock = ourIndex < maxLeases; // getsTheLock=true,则不需要监听前maxLeases的节点【maxLeases默认=1,代表监听前面最靠近自己的节点】 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }



