ZooKeeper
ZooKeeper 简介
https://zookeeper.apache.org/
ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务。
ZooKeeper 从文件系统 API 得到启发,提供一组简单的 API,使得开发人员可以实现通用的协作任务,例如选举主节点,管理组内成员的关系,管理元数据等。
同时 ZooKeeper 的服务组件运行在一组专用的服务器之上,也保证了高容错性和可扩展性。
ZooKeeper 的使命
ZooKeeper 之前,一些系统也可以采用分布式锁管理器或者分布式数据库来实现协作,例如,用数据库,redis 实现分布式锁。
ZooKeeper 的设计更专注于任务协作,它不提供任何锁的接口或者通用的存储数据的接口,也没有强加任何特殊的同步原语,而是提供一个更加敏捷健壮的分布协作方案,例如在主-从模型中,ZooKeeper 没有为应用实现主节点选举,或者进程存活与否的跟踪功能,但是,ZooKeeper 提供了实现这些任务的工具,对于实现什么样的协同任务,有开发人员自己决定。
分布式系统中关键在于进程通信,其有两种选择:直接通过网络进行信息交换,或者读写某些共享存储。对于 ZooKeeper 实现协作和同步原语本质上是使用共享存储模型,即开发的应用是连接到 ZooKeeper 服务器端的客户端,他们连接到 ZooKeeper 服务器端进行相关的操作,以来影响服务器端存储的共享数据,最终应用间实现协作。
ZooKeeper 不适合用作海量的数据存储,对于需要海量的应用数据的情况,可以使用数据库和分布式文件系统,所以在设计应用时,最佳实践是把应用数据和协同数据独立分开。
ZooKeeper 基础简介
ZooKeeper 数据结构
ZooKeeper 采用类似于文件系统的层级树状结构进行管理 Znode,并且暴露操作 API 接口。
在新建 znode 节点,需要指定该节点的类型,znode 的类型分为持久节点、临时节点、有序节点,组合成 4 种类型,持久的,临时的,持久有序的,临时有序的。对于持久节点,只能主动调用 delete 来删除,而临时的 znode,在当创建该节点的客户端崩溃或者关闭了与 ZooKeeper 的连接时,这个节点就会被删除。有序 Znode 节点是被分配唯一一个单调递增的整数。
API 接口
1 | create /path data 创建一个名为/path的znode节点,并包含数据data。 |
需要注意的是,ZooKeeper 并不允许局部写入或读取 znode 节点的数据。当设置一个 znode 节点的数据或读取时,znode 节点的内容会被整个替换或者全部读取进来,特别是 getChildren,如果是数据量比较大,会获取大量的数据。
ZooKeeper 监视与通知
ZooKeeper 客户端获得服务器的数据或者变化,不是通过轮询的模式,而是基于通知的机制,客户端向 ZooKeeper 服务器端注册需要接收通知的 znode,通过对 znode 设置监视点来接收通知,需要强调的是监视点是一个单次触发的操作。
ZooKeeper 架构
ZooKeeper 服务器端运行于两种模式下:独立模式和仲裁模式。
独立服务器只有一个单独的服务器,ZooKeeper 状态无法复制。
仲裁模式,具有一组 ZooKeeper 服务器,称为 ZooKeeper 集合,它们之间可以进行状态的复制,并同时服务客户端的请求。不过服务器集合并不会让客户端等待每个服务器完成数据保存后再继续,而是在满足仲裁数目的服务器保存或者同步了状态就会返回给客户端。
在解决这一分布式数据一致性,ZooKeeper 采用 ZAB(ZooKeeper Atomic Broadcast)的一致性协议,后面会详细的介绍。
ZooKeeper 客户端在服务器集群中执行任何请求前必须先与服务器建立会话(session),客户端提交给 ZooKeeper 的所有操作均关联在一个会话上。客户端初始化连接到集合中某个服务器或一个独立的服务器,客户端提供TCP 协议与服务器进行连接并通信,但当会话无法与当前连接的服务器继续通信时,会话就可能转移到另外一个服务器,ZooKeeper 客户端透明地转移一个会话到不同的服务器。需要指明的,会话提供了顺序保障,同一个会话中的请求会以 FIFO(先进先出)顺序执行。
ZooKeeper 应用案例
Apache HBase
HBase 是一个通常与 Hadoop 一起使用的数据存储仓库。在 HBase 中,ZooKeeper 用于选举一个集群内的主节点,以便跟踪可用的服务器,并保持集群的元数据。
Apache Kafka
Kafka 是一个基于发布-订阅模型的消息系统。其中 ZooKeeper 用于检测崩溃,实现主题的发现,并保持主题的生产和消费状态。
Apache Solr
Solr 是一个企业级的搜索平台,它使用 ZooKeeper 来存储集群的元数据,并协作更新这些元数据。
ZooKeeper 应该是 “The King Of Coordination for Big Data”!
ZooKeeper 内部实现原理
总体来说 ZooKeeper 运行于一个集群环境中,选举出某个服务器作为群首(Leader),其他服务器追随群首(Follower)。群首作为中心处理所有对 ZooKeeper 系统变更的请求,它就像一个定序器,建立了所有对 ZooKeeper 状态的更新的顺序,追随者接收群首所发出更新操作请求,并对这些请求进行处理,以此来保障状态更新操作不会发生碰撞。
请求、事务、标识符
ZooKeeper 服务器会在本地处理只读请求(exists、getData、getChildren),例如一个服务器接收客户端的 getData 请求,服务器读取该状态信息,并把这些信息返回给客户端。
那些会改变 ZooKeeper 状态的客户端请求(create,delete 和 setData)将会转发到群首,群首执行对应的请求,并形成状态的更新,称为事务(transaction),其中事务要以原子方式执行。同时,一个事务还要具有幂等性,事务的幂等性在我们进行恢复处理时更加简单,后面我们可以看到如何利用幂等性进行数据恢复或者灾备。
在群首产生了一个事务,就会为该事务分配一个标识符,称为会话 id(zxid),通过 Zxid 对事务进行标识,就可以按照群首所指定的顺序在各个服务器中按序执行。服务器之间在进行新的群首选举时也会交换 zxid 信息,这样就可以知道哪个无故障服务器接收了更多的事务,并可以同步他们之间的状态信息。
Zxid 为一个 long 型(64 位)整数,分为两部分:时间戳(epoch)部分和计数器(counter)部分。每一部分为 32 位,在我们讨论 zab 协议时,我们就会发现时间戳(epoch)和计数器(counter)的具体作用,我们通过 zab 协议来广播各个服务器的状态变更信息。
群首选举
群首为集群中的服务器选择出来的一个服务器,并会一直被集群所认可。设置群首的目的是为了对客户端所发起的 ZooKeeper 状态更新请求进行排序,包括 create,setData 和 delete 操作。群首将每一个请求转换为一个事务,将这些事务发送给追随者,确保集群按照群首确定的顺序接受并处理这些事务。
每个服务器启动后进入 LOOKING 状态,开始选举一个新的群首或者查找已经存在的群首。如果群首已经存在,其他服务器就会通知这个新启动的服务器,告知哪个服务器是群首,于此同时,新服务器会与群首建立连接,以确保自己的状态与群首一致。如果群首中的所有的服务器均处于 LOOKING 状态,这些服务器之间就会进行通信来选举一个群首,通过信息交换对群首选举达成共识的选择。在本次选举过程中胜出的服务器将进入 LEADING 状态,而集群中其他服务器将会进入 FOLLOWING 状态。
具体看,一个服务器进入 LOOKING 状态,就会发送向集群中每个服务器发送一个通知信息,该消息中包括该服务器的投票(vote)信息,投票中包含服务器标识符(sid)和最近执行事务的 zxid 信息。
当一个服务器收到一个投票信息,该服务器将会根据以下规则修改自己的投票信息:
将接收的 voteId 和 voteZxid 作为一个标识符,并获取接收方当前的投票中的 zxid,用 myZxid 和 mySid 表示接收方服务器自己的值。
如果(voteZxid > myZxid)或者(voteZxid == myZxid 且 voteId >mySid),保留当前的投票信息。
否则,修改自己的投票信息,将 voteZxid 赋值给 myZxid,将 voteId 赋值给 mySid。
从上面的投票过程可以看出,只有最新的服务器将赢得选举,因为其拥有最近一次的 zxid。如果多个服务器拥有的最新的 zxid 值,其中的 sid 值最大的将会赢得选举。
当一个服务器连接到仲裁数量的服务器发来的投票都一样时,就表示群首选举成功,如果被选举的群首为某个服务器自己,该服务器将会开始行使群首角色,否则就会成为一个追随者并尝试连接被选举的群首服务器。一旦连接成功,追随者和群首之间将会进行状态同步,在同步完成后,追随者才可以进行新的请求。
Zab:状态更新的广播协议
在接收到一个写请求操作后,追随者会将请求转发给群首,群首将会探索性的执行该请求,并将执行结果以事务的方式对状态更新进行广播。如何确认一个事务是否已经提交,ZooKeeper 由此引入了 zab 协议,即原子广播协议(ZooKeeper Atomic Broadeast protocol)。该协议提交一个事务非常简单,类型于一个两阶段提交。
群首向所有追随者发送一个 PROPOSAL 消息 p。
当一个追随者接收到消息 p 后,会响应群首一个 ACK 消息,通知群首其已接受该提案(proposal)。
当收到仲裁数量的服务器发送的确认消息后(该仲裁数包括群首自己),群首就会发送消息通知追随者进行提交(COMMIT)操作。
Zab 保障了以下几个重要的属性
如果群首按顺序广播了事务 T 和事务 T,那么每个服务器在提交 T 事务前保证事务 T 已经完成提交。
如果某个服务器按照事务 T 和事务 T 的顺序提交了事务,所有其他服务器也必然会在提交事务 T 前提交事务 T。
第一个属性保证事务在服务器之间传送顺序的一致,而第二个竖向保证服务器不会跳过任何事务。
观察者
观察者与追随者有一些共同的特点,他们提交来自群首的提议,不同于追随者的是,观察者不参与选举过程,他们仅仅学习经由 INFORM 消息提交的提议。
引入观察者的一个主要原因是提高读请求的可扩展性。通过加入多个观察者,我们可以在不牺牲写操作的吞吐率的前提下服务更多的读操作。但是引入观察者也不是完全没有开销,每一个新加入的观察者将对应于每一个已提交事务点引入的一条额外消息。
采用观察者的另外一个原因是进行跨多个数据中心部署。由于数据中心之间的网络链接延时,将服务器分散于多个数据中心将明显地降低系统的速度。引入观察者后,更新请求能够先以高吞吐量和低延迟的方式在一个数据中心内执行,接下来再传播到异地的其他数据中心得到执行。
服务器的构成
群首,追随者,观察者根本上都是服务器。在实现服务器主要抽象概念是请求处理器。请求处理器是对处理流水线上不同阶段的抽象,每个服务器实现一个请求处理器的序列。
独立服务器
PrepRequestProcessor 接受客户端的请求并执行这个请求,处理结果则是生成一个事务。不过只有改变 ZooKeeper 状态的操作才会产生事务,对于读操作并不会产生任何事务。
SyncRequestProcessor 负责将事务持久化到磁盘上。实际上就是将事务数据按照顺序追加到事务日志中,并形成快照数据。
最后一个处理器为 FinalRequestProcessor,如果 Request 对象包含事务数据,该处理器就会接受对 ZooKeeper 数据树的修改,否则,该处理器会从数据树中读取数据并返回客户端。
群首服务器
在切换到仲裁模式时,服务器的流水线则有一些变化。
第一个处理器同样是 PrepRequestProcessor,而之后的处理器则为 ProposalRequestProcessor,该处理器会准备一个提议,并将该提议发送给跟随者,并且会把所有请求转发给 CommitRequestProcessor,对于写操作请求,还会把请求转发给 SyncRequestProcessor 处理器。
SyncRequestProcessor 和独立服务器的功能一样,是持久化事务到磁盘上,执行完后会触发 AckRequestProcessor 处理器,它仅仅生成确认消息并返回给自己。
CommitRequestProcessor 会将收到足够多的确认消息的提议进行提交。
追随者和观察者服务器
Follower 服务器是先从 FollowerRequestProcessors 处理器开始,该处理器接收并处理客户端请求,FollowerRequestProcessors 处理器之后转发请求给 CommitRequestProcessor,同时也会转发写请求到群首服务器。CommitRequestProcessor 会直接转发读取请求到 FinalRequestProcessor 处理器,而且对于写请求,在转发前会等待提交事务。而群首接收到一个新的写请求时会生成一个提议,之后转发到追随者服务器,在收到一个提议,追随服务器会发送这个提议到 SyncRequestProcessor,SendRequestProcessor 会向群首发送确认消息。
当群首服务器接收到足够多确认消息来提交这个提议是,群首就会发送提交事务消息给追随者,当收到提交的事务消息时,追随者就通过 CommitRequestProcessor 处理器进行处理。为了保证执行的顺序,CommitRequestProcessor 处理器会在收到一个写请求处理器时暂停后续的请求处理。
对于观察者服务器不需要确认提议消息,因此观察者服务器并不需要发送确认消息给群首服务器,一般情况下,也不用持久化事务到磁盘。对于观察者服务器是否持久化事务到磁盘,以便加速观察者服务器的恢复速度,可以根据具体情况决定。
本地存储
SyncRequestProcessor 处理器是用于处理提议写入的日志和快照。
日志和磁盘的使用
服务器通过事务日志来持久化事务。在接受一个提议时,一个服务器就会将提议的事务持久化到事务日志中,该事务日志保存在服务器本地磁盘中,而事务将会按照顺序追加其后。写事务日志是写请求操作的关键路径,因此 ZooKeeper 必须有效处理写日志问题。在持久化事务到磁盘时,还有一个重要说明:现代操作系统通常会缓存脏页(Dirty Page),并将他们异步写入磁盘介质。然而,我们需要在继续之前,要确保事务已经被持久化。因此我们需要冲刷(Flush)事务到磁盘介质。
冲刷在这里就是指我们告诉操作系已经把脏页写入到磁盘,并在操作完成后返回。同时为了提高 ZooKeeper 系统的运行速度,也会使用组提交和补白的。其中组提交是指一次磁盘写入时追加多个事务,可以减少磁盘寻址的开销。补白是指在文件中预分配磁盘存储块。
快照
快照是 ZooKeeper 数据树的拷贝副本,每一个服务器会经常以序列化整个数据树的方式来提取快照,并将这个提取的快照保存到文件。服务器在进行快照时不需要进行协作,也不需要暂停处理请求。因此服务器在进行快照时还会继续处理请求,所以当快照完成时,数据树可能又发生了变化,称为快照是模糊的,因为它们不能反映出在任意给定的时间点数据树的准确的状态。
服务器与会话
会话(session)是 ZooKeeper 的一个重要的抽象。保证请求有序,临时 znode 节点,监控点都与会话密切相关。因此会话的跟踪机制对 ZooKeeper 来说也是非常重要的。
在独立模式下,单个服务器会跟踪所有的会话,而在仲裁模式下则由群首服务器来跟踪和维护。而追随者服务器仅仅是简单地把客户端连接的会话信息转发到群首服务器。
为了保证会话的存活,服务器需要接收会话的心跳信息。心跳的形式可以是一个新的请求或者显式的 ping 信息。两种情况下,服务器通过更新会话的过期时间来触发会话活跃,在仲裁模式下,群首服务器发送一个 PING 信息给它的追随者们,追随者们返回自从最新一次 PING 消息之后的一个 session 列表。群首服务器每半个 tick 就会发送一个 ping 信息给追随者们。
服务器与监视点
监视点是由读取操作所设置的一次性触发器,每个监视点有一个特定操作来触发,即通过监视点,客户端可以对指定的 znode 节点注册一个通知请求,在发生时就会收到一个单次的通知。监视点只会存在内存,而不会持久化到硬盘,当客户端与服务端的连接断开时,它的所有的监视点会从内存中清除。因为客户端也会维护一份监视点的数据,在重连之后,监视点数据会再次同步到服务端。
客户端
在客户端库中有 2 个主要的类:ZooKeeper 和 ClientCnxn,写客户端应用程序时通过实例化 ZooKeeper 类来建立一个会话。一旦建立起一个会话,ZooKeeper 就会使用一个会话标识符来关联这个会话。这个会话标识符实际上是有服务端所生产的。
ClientCnxn 类管理连接到 server 的 socket 连接。该类维护一个可连接的 ZooKeeper 的服务列表,并当连接断掉的时候无缝地切换到其他服务器,当重连到一个其他的服务器时会使用同一个会话,客户端也会重置所有的监视点到刚连接的服务器上。
序列化
对于网络传输和磁盘保存的序列化消息和事务,ZooKeeper 使用了 Hadoop 中的 Jute 来做序列化。
ZooKeeper 源码浅析
以 3.5.5 版本作为分析。主要从服务端,客户端,以及服务端和客户端结合的部分分析源码。在分析源码时,主要从数据结构,类结构,线程模型,流程等方面看。
服务端
ZooKeeper 服务的启动方式分为三种,即单机模式、伪分布式模式、分布式模式。本章节主要研究分布式模式的启动模型,其主要要经过 Leader 选举,集群数据同步,启动服务器。
分布式模式下的启动过程包括如下阶段,
解析 config 文件;
数据恢复;
监听 client 连接(但还不能处理请求);
bind 选举端口监听 server 连接;
选举;
初始化 ZooKeeperServer;
数据同步;
同步结束,启动 client 请求处理能力。
客户端
从整体看,客户端启动的入口时 ZooKeeperMain,在 ZooKeeperMain 的 run()中,创建出控制台输入对象(jline.console.ConsoleReader),然后它进入 while 循环,等待用户的输入。同时也调用 connectToZK 连接服务器并建立会话(session),在 connect 时创建 ZooKeeper 对象,在 ZooKeeper 的构造函数中会创建客户端使用的 NIO socket,并启动两个工作线程 sendThread 和 eventThread,两个线程被初始化为守护线程。
sendThread 的 run()是一个无限循环,除非运到了 close 的条件,否则他就会一直循环下去,比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应。
eventThread 线程负责队列事件和处理 watch。
客户端也会创建一个 clientCnxn,由 ClientCnxnSocketNIO.java 负责 IO 数据通信。
客户端的场景说明(事务、非事务请求类型)。
服务端和客户端结合部分
会话(Session)
Client 建立会话的流程如下,
服务端启动,客户端启动;
客户端发起 socket 连接;
服务端 accept socket 连接,socket 连接建立;
客户端发送 ConnectRequest 给 server;
server 收到后初始化 ServerCnxn,代表一个和客户端的连接,即 session,server 发送 ConnectResponse 给 client;
client 处理 ConnectResponse,session 建立完成。
监视(Watch)
本小节主要看看 ZooKeeper 怎么设置监视和监控点的通知。ZooKeeper 可以定义不同类型的通知,如监控 znode 的数据变化,监控 znode 子节点的变化,监控 znode 的创建或者删除。ZooKeeper 的服务端实现了监视点管理器(watch manager)。
一个 WatchManager 类的实例负责管理当前已经注册的监视点列表,并负责触发他们,监视点只会存在内存且为本地服务端的概念,所有类型的服务器都是使用同样的方式处理监控点。
DataTree 类中持有一个监视点管理器来负责子节点监控和数据的监控。
在服务端触发一个监视点,最终会传播到客户端,负责处理传播的为服务端的 cnxn 对象(ServerCnxn 类),此对象表示客户端和服务端的连接并实现了 Watcher 接口。Watch.process 方法序列化了监视点事件为一定的格式,以便于网络传送。ZooKeeper 客户端接收序列化的监视点事件,并将其反序列化为监控点事件的对象,并传递给应用程序。
客户端 watcher 实现
在客户端 GetData 时,如果注册 watch 监控点到服务端,在 watch 的 path 的 value 变化时,服务端会通知客户端该变化。
在客户端的 GetData 方法中(ZooKeeper 类的 GetData):
创建 WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath),path 和 watch 封装进了一个对象;
创建一个 request,设置 type 为 GetData 对应的数值;
request.setWatch(watcher != null),setWatch 参数为一个 bool 值。
调用 ClientCnxn.submitRequest(…) , 将请求包装为 Packet,queuePacket()方法的参数中存在创建的 path+watcher 的封装类 WatchRegistration,请求会被 sendThread 消费发送到服务端。
实践采坑和优化
Title: ZooKeeper
Author: Jiandong
Date: 2020-05-06
Last Update: 2025-04-29
Blog Link: https://mjd507.github.io/2020/05/06/ZooKeeper/
Copyright Declaration: Please refer carefully, most of the content I have not fully mastered.