Zookeeper学习笔记(zookeeper入门书籍)

  本篇文章为你整理了Zookeeper学习笔记(zookeeper入门书籍)的详细内容,包含有zookeeper教程 zookeeper入门书籍 zookeeper知识点 zookeeper入门初体验第一关 Zookeeper学习笔记,希望能帮助你了解 Zookeeper学习笔记。

  Zookeeper,为分布式框架提供协调服务,基于观察者模式。

  负责存储管理大家关心的数据,接受观察者的注册,当数据状态发生变化,Zookeeper负责同志在Zookeeper上注册的观察者。

  Zookeeper=文件系统+监听机制

  一个Leader,多个Follower组成为集群

  集群存活节点超过1/2,就能正常服务(适合安装奇数台服务器)

  全局数据一致

  同一个Client更新请求按顺序依次执行

  数据更新原子性

  实时性,Client能读到最新的数据

  3)数据结构

  Zookeeper数据模型类似Unix的文件系统,每个节点叫做ZNode。

  每个ZNode能存储1MB数据(存储数据量小),能通过路径唯一标识

  4)应用场景

  
分布式环境,需要对服务进行统一的命名,便于识别。

  例:通过域名,分发到不同IP的服务器上(这点类似于Nginx),当一台挂了,其他的还能工作

  
#创建test2节点,会自动对节点加上编号

  #比如此节点可能创建为test20000000013,为创建的第13个节点,所有节点共用同一个编号

  

 

 

  
import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.junit.Test;

   * 创建client连接

  public class ConnectTest {

   @Test

   public void method1(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   //客户端连接建立

   CuratorFramework client = CuratorFrameworkFactory.newClient("101.43.244.40", retryPolic);

   //开启连接

   client.start();

   @Test

   public void method2(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   CuratorFramework client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

  

 

 

  

package com.zko0;

 

  import lombok.extern.slf4j.Slf4j;

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.apache.zookeeper.CreateMode;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

   * 创建节点的测试类

  @Slf4j

  public class CreateNodeTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

  
//如果没有指定数据,那么会将客户端的ip作为存储的数据

   public void basicCreate() throws Exception {

   String path = client.create().forPath("/test1");

   log.info(path);

   @Test

   //2.创建,同时添加数据

   public void createWithMessage() throws Exception {

   String path = client.create().forPath("/test2","fuck".getBytes());

   log.info(path);

   @Test

   //3.创建临时节点

   //Client会话结束节点会消息,所以创建完就消失了

   public void createEphemeral() throws Exception {

   client.create()

   .withMode(CreateMode.EPHEMERAL)

   .forPath("/test3");

  
//creatingParentsIfNeeded如果需要创建多级节点

   public void create2() throws Exception {

   client.create()

   .creatingParentsIfNeeded()

   .forPath("/directory/test5");

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  

package com.zko0;

 

  import lombok.extern.slf4j.Slf4j;

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.apache.zookeeper.data.Stat;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

  import java.util.List;

  @Slf4j

  public class CheckNodeTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   @Test

   //get方法

   public void get() throws Exception {

   byte[] bytes = client.getData().forPath("/test1");

   log.info(new String(bytes));

  
public void getChildren() throws Exception {

   List String list = client.getChildren().forPath("/directory");

   log.info(list.toString());

  
//把状态信息保存在Stat对象中 storingStatIn(xx)

   public void getAbout() throws Exception {

   Stat stat = new Stat();

   client.getData()

   .storingStatIn(stat)

   .forPath("/test1");

   log.info(stat.toString());

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  

package com.zko0;

 

  import lombok.extern.slf4j.Slf4j;

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.apache.zookeeper.data.Stat;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

  @Slf4j

  public class EditNodeTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   @Test

   //修改节点数据内容

   public void edit() throws Exception {

   client.setData()

   .forPath("/test1","fuckU".getBytes());

   @Test

   //乐观锁,如果edit的时候version有修改了不匹配,那么setData会失败

   public void editWithVersion() throws Exception {

   Stat stat = new Stat();

   client.getData()

   .storingStatIn(stat)

   .forPath("/test1");

   client.setData()

   .withVersion(stat.getVersion())

   .forPath("/test1","fuckMe".getBytes());

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  

package com.zko0;

 

  import lombok.extern.slf4j.Slf4j;

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.framework.api.BackgroundCallback;

  import org.apache.curator.framework.api.CuratorEvent;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

  @Slf4j

  public class DeleteNodeTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   @Test

   //删除单个节点

   public void deleteOne() throws Exception {

   client.delete()

   .forPath("/test1");

   @Test

   //删除带有子节点的节点

   public void deleteOnehasChildren() throws Exception {

   client.delete()

   .deletingChildrenIfNeeded()

   .forPath("/directory");

   //必须成功的删除,如果失败会反复重试

   @Test

   public void deleteMustSucc() throws Exception {

   client.delete()

   .guaranteed()//必须的

   .forPath("/test2");

   @Test

   //删除回调

   public void callBack() throws Exception {

   client.delete()

   .guaranteed()//一般都会加上

   .inBackground((client,event)- {

   //client和evnet都可以进行操作

   log.info("删除Ok");

   .forPath("/test1");

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  6)Watch事件监听

  zookeeper提供三种不同的Watcher:

  NodeCache:只监听某一个节点

  PathChildrenCache:监听一个Znode的子节点

  TreeCache:监听树上的所有节点

  1.NodeCache

  

@Slf4j

 

  public class NodeCacheTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   @Test

   * 写法1.通过addListener将new CuratorCacheListener() 加入,对于create,update,delete等事件

   * 都能进行监听

   public void testNodeCache() throws Exception {

   CuratorCache curatorCache = CuratorCache.build(client, "/test1");

   curatorCache.listenable().addListener(new CuratorCacheListener() {

   @Override

   public void event(Type type, ChildData beforeData, ChildData afterData) {

   // 第一个参数:事件类型(枚举)

   // 第二个参数:节点更新前的状态、数据

   // 第三个参数:节点更新后的状态、数据

   // 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null

   // 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null

   // 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据

   switch (type.name()) {

   case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件

   if (afterData != null) {

   System.out.println("创建了节点: " + afterData.getPath());

   break;

   case "NODE_CHANGED": // 节点更新

   if (beforeData.getData() != null) {

   System.out.println("修改前的数据: " + new String(beforeData.getData()));

   } else {

   System.out.println("节点第一次赋值!");

   System.out.println("修改后的数据: " + new String(afterData.getData()));

   break;

   case "NODE_DELETED": // 节点删除

   System.out.println(beforeData.getPath() + " 节点已删除");

   break;

   default:

   break;

   // 开启监听

   curatorCache.start();

   // 线程阻塞防止停止

   while (true){}

  
CuratorCache cache = CuratorCache.build(client,"/test1");

   CuratorCacheListener listener = CuratorCacheListener.builder()

   .forCreates(node - System.out.println(String.format("Node created: [%s]", node)))

   .forChanges((oldNode, node) - System.out.println(String.format("Node changed. Old: [%s] New: [%s]", oldNode, node)))

   .forDeletes(oldNode - System.out.println(String.format("Node deleted. Old value: [%s]", oldNode)))

   .forInitialized(() - System.out.println("Cache initialized"))

   .build();

   // register the listener

   cache.listenable().addListener(listener);

   // the cache must be started

   cache.start();

   while (true){}

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  2.PathChildrenCache

  在5.1之后,new PathChildrenCache显示为废弃方法

  如果有更好的方法麻烦教授

  

package com.zko0.watcher;

 

  import lombok.extern.slf4j.Slf4j;

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.framework.recipes.cache.PathChildrenCache;

  import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

  import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.apache.curator.utils.ZKPaths;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

  @Slf4j

  public class PathChildrenCacheTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

  
//true表示是否缓存data信息

   PathChildrenCache cache = new PathChildrenCache(client, "/test2", true);

   PathChildrenCacheListener listener = new PathChildrenCacheListener(){

   @Override

   public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {

   switch ( event.getType() )

   case CHILD_ADDED:

   System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));

   break;

   case CHILD_UPDATED:

   System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));

   break;

   case CHILD_REMOVED:

   System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));

   break;

   cache.getListenable().addListener(listener);

   cache.start();

   while (true){}

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  3.TreeCache

  监听树上所有节点的变化情况

  

package com.zko0.watcher;

 

  import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.framework.recipes.cache.TreeCache;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import org.junit.After;

  import org.junit.Before;

  import org.junit.Test;

  public class TreeCacheTest {

   private CuratorFramework client;

   @Before

   //在create前执行

   public void connect(){

   //重试机制

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   @Test

   public void test() throws Exception {

   TreeCache cache = TreeCache.newBuilder(client, "/test3").setCacheData(true).build();

   cache.getListenable().addListener((c, event) - {

   if ( event.getData() != null )

   System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());

   else

   System.out.println("type=" + event.getType());

   cache.start();

   while (true){}

   @After

   //在create执行后执行

   public void close(){

   if (client!=null){

   client.close();

  

 

  5.Zookeeper分布式锁

  1)zookeeper分布式锁原理:

  思想:获取锁时,创建节点,使用完,删除节点

  流程:

  在lock节点下创建临时且顺序节点(类似Redis,防止宕机时其他client无法获取)

  然后获取lock下所有的子节点,获取到后,如果发现自己的子节点最小,认为客户端获取到了锁。使用完后,删除该节点

  如果发现自己创建的节点并非lock中所有子节点最小的,说明没有获取到锁,客户端需要找到比自己小(前一个)的那个节点,注册监听删除事件

  如果发现比自己小的那个节点被删了,客户端Watcher收到通知,再判断自己是不是最小的(避免宕机使中间节点删除)。如果是,获取锁,否则继续监听前一个小的节点。

  2)Curator分布式锁Api

  在Curator中有五种锁:

  InterProcessSemaphoreMutex:分布式排他锁

  InterProcessMutes:分布式可重入锁

  InterProcessReadWriteLock:分布式读写锁

  InterProcessMulitiLock:将多个锁作为单个实体管理的容器

  InterProcessSemaphoreV2:共享信号量

  可重入锁演示InterProcessMutes:

  

package com.zko0.lock;

 

  
import org.apache.curator.RetryPolicy;

  import org.apache.curator.framework.CuratorFramework;

  import org.apache.curator.framework.CuratorFrameworkFactory;

  import org.apache.curator.framework.recipes.locks.InterProcessLock;

  import org.apache.curator.framework.recipes.locks.InterProcessMutex;

  import org.apache.curator.retry.ExponentialBackoffRetry;

  import java.util.concurrent.TimeUnit;

  public class LockTest implements Runnable{

   public LockTest() {

   //创建client

   RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);

   CuratorFramework client = CuratorFrameworkFactory.builder()

   .connectString("101.43.244.40")

   .retryPolicy(retryPolic)

   // zookeeper根目录为/zko0,不为/

   .namespace("zko0")

   .build();

   client.start();

   this.lock=new InterProcessMutex(client,"/lock");

   public static void main(String[] args) {

   LockTest lockTest = new LockTest();

   Thread t1=new Thread(lockTest,"test1");

   Thread t2=new Thread(lockTest,"test2");

   t1.start();

   t2.start();

   private Integer num=10;//对该变量做锁

   private InterProcessLock lock;

   @Override

   public void run() {

   while (true){

   //acquire为等待时间

   try {

   lock.acquire(3, TimeUnit.SECONDS);

   if (num 0){

   System.out.println(Thread.currentThread()+""+num);

   num--;

   } catch (Exception e) {

   throw new RuntimeException(e);

   }finally {

   try {

   lock.release();

   } catch (Exception e) {

   throw new RuntimeException(e);

  

 

  Zookeeper客户端通信端口:默认2181

  Zookeeper服务器通信端口:默认2881

  Zookeeper投票端口:默认3881

  Leader选举策略:

  Serverid:服务器ID 比如有三台服务器,编号分别是1,2,3。编号越大代表在选择算法中,它的权重就越大。

  Zxid:数据ID 服务器中存在的最大数据ID,值越大说明数据越新,在选举算法中数据越新,权重就越大。

  在Leader选举的过程中,如果某台Zookeeper获票超过半数,那么就可以成为Leader;

  
在每个zookeeper的data目录下创建myid文件,内容分别是1,2,3。这个文件记录每个服务的id。

  

echo 1 /zoo/z1/data/myid

 

  echo 2 /zoo/z2/data/myid

  echo 3 /zoo/z3/data/myid

  

 

  
配置端口(三台的zoo.cfg)

  此步骤不仅配置了,需要连接的其他节点的端口信息。而且还配置了自己的服务器通信端口和投票端口。

  

server.1=127.0.0.1:2881:3881

 

  server.2=127.0.0.1:2882:3882

  server.3=127.0.0.1:2883:3883

  

 

  
3号挂了,1好follower正常,2号leader正常

  1号也挂了,2号leader不运行,集群处于休眠状态

  

[root@VM-24-4-centos zoo]# z2/bin/zkServer.sh status

 

  ZooKeeper JMX enabled by default

  Using config: /zoo/z2/bin/../conf/zoo.cfg

  Client port found: 2182. Client address: localhost. Client SSL: false.

  Error contacting service. It is probably not running.

  

 

  


[root@VM-24-4-centos zoo]# z3/bin/zkServer.sh status

 

  ZooKeeper JMX enabled by default

  Using config: /zoo/z3/bin/../conf/zoo.cfg

  Client port found: 2183. Client address: localhost. Client SSL: false.

  Mode: leader

  

 

  
!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --

   dependency

   groupId org.apache.curator /groupId

   artifactId curator-recipes /artifactId

   version 5.2.0 /version

   exclusions

   exclusion

   groupId org.slf4j /groupId

   artifactId slf4j-api /artifactId

   /exclusion

   /exclusions

   /dependency

   dependency

   groupId org.slf4j /groupId

   artifactId slf4j-log4j12 /artifactId

   version 1.7.28 /version

   /dependency

   /dependencies

  

 

 

  以上就是Zookeeper学习笔记(zookeeper入门书籍)的详细内容,想要了解更多 Zookeeper学习笔记的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: