ZooKeeper 分布式协调服务

发布于 2025-02-16 18:44:29 字数 17508 浏览 5 评论 0

Zookeeper 概述

ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

分布式应用的优点

  • 可靠性 - 单个或几个系统的故障不会使整个系统出现故障。
  • 可扩展性 - 可以在需要时增加性能,通过添加更多机器,在应用程序配置中进行微小的更改,而不会有停机时间。
  • 透明性 - 隐藏系统的复杂性,并将其显示为单个实体/应用程序。

分布式应用的挑战

  • 竞争条件 - 两个或多个机器尝试执行特定任务,实际上只需在任意给定时间由单个机器完成。例如,共享资源只能在任意给定时间由单个机器修改。
  • 死锁 - 两个或多个操作等待彼此无限期完成。
  • 不一致 - 数据的部分失败。

什么是 Apache ZooKeeper?

Apache ZooKeeper 是由集群(节点组)使用的一种服务,用于在自身之间协调,并通过稳健的同步技术维护共享数据。ZooKeeper 本身是一个分布式应用程序,为写入分布式应用程序提供服务。

ZooKeeper 提供的常见服务如下 :

  • 命名服务 - 按名称标识集群中的节点。它类似于 DNS,但仅对于节点。
  • 配置管理 - 加入节点的最近的和最新的系统配置信息。
  • 集群管理 - 实时地在集群和节点状态中加入/离开节点。
  • 选举算法 - 选举一个节点作为协调目的的 leader。
  • 锁定和同步服务 - 在修改数据的同时锁定数据。此机制可帮助你在连接其他分布式应用程序(如 Apache HBase)时进行自动故障恢复。
  • 高度可靠的数据注册表 - 即使在一个或几个节点关闭时也可以获得数据。

ZooKeeper 的好处

  • 简单的分布式协调过程
  • 同步 - 服务器进程之间的相互排斥和协作。此过程有助于 Apache HBase 进行配置管理。
  • 有序的消息
  • 序列化 - 根据特定规则对数据进行编码。确保应用程序运行一致。这种方法可以在 MapReduce 中用来协调队列以执行运行的线程。
  • 可靠性
  • 原子性 - 数据转移完全成功或完全失败,但没有事务是部分的。

ZooKeeper 的架构

输入图片说明

Client(客户端)

  • 客户端,我们的分布式应用集群中的一个节点,从服务器访问信息。对于特定的时间间隔,每个客户端向服务器发送消息以使服务器知道客户端是活跃的。类似地,当客户端连接时,服务器发送确认码。如果连接的服务器没有响应,客户端会自动将消息重定向到另一个服务器。

Server(服务器)

  • 服务器,我们的 ZooKeeper 总体中的一个节点,为客户端提供所有的服务。向客户端发送确认码以告知服务器是活跃的。

Ensemble

  • ZooKeeper 服务器组。形成 ensemble 所需的最小节点数为 3。

Leader:

  • 服务器节点,如果任何连接的节点失败,则执行自动恢复。Leader 在服务启动时被选举。

Follower

  • 跟随 leader 指令的服务器节点。

层次命名空间

ZooKeeper 节点称为 znode 。每个 znode 由一个名称标识,并用路径(/) 序列分隔。
输入图片说明

  • config 命名空间用于集中式配置管理,workers 命名空间用于命名。
  • 在 config 命名空间下,每个 znode 最多可存储 1MB 的数据。这与 UNIX 文件系统相类似,除了父 znode 也可以存储数据。这种结构的主要目的是存储同步数据并描述 znode 的元数据。此结构称为 ZooKeeper 数据模型。
  • ZooKeeper 数据模型中的每个 znode 都维护着一个 stat 结构。一个 stat 仅提供一个 znode 的元数据。它由版本号,操作控制列表(ACL),时间戳和数据长度组成。
  1. 版本号 - 每个 znode 都有版本号,这意味着每当与 znode 相关联的数据发生变化时,其对应的版本号也会增加。当多个 zookeeper 客户端尝试在同一 znode 上执行操作时,版本号的使用就很重要。
  2. 操作控制列表(ACL) - ACL 基本上是访问 znode 的认证机制。它管理所有 znode 读取和写入操作。
  3. 时间戳 - 时间戳表示创建和修改 znode 所经过的时间。它通常以毫秒为单位。ZooKeeper 从“事务 ID"(zxid) 标识 znode 的每个更改。Zxid 是唯一的,并且为每个事务保留时间,以便你可以轻松地确定从一个请求到另一个请求所经过的时间。
  4. 数据长度 - 存储在 znode 中的数据总量是数据长度。你最多可以存储 1MB 的数据。

Znode 的类型

Znode 被分为持久(persistent)节点,顺序(sequential)节点和临时(ephemeral)节点。

  • 持久节点 - 即使在创建该特定 znode 的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有 znode 都是持久的。
  • 临时节点 - 客户端活跃时,临时节点就是有效的。当客户端与 ZooKeeper 集合断开连接时,临时节点会自动删除。因此,只有临时节点不允许有子节点。如果临时节点被删除,则下一个合适的节点将填充其位置。临时节点在 leader 选举中起着重要作用。
  • 顺序节点 - 顺序节点可以是持久的或临时的。当一个新的 znode 被创建为一个顺序节点时,ZooKeeper 通过将 10 位的序列号附加到原始名称来设置 znode 的 路径。例如,如果将具有路径 /myapp 的 znode 创建为顺序节点,则 ZooKeeper 会将路径更改为 /myapp0000000001 ,并将下一个序列号设置为 0000000002。如果两个顺序节点是同时创建的,那么 ZooKeeper 不会对每个 znode 使用相同的数字。顺序节点在 锁定和同步中起重要作用。

Sessions(会话)

  • 会话对于 ZooKeeper 的操作非常重要。会话中的请求按 FIFO 顺序执行。一旦客户端连接到服务器,将建立会话并向客户端分配会话 ID 。
  • 客户端以特定的时间间隔发送心跳以保持会话有效。如果 ZooKeeper 集合在超过服务器开启时指定的期间(会话超时)都没有从客户端接收到心跳,则它会判定客户端死机。
  • 会话超时通常以毫秒为单位。当会话由于任何原因结束时,在该会话期间创建的临时节点也会被删除。

Watches(监视)

  • 监视是一种简单的机制,使客户端收到关于 ZooKeeper 集合中的更改的通知。客户端可以在读取特定 znode 时设置 Watches。Watches 会向注册的客户端发送任何 znode(客户端注册表)更改的通知。
  • Znode 更改是与 znode 相关的数据的修改或 znode 的子项中的更改。只触发一次 watches。如果客户端想要再次通知,则必须通过另一个读取操作来完成。当连接会话过期时,客户端将与服务器断开连接,相关的 watches 也将被删除。

Zookeeper 工作流

一旦 ZooKeeper 集合启动,它将等待客户端连接。客户端将连接到 ZooKeeper 集合中的一个节点。它可以是 leader 或 follower 节点。一旦客户端被连接,节点将向特定客户端分配会话 ID 并向该客户端发送确认。如果客户端没有收到确认,它将尝试连接 ZooKeeper 集合中的另一个节点。 一旦连接到节点,客户端将以有规律的间隔向节点发送心跳,以确保连接不会丢失。

  • 如果客户端想要读取特定的 znode,它将会向具有 znode 路径的节点发送读取请求,并且节点通过从其自己的数据库获取来返回所请求的 znode。为此,在 ZooKeeper 集合中读取速度很快。
  • 如果客户端想要将数据存储在 ZooKeeper 集合中,则会将 znode 路径和数据发送到服务器。连接的服务器将该请求转发给 leader,然后 leader 将向所有的 follower 重新发出写入请求。如果只有大部分节点成功响应,而写入请求成功,则成功返回代码将被发送到客户端。 否则,写入请求失败。绝大多数节点被称为 Quorum 。

ZooKeeper 集合中的节点

  • 如果我们有单个节点,则当该节点故障时,ZooKeeper 集合将故障。它有助于“单点故障",不建议在生产环境中使用。
  • 如果我们有两个节点而一个节点故障,我们没有占多数,因为两个中的一个不是多数。
  • 如果我们有三个节点而一个节点故障,那么我们有大多数,因此,这是最低要求。ZooKeeper 集合在实际生产环境中必须至少有三个节点。
  • 如果我们有四个节点而两个节点故障,它将再次故障。类似于有三个节点,额外节点不用于任何目的,因此,最好添加奇数的节点,例如 3,5,7。
  • 写入过程比 ZooKeeper 集合中的读取过程要贵,因为所有节点都需要在数据库中写入相同的数据

输入图片说明

写入(write)

  • 写入过程由 leader 节点处理。leader 将写入请求转发到所有 znode,并等待 znode 的回复。如果一半的 znode 回复,则写入过程完成。

读取(read)

  • 读取由特定连接的 znode 在内部执行,因此不需要与集群进行交互。

复制数据库(replicated database)

  • 它用于在 zookeeper 中存储数据。每个 znode 都有自己的数据库,每个 znode 在一致性的帮助下每次都有相同的数据。

Leader 负责处理写入请求的 Znode。

Follower:从客户端接收写入请求,并将它们转发到 leader znode。

请求处理器(request processor):只存在于 leader 节点。它管理来自 follower 节点的写入请求。

原子广播(atomic broadcasts):负责广播从 leader 节点到 follower 节点的变化。

Zookeeper leader 选举

分析如何在 ZooKeeper 集合中选举 leader 节点。考虑一个集群中有 N 个节点。leader 选举的过程如下:

  • 所有节点创建具有相同路径 /app/leader_election/guid_ 的顺序、临时节点。
  • ZooKeeper 集合将附加 10 位序列号到路径,创建的 znode 将是 /app/leader_election/guid_0000000001,/app/leader_election/guid_0000000002 等。
  • 对于给定的实例,在 znode 中创建最小数字的节点成为 leader,而所有其他节点是 follower。
  • 每个 follower 节点监视下一个具有最小数字的 znode。例如,创建 znode/app/leader_election/guid_0000000008 的节点将监视 znode/app/leader_election/guid_0000000007 ,创建 znode/app/leader_election/guid_0000000007 的节点将监视 znode/app/leader_election/guid_0000000006
  • 如果 leader 关闭,则其相应的 znode/app/leader_electionN 会被删除。
  • 下一个在线 follower 节点将通过监视器获得关于 leader 移除的通知。
  • 下一个在线 follower 节点将检查是否存在其他具有最小数字的 znode。如果没有,那么它将承担 leader 的角色。否则,它找到的创建具有最小数字的 znode 的节点将作为 leader。
  • 类似地,所有其他 follower 节点选举创建具有最小数字的 znode 的节点作为 leader。

Zookeeper 集群安装
安装:

wget http://www.apache.org/dist//zookeeper/zookeeper-3.3.3/zookeeper-3.3.3.tar.gz
tar zxvf zookeeper-3.3.3.tar.gz
cd zookeeper-3.3.3
cp conf/zoo_sample.cfg conf/zoo.cfg

zookeeper-3.4.10/conf 新建立 zoo.cfg,zoo2.cfg,zoo3.cfg 三个文件,配置如下,

#心跳间隔 毫秒每次
tickTime = 2000
##日志位置 伪集群设置不同目录
dataDir = /home/zookeeper-3.4.10/data/data1
#监听客户端连接的端口 伪集群设置不同端口
clientPort = 2181
#多少个心跳时间内,允许其他 server 连接并初始化数据,如果 ZooKeeper 管理的数据较大,则应相应增大这个值
initLimit = 10
#多少个 tickTime 内,允许 follower 同步,如果 follower 落后太多,则会被丢弃
syncLimit = 5

#伪集群配置 不需要集群去掉(vim /etc/host 映射 ip 的 hostname 的关系)
server.1=CentOS124:2886:3886
server.2=CentOS124:2888:3888
server.3=CentOS124:2889:3889

并在 zookeeper-3.4.10/datadata1,data2,data3 目录下放置 myid 文件,文件内容为 1,2,3

输入图片说明
输入图片说明

进入 bi 目录 启动

./zkServer.sh start zoo.cfg
./zkServer.sh start zoo2.cfg
./zkServer.sh start zoo3.cfg

查看服务状态

./zkServer.sh status zoo.cfg
./zkServer.sh status zoo2.cfg
./zkServer.sh status zoo3.cfg

使用 Zookeeper 的客户端来连接并测试了

[root@CentOS124 bin]# ./zkCli.sh

#查看根节点
[zk: localhost:2181(CONNECTED) 0] ls /
[firstNode, SecodeZnode, firstNode0000000002, hbase, zookeeper]
[zk: localhost:2181(CONNECTED) 0] create /mykey1 myvalue1  #创建一个新节点 mykey1 
Created /mykey1
[zk: localhost:2181(CONNECTED) 1] get /mykey1   #获取 mykey1 节点  

#要创建顺序节点
create -s /FirstZnode second-data

#要创建临时节点
create -e /SecondZnode “Ephemeral-data"

Zookeeper 客户端(Apache Curator)

ZooKeeper 常用客户端

  • zookeeper 自带的客户端是官方提供的,比较底层、使用起来写代码麻烦、不够直接。
  • Apache Curator 是 Apache 的开源项目,封装了 zookeeper 自带的客户端,使用相对简便,易于使用。
  • zkclient 是另一个开源的 ZooKeeper 客户端,其地址: https://github.com/adyliu/zkclient 生产环境不推荐使用。

Curator 主要解决了三类问题

  • 封装 ZooKeeper client 与 ZooKeeper server 之间的连接处理
  • 提供了一套 Fluent 风格的操作 API
  • 提供 ZooKeeper 各种应用场景(recipe, 比如共享锁服务,集群领导选举机制) 的抽象封装

Java 操作 api

package com.qxw.controller;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 *  Curator 主要解决了三类问题
	1.封装 ZooKeeper client 与 ZooKeeper server 之间的连接处理
	2.提供了一套 Fluent 风格的操作 API
	3.提供 ZooKeeper 各种应用场景(recipe, 比如共享锁服务,集群领导选举机制) 的抽象封装
 * @author qxw
 * @data 2018 年 8 月 14 日下午 2:08:51
 */
public class CuratorAp {
	/**
	* Curator 客户端
	*/
	public static CuratorFramework client = null;
	/**
	* 集群模式则是多个 ip
	*/
	// private static final String zkServerIps = "192.168.10.124:2182,192.168.10.124:2183,192.168.10.124:2184";
	private static final String zkServerIps = "127.0.0.1:2181";

	public static CuratorFramework getConnection(){
	if(client==null){
	synchronized (CuratorAp.class){
	if(client==null){
	//通过工程创建连接
	client= CuratorFrameworkFactory.builder()
	.connectString(zkServerIps)
	.connectionTimeoutMs(5000) ///连接超时时间
	.sessionTimeoutMs(5000)  // 设定会话时间
	.retryPolicy(new ExponentialBackoffRetry(1000, 10))   // 重试策略:初试时间为 1s 重试 10 次
	// .namespace("super")  // 设置命名空间以及开始建立连接
	.build();

	//开启连接
	client.start();
	//分布锁

	System.out.println(client.getState());
	}
	}
	}
	return client;
	}
	
	/**
	 * 创建节点   不加 withMode 默认为持久类型节点
	 * @param path  节点路径
	 * @param value  值
	 */
	public static String create(String path,String value){
		try {
			//若创建节点的父节点不存在会先创建父节点再创建子节点
			return getConnection().create().creatingParentsIfNeeded().forPath("/super"+path,value.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/**
	 * 创建节点 
	 * @param path  节点路径
	 * @param value  值
	 * @param modeType 节点类型
	 */
	public static String create(String path,String value,String modeType){
		try {
			if(StringUtils.isEmpty(modeType)){
				return null;
			}
			//持久型节点
			if(CreateMode.PERSISTENT.equals(modeType)){
				//若创建节点的父节点不存在会先创建父节点再创建子节点
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super"+path,value.getBytes());
			}
			//临时节点
			if(CreateMode.EPHEMERAL.equals(modeType)){
				//若创建节点的父节点不存在会先创建父节点再创建子节点
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/super"+path,value.getBytes());
			}
			
			//持久类型顺序性节点
			if(CreateMode.PERSISTENT_SEQUENTIAL.equals(modeType)){
				//若创建节点的父节点不存在会先创建父节点再创建子节点
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/super"+path,value.getBytes());
			}
			
			//临时类型顺序性节点
			if(CreateMode.EPHEMERAL_SEQUENTIAL.equals(modeType)){
				//若创建节点的父节点不存在会先创建父节点再创建子节点
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/super"+path,value.getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 获取单个节点
	 * @param path
	 * @return
	 */
	public static String getData(String path){
		try {
			String str = new String(getConnection().getData().forPath("/super"+path));
			return str;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 *获取字节点
	 * @param path
	 * @return
	 */
	public static List<String> getChildren(String path){
		try {
			List<String> list = getConnection().getChildren().forPath("/super"+path);
			return list;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	
	/**
	 * 修改节点值
	 * @param path
	 * @param valu
	 * @return
	 */
	public static String setData(String path,String valu){
		try {
			getConnection().setData().forPath("/super"+path,valu.getBytes());
			String str = new String(getConnection().getData().forPath("/super"+path));
			return str;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 删除节点
	 * @param path
	 */
	public static void  delete(String path){
		try {
			getConnection().delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"+path);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	/**
	 * 检测节点是否存在
	 * @param path
	 * @return
	 */
	public static boolean  checkExists(String path){
		try {
			Stat s=getConnection().checkExists().forPath("/super"+path);
			return s==null? false:true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
		
	}
	
	/**
	 * 分布式锁 对象
	 * @param path
	 * @return
	 */
	public static InterProcessMutex getLock(String path){
		InterProcessMutex lock=null;
		try {
			lock=new InterProcessMutex(getConnection(), "/super"+path);
			 return  lock;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	
	
	public static void main(String[] args) throws Exception {
//		if(checkExists("/qxw")){
//			delete("/qxw");
//		}
//		System.out.println("创建节点:"+create("/qxw/q1", "苏打水法萨芬撒"));
//		System.out.println("创建节点:"+create("/qxw/q2", "苏打水法萨芬撒"));
//		System.out.println("创建节点:"+create("/qxw/q3", "苏打水法萨芬撒"));
//		
//
//		
//		ExecutorService pool = Executors.newCachedThreadPool();
//		getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
//			public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
//				System.out.println("code:" + ce.getResultCode());
//				System.out.println("type:" + ce.getType());
//				System.out.println("线程为:" + Thread.currentThread().getName());
//			}
//		}, pool)
//		.forPath("/super/qxw/q4","q4 内容".getBytes());
//		
//		System.out.println("读取节点: "+getData("/qxw"));
//		System.out.println("读取字节点:"+getChildren("/qxw").toString());
		
		test();
		
	}
	
	/***
	 * 分布锁演示
	 */
	private static int count=0;
	public  static void test() throws InterruptedException{
		final InterProcessMutex lock=getLock("/lock");
		final CountDownLatch c=new CountDownLatch(10);
		ExecutorService pool = Executors.newCachedThreadPool();
		for (int i = 0; i <10; i++) {
			pool.execute(new Runnable() {
				public void run() {
					try {		
						c.countDown();
						Thread.sleep(1000);
						//加锁
						lock.acquire();
						System.out.println(System.currentTimeMillis()+"___"+(++count));
					} catch (Exception e) {
						e.printStackTrace();
					}finally{
						try {
							lock.release();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
					
				}
			});
		}
		pool.shutdown();
		c.await();
		System.out.println("CountDownLatch 执行完");
	}
}

zookeeper 集群的 监控图形化页面

https://gitee.com/crystony/zookeeper-web

如果你是 gradle 用户(2.0 以上),请直接执行以下命令运行项目:

gradle jettyRun

如果你没使用 gralde,执行项目跟路径下的脚本,linux/windows 用户执行

gradlew/gradlew.bat jettyRun

自动下载 gralde 完成后,会自动使用 jetty 启动项目

如果想将项目导入 IDE 调试,eclipse 用户执行

 gradlew/gradlew.bat eclipse

idea 用户执行

gradlew/gradlew.bat idea

zookeeper 分布式锁原理

分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。
输入图片说明

左边的整个区域表示一个 Zookeeper 集群,locker 是 Zookeeper 的一个持久节点,node_1、node_2、node_3 是 locker 这个持久节点下面的临时顺序节点。client_1、client_2、client_n 表示多个客户端,Service 表示需要互斥访问的 共享资源。

分布式锁获取思路

  1. 在获取分布式锁的时候在 locker 节点下创建临时顺序节点,释放锁的时候删除该临时节点。
  2. 客户端调用 createNode 方法在 locker 下创建临时顺序节点,然后调用 getChildren(“locker”) 来获取 locker 下面的所有子节点,注意此时不用设置任何 Watcher。
  3. 客户端获取到所有的子节点 path 之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。
  4. 如果发现自己创建的节点并非 locker 所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用 exist() 方法,同时对其注册事件监听器。
  5. 之后,等待它释放锁,也就是等待获取到锁的那个客户端 B 把自己创建的那个节点删除。,则客户端 A 的 Watcher 会收到相应通知,此时再次判断自 己创建的节点是否是 locker 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

顾忌

暂无简介

文章
评论
272 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文