本篇文章为你整理了Zookeeper学习笔记(zookeeper入门书籍)的详细内容,包含有zookeeper教程 zookeeper入门书籍 zookeeper知识点 zookeeper入门初体验第一关 Zookeeper学习笔记,希望能帮助你了解 Zookeeper学习笔记。
Zookeeper,为分布式框架提供协调服务,基于观察者模式。
负责存储管理大家关心的数据,接受观察者的注册,当数据状态发生变化,Zookeeper负责同志在Zookeeper上注册的观察者。
Zookeeper=文件系统+监听机制
一个Leader,多个Follower组成为集群
集群存活节点超过1/2,就能正常服务(适合安装奇数台服务器)
全局数据一致
同一个Client更新请求按顺序依次执行
数据更新原子性
实时性,Client能读到最新的数据
3)数据结构
Zookeeper数据模型类似Unix的文件系统,每个节点叫做ZNode。
每个ZNode能存储1MB数据(存储数据量小),能通过路径唯一标识
4)应用场景
分布式环境,需要对服务进行统一的命名,便于识别。
例:通过域名,分发到不同IP的服务器上(这点类似于Nginx),当一台挂了,其他的还能工作
#创建test2节点,会自动对节点加上编号
#比如此节点可能创建为test20000000013,为创建的第13个节点,所有节点共用同一个编号
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
* 创建client连接
public class ConnectTest {
@Test
public void method1(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
//客户端连接建立
CuratorFramework client = CuratorFrameworkFactory.newClient("101.43.244.40", retryPolic);
//开启连接
client.start();
@Test
public void method2(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
* 创建节点的测试类
@Slf4j
public class CreateNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
//如果没有指定数据,那么会将客户端的ip作为存储的数据
public void basicCreate() throws Exception {
String path = client.create().forPath("/test1");
log.info(path);
@Test
//2.创建,同时添加数据
public void createWithMessage() throws Exception {
String path = client.create().forPath("/test2","fuck".getBytes());
log.info(path);
@Test
//3.创建临时节点
//Client会话结束节点会消息,所以创建完就消失了
public void createEphemeral() throws Exception {
client.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/test3");
//creatingParentsIfNeeded如果需要创建多级节点
public void create2() throws Exception {
client.create()
.creatingParentsIfNeeded()
.forPath("/directory/test5");
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
@Slf4j
public class CheckNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
@Test
//get方法
public void get() throws Exception {
byte[] bytes = client.getData().forPath("/test1");
log.info(new String(bytes));
public void getChildren() throws Exception {
List String list = client.getChildren().forPath("/directory");
log.info(list.toString());
//把状态信息保存在Stat对象中 storingStatIn(xx)
public void getAbout() throws Exception {
Stat stat = new Stat();
client.getData()
.storingStatIn(stat)
.forPath("/test1");
log.info(stat.toString());
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class EditNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
@Test
//修改节点数据内容
public void edit() throws Exception {
client.setData()
.forPath("/test1","fuckU".getBytes());
@Test
//乐观锁,如果edit的时候version有修改了不匹配,那么setData会失败
public void editWithVersion() throws Exception {
Stat stat = new Stat();
client.getData()
.storingStatIn(stat)
.forPath("/test1");
client.setData()
.withVersion(stat.getVersion())
.forPath("/test1","fuckMe".getBytes());
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class DeleteNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
@Test
//删除单个节点
public void deleteOne() throws Exception {
client.delete()
.forPath("/test1");
@Test
//删除带有子节点的节点
public void deleteOnehasChildren() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.forPath("/directory");
//必须成功的删除,如果失败会反复重试
@Test
public void deleteMustSucc() throws Exception {
client.delete()
.guaranteed()//必须的
.forPath("/test2");
@Test
//删除回调
public void callBack() throws Exception {
client.delete()
.guaranteed()//一般都会加上
.inBackground((client,event)- {
//client和evnet都可以进行操作
log.info("删除Ok");
.forPath("/test1");
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
6)Watch事件监听
zookeeper提供三种不同的Watcher:
NodeCache:只监听某一个节点
PathChildrenCache:监听一个Znode的子节点
TreeCache:监听树上的所有节点
1.NodeCache
@Slf4j
public class NodeCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
@Test
* 写法1.通过addListener将new CuratorCacheListener() 加入,对于create,update,delete等事件
* 都能进行监听
public void testNodeCache() throws Exception {
CuratorCache curatorCache = CuratorCache.build(client, "/test1");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData beforeData, ChildData afterData) {
// 第一个参数:事件类型(枚举)
// 第二个参数:节点更新前的状态、数据
// 第三个参数:节点更新后的状态、数据
// 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null
// 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null
// 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
if (afterData != null) {
System.out.println("创建了节点: " + afterData.getPath());
break;
case "NODE_CHANGED": // 节点更新
if (beforeData.getData() != null) {
System.out.println("修改前的数据: " + new String(beforeData.getData()));
} else {
System.out.println("节点第一次赋值!");
System.out.println("修改后的数据: " + new String(afterData.getData()));
break;
case "NODE_DELETED": // 节点删除
System.out.println(beforeData.getPath() + " 节点已删除");
break;
default:
break;
// 开启监听
curatorCache.start();
// 线程阻塞防止停止
while (true){}
CuratorCache cache = CuratorCache.build(client,"/test1");
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node - System.out.println(String.format("Node created: [%s]", node)))
.forChanges((oldNode, node) - System.out.println(String.format("Node changed. Old: [%s] New: [%s]", oldNode, node)))
.forDeletes(oldNode - System.out.println(String.format("Node deleted. Old value: [%s]", oldNode)))
.forInitialized(() - System.out.println("Cache initialized"))
.build();
// register the listener
cache.listenable().addListener(listener);
// the cache must be started
cache.start();
while (true){}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
2.PathChildrenCache
在5.1之后,new PathChildrenCache显示为废弃方法
如果有更好的方法麻烦教授
package com.zko0.watcher;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class PathChildrenCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
//true表示是否缓存data信息
PathChildrenCache cache = new PathChildrenCache(client, "/test2", true);
PathChildrenCacheListener listener = new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
switch ( event.getType() )
case CHILD_ADDED:
System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
case CHILD_UPDATED:
System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
case CHILD_REMOVED:
System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
cache.getListenable().addListener(listener);
cache.start();
while (true){}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
3.TreeCache
监听树上所有节点的变化情况
package com.zko0.watcher;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TreeCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
@Test
public void test() throws Exception {
TreeCache cache = TreeCache.newBuilder(client, "/test3").setCacheData(true).build();
cache.getListenable().addListener((c, event) - {
if ( event.getData() != null )
System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
else
System.out.println("type=" + event.getType());
cache.start();
while (true){}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
5.Zookeeper分布式锁
1)zookeeper分布式锁原理:
思想:获取锁时,创建节点,使用完,删除节点
流程:
在lock节点下创建临时且顺序节点(类似Redis,防止宕机时其他client无法获取)
然后获取lock下所有的子节点,获取到后,如果发现自己的子节点最小,认为客户端获取到了锁。使用完后,删除该节点
如果发现自己创建的节点并非lock中所有子节点最小的,说明没有获取到锁,客户端需要找到比自己小(前一个)的那个节点,注册监听删除事件
如果发现比自己小的那个节点被删了,客户端Watcher收到通知,再判断自己是不是最小的(避免宕机使中间节点删除)。如果是,获取锁,否则继续监听前一个小的节点。
2)Curator分布式锁Api
在Curator中有五种锁:
InterProcessSemaphoreMutex:分布式排他锁
InterProcessMutes:分布式可重入锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMulitiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
可重入锁演示InterProcessMutes:
package com.zko0.lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class LockTest implements Runnable{
public LockTest() {
//创建client
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
this.lock=new InterProcessMutex(client,"/lock");
public static void main(String[] args) {
LockTest lockTest = new LockTest();
Thread t1=new Thread(lockTest,"test1");
Thread t2=new Thread(lockTest,"test2");
t1.start();
t2.start();
private Integer num=10;//对该变量做锁
private InterProcessLock lock;
@Override
public void run() {
while (true){
//acquire为等待时间
try {
lock.acquire(3, TimeUnit.SECONDS);
if (num 0){
System.out.println(Thread.currentThread()+""+num);
num--;
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
Zookeeper客户端通信端口:默认2181
Zookeeper服务器通信端口:默认2881
Zookeeper投票端口:默认3881
Leader选举策略:
Serverid:服务器ID 比如有三台服务器,编号分别是1,2,3。编号越大代表在选择算法中,它的权重就越大。
Zxid:数据ID 服务器中存在的最大数据ID,值越大说明数据越新,在选举算法中数据越新,权重就越大。
在Leader选举的过程中,如果某台Zookeeper获票超过半数,那么就可以成为Leader;
在每个zookeeper的data目录下创建myid文件,内容分别是1,2,3。这个文件记录每个服务的id。
echo 1 /zoo/z1/data/myid
echo 2 /zoo/z2/data/myid
echo 3 /zoo/z3/data/myid
配置端口(三台的zoo.cfg)
此步骤不仅配置了,需要连接的其他节点的端口信息。而且还配置了自己的服务器通信端口和投票端口。
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
3号挂了,1好follower正常,2号leader正常
1号也挂了,2号leader不运行,集群处于休眠状态
[root@VM-24-4-centos zoo]# z2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /zoo/z2/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
[root@VM-24-4-centos zoo]# z3/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /zoo/z3/bin/../conf/zoo.cfg
Client port found: 2183. Client address: localhost. Client SSL: false.
Mode: leader
!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --
dependency
groupId org.apache.curator /groupId
artifactId curator-recipes /artifactId
version 5.2.0 /version
exclusions
exclusion
groupId org.slf4j /groupId
artifactId slf4j-api /artifactId
/exclusion
/exclusions
/dependency
dependency
groupId org.slf4j /groupId
artifactId slf4j-log4j12 /artifactId
version 1.7.28 /version
/dependency
/dependencies
以上就是Zookeeper学习笔记(zookeeper入门书籍)的详细内容,想要了解更多 Zookeeper学习笔记的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。