参考资料
ZooKeeper 是一个典型的发布/订阅模式的分布式数据管理与协调框架。
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
ZooKeeper 解决了什么问题?
ZooKeepr 提供基于类似于文件系统的目录节点树方式的数据存储,这是一个共享的内存中的树型结构。有几个概念需要关注一下。
/xxxx
就是一个 Znode,会保存自己的数据内容和属性信息,分为持久和临时节点,节点有 SEQUENTIAL 属性/
├── /Apps
|
│ ├── /App1
|
│ └── /App2
|
│ ├── /SubApp1
│ └── /SubApp2
│
├── /Configuration
|
│ ├── /Mysql1
│ ├── /Mysql2
│ └── /Mysql3
│
├── /GroupMembers
|
│ ├── /Member1
│ └── /Member2
│
└── /NameService
|
├── /Server1
└── /Server2
利用 ZooKeeper 可以非常方便构建一系列分布式应用中都会涉及到的核心功能。
多个开源项目中都应用到了 ZooKeeper,例如 HBase, Spark, Flink, Storm, Kafka, Dubbo 等等。
数据发布/订阅的一个常见的场景是配置中心,发布者把数据发布到 ZooKeeper 的一个或一系列的节点上,供订阅者进行数据订阅,达到动态获取数据的目的。
配置信息一般有几个特点:
ZooKeeper 采用的是推拉结合的方式。
实现的思路可以如下。
mysql.driverClassName=com.mysql.jdbc.Driver
dbJDBCUrl=jdbc:mysql://127.0.0.1/runzhlliu
username=runzhliu
password=runzhliu
/Configuration
负载均衡是一种手段,用来把对某种资源的访问分摊给不同的设备,从而减轻单点的压力。
实现的思路:
命名服务就是提供名称的服务。ZooKeeper 的命名服务有两个应用方面。
利用 ZooKeeper 顺序节点的特性,制作分布式的序列号生成器,或者叫 id 生成器。(分布式环境下使用作为数据库 id,另外一种是 UUID(缺点:没有规律)),ZooKeeper 可以生成有顺序的容易理解的同时支持分布式环境的编号。
在创建节点时,如果设置节点是有序的,则 ZooKeeper 会自动在你的节点名后面加上序号,上面说容易理解,是比如说这样,你要获得订单的 id,你可以在创建节点时指定节点名为 order_[日期]_xxxxxx,这样一看就大概知道是什么时候的订单。
/
└── /order
├── /order-date1-000000000000001
├── /order-date2-000000000000002
├── /order-date3-000000000000003
├── /order-date4-000000000000004
└── /order-date5-000000000000005
一种典型的分布式系统机器间的通信方式是心跳。心跳检测是指分布式环境中,不同机器之间需要检测彼此是否正常运行。传统的方法是通过主机之间相互 PING 来实现,又或者是建立长连接,通过 TCP 连接固有的心跳检测机制来实现上层机器的心跳检测。
如果使用 ZooKeeper,可以基于其临时节点的特性,不同机器在 ZooKeeper 的一个指定节点下创建临时子节点,不同机器之间可以根据这个临时节点来判断客户端机器是否存活。
好处就是检测系统和被检系统不需要直接相关联,而是通过 ZooKeeper 节点来关联,大大减少系统的耦合。
集群管理主要指集群监控和集群控制两个方面。前者侧重于集群运行时的状态的收集,后者则是对集群进行操作与控制。开发和运维中,面对集群,经常有如下需求:
分布式集群管理体系中有一种传统的基于 Agent 的方式,就是在集群每台机器部署 Agent 来收集机器的 CPU、内存等指标。但是如果需要深入到业务状态进行监控,比如一个分布式消息中间件中,希望监控每个消费者对消息的消费状态,或者一个分布式任务调度系统中,需要对每个机器删的任务执行情况进行监控。对于这些业务紧密耦合的监控需求,统一的 Agent 是不太合适的。
利用 ZooKeeper 实现集群管理监控组件的思路:
在管理机器上线/下线的场景中,为了实现自动化的线上运维,我们必须对机器的上/下线情况有一个全局的监控。通常在新增机器的时候,需要首先将指定的 Agent 部署到这些机器上去。Agent 部署启动之后,会首先向 ZooKeeper 的指定节点进行注册,具体的做法就是在机器列表节点下面创建一个临时子节点,例如 /machine/[Hostname]
(下文我们以“主机节点”代表这个节点),如下图所示。
当 Agent 在 ZooKeeper 上创建完这个临时子节点后,对 /machines
节点关注的监控中心就会接收到“子节点变更”事件,即上线通知,于是就可以对这个新加入的机器开启相应的后台管理逻辑。另一方面,监控中心同样可以获取到机器下线的通知,这样便实现了对机器上/下线的检测,同时能够很容易的获取到在线的机器列表,对于大规模的扩容和容量评估都有很大的帮助。
分布式系统中 Master 是用来协调集群中其他系统单元,具有对分布式系统状态更改的决定权。比如一些读写分离的应用场景,客户端写请求往往是 Master 来处理的。
利用常见关系型数据库中的主键特性来实现也是可以的,集群中所有机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮助我们自动进行主键冲突检查,可以保证只有一台机器能够成功。
但是有一个问题,如果插入成功的和护短机器成为 Master 后挂了的话,如何通知集群重新选举 Master?
利用 ZooKeeper 创建节点 API 接口,提供了强一致性,能够很好保证在分布式高并发情况下节点的创建一定是全局唯一性。
集群机器都尝试创建节点,创建成功的客户端机器就会成为 Master,失败的客户端机器就在该节点上注册一个 Watcher 用于监控当前 Master 机器是否存活,一旦发现 Master 挂了,其余客户端就可以进行选举了。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,一般需要通过一些互斥的手段来防止彼此之间的干扰,以保证一致性。
如果事务 T1 对数据对象 O1 加上了排他锁,那么加锁期间,只允许事务 T1 对 O1 进行读取和更新操作。核心是保证当前有且仅有一个事务获得锁,并且锁释放后,所有正在等待获取锁的事务都能够被通知到。对于实现排他锁,简单地说就是多个客户端同时去竞争创建同一个临时子节点,ZooKeeper 能够保证只有一个客户端创建成功,那么这个创建成功的客户端就获得排他锁。正常情况下,这个客户端执行完业务逻辑会删除这个节点,也就是释放了锁。如果该客户端宕机了,那么这个临时节点会被自动删除,锁也会被释放。
通过 ZooKeeper 上的 Znode 可以表示一个锁,/x_lock/lock
。
create()
接口尝试在 /x_lock
创建临时子节点 /x_lock/lock
。最终只有一个客户端创建成功,那么该客户端就获取了锁。同时没有获取到锁的其他客户端,注册一个子节点变更的 Watcher 监听。又称为读锁。允许一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。
通过 ZooKeeper 上的 Znode 表示一个锁,/s_lock/[HOSTNAME]-请求类型-序号
。
/
├── /host1-R-000000001
├── /host2-R-000000002
├── /host3-W-000000003
├── /host4-R-000000004
├── /host5-R-000000005
├── /host6-R-000000006
└── /host7-W-000000007
s_lock
这个节点下面创建一个临时顺序节点,如果当前是读请求,就创建类型为 R 的临时节点,如果是写请求,就创建类型为 W 的临时节点。s_lock
的所有子节点,并对该节点注册子节点变更的 Watcher 监听
2.2 然后确定自己的节点序号在所有的子节点中的顺序
2.3 对于读请求,如果没有比自己小的子节点,或是所有比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑,如果有比自己序号小的写请求,那么就需要进行等待
2.4 接收到 Watcher 通知后重复 2.1在 2.7.2 介绍的共享锁中,在判断读写顺序的时候会出现一个问题,假如 host4 在移除自己的节点的时候,后面 host5-7 都需要接收 Watcher 事件通知,但是实际上,只有 host5 接收到事件就可以了。因此以上的实现方式会产生大量的 Watcher 通知。这样会对 ZooKeeper 服务器造成了巨大的性能影响和网络冲击,这就是羊群效应。
改进的一步在于,调用 getChildren()
接口的时候获取到所有已经创建的子节点列表,但是这个时候不要注册任何的 Watcher。当无法获取共享锁的时候,调用 exist()
来对比自己小的那个节点注册 Wathcer。而对于读写请求,会有不同的定义:
使用 ZooKeeper 实现 FIFO 队列,入队操作 offer就是在 queue_fifo
下创建自增序的子节点,并把数据放入节点内。出队操作 poll就是先找到 queue_fifo
下序号最小的那个节点,取出数据,然后删除此节点。
/queue_fifo
|
├── /host1-000000001
├── /host2-000000002
├── /host3-000000003
└── /host4-000000004
出队操作,根据以下步骤确定执行顺序:
get_children()
接口获取 /queue_fifo
节点下所有子节点Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。
利用 ZooKeeper 的实现,开始时 queue_barrier
节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字 n 来代表 Barrier 值,比如 n=10 代表只有当 /queue_barrier
节点下的子节点个数达到10才会打开 Barrier。之后所有客户端都会在 queue_barrier
节点下创建一个临时节点,如 queue_barrier/host1
。
如何控制所有线程同时开始?
所有的线程启动时在 ZooKeeper 节点 /queue_barrier
下插入顺序临时节点,然后检查 /queue/barrier
下所有 children 节点的数量是否为所有的线程数,如果不是,则等待,如果是,则开始执行。具体的步骤如下:
getData()
获取 /queue_barrier
节点的数据内容getChildren()
获取 /queue_barrier
节点下的所有子节点,同时注册对子节点列表变更的 Watche 监听。如何等待所有线程结束?
所有线程在执行完毕后,都检查 /queue/barrier
下所有 children 节点数量是否为0,若不为0,则继续等待。
用什么类型的节点? 根节点使用持久节点,子节点使用临时节点,根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。 子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就 crash了,如果是持久节点,节点不会被删除。
Hadoop 利用 ZooKeeper 实现了高可用的功能,包括 HDFS 的 NameNode 和 YARN 的 ResourceManager。此外 YARN 的运行状态是利用 ZooKeeper 来存储的。
/yarn-leader-election/pseudo-yarn-rm-cluster
竞争创建锁节点Fencing 一般指解决脑裂这样的问题,YARN 引入了 Fencing 机制,利用的是 ZooKeeper 数据节点的 ACL 权限控制。
如果 RM1 假死,此时 RM2 成为 Active 状态,当 RM1 恢复之后,会试图去更新 ZooKeeper 里的数据,但此时会发现写上了 ACL 权限的节点无法修改,这样就可以避免了脑裂。
Spark 对 ZooKeeper 的使用主要在以下几个类中。
org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent
ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。
Spark 中有个参数 spark.deploy.recoveryMode
,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。
领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是 Active 的,其他都会处于 Standby 状态。
应用 ZooKeeper 选主,利用 ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存活,一旦他挂了,那么就会重新启动选主过程。
Kafka 中大部分组件都应用了 ZooKeeper。
/broker/ids/[0...N]
记录了 Broker 服务器列表记录,这个临时节点的节点数据是 ip 端口之类的信息。/broker/topcs
记录了 Topic 的分区信息和 Broker 的对应关系Dubbo 基于 ZooKeeper 实现了服务注册中心。哪一个服务由哪一个机器来提供必需让调用者知道,简单来说就是 ip 地址和服务名称的对应关系。ZooKeeper 通过心跳机制可以检测挂掉的机器并将挂掉机器的 ip 和服务对应关系从列表中删除。
至于支持高并发,简单来说就是横向扩展,在不更改代码的情况通过添加机器来提高运算能力。通过添加新的机器向 ZooKeeper 注册服务,服务的提供者多了能服务的客户就多了。
服务提供者在启动的时候,向 ZooKeeper 上的指定节点 /dubbo/${serviceName}/providers
目录下写入自己的 URL 地址,这个操作就完成了服务的发布。
服务消费者启动的时候,订阅 /dubbo/{serviceName}/providers 目录下的提供者 URL 地址, 并向 /dubbo/{serviceName}/consumers 目录下写入自己的URL地址。
Canal 的 HA 分为两部分,Canal server 和 Canal client 分别有对应的 HA 实现。
由于 ZooKeeper 可以很好保证分布式环境中数据的强一致性,也是基于这个特点,使得越来越多的分布式系统将 ZooKeeper 作为核心组件进行封装使用。在以上提到的这些分布式系统的常见的应用场景下,利用 ZooKeeper 可以快速的实现相关的组件,而无需重新造轮子。
很多大型开源的分布式系统中应用 ZooKeeper 来实现组件的时候,一般都会考虑使用 Apache Curator 是 Netflix 公司开源的一个 ZooKeeper 客户端,提供了更高层次和更易用的操作 ZooKeeper 的接口。