我的超细!细说Zookeeper选举的一个案例
今天我们来带着大家实现用Zookeeper实现选举的案例,帮助大家更好的学习Zookeeper。
使用Go来对Zookeeper进行操作需要使用go get github.com/samuel/go-zookeeper/zk
。根据GitHub上有关它的描述它是一个Zookeeper客户端~
当然现在它的最新仓库地址是https://github.com/go-zookeeper/zk
你也可以下载这个仓库的第三方库。
二、定义配置信息和选举管理信息的结构体
type ZookeeperConfig struct {
Servers []string
RootPath string
MasterPath string
}
type ElectionManager struct {
ZKClinet *zk.Conn
ZkConfig *ZookeeperConfig
IsMaster chan bool
}
配置信息就是包含zookeeper集群服务器地址,根路径以及它的Master节点路径。
选举管理信息包括zookeeper的连接信息、zookeeper的配置信息、以及传递选举信息的管道。
三、程序主逻辑
func main() {
zkconfig := &ZookeeperConfig{
Servers: []string{"node01:2181", "node02:2181", "node03:2181"},
RootPath: "/test04",
MasterPath: "/master",
}
isMasterChan := make(chan bool)
electionManager := NewElectionManager(zkconfig, isMasterChan)
go electionManager.Run()
var isMaster bool
for {
select {
case isMaster = <-isMasterChan:
if isMaster {
fmt.Println("实现具体的业务逻辑")
}
}
}
}
这个zkconfig是填入咱们要使用的zookeeper集群配置信息,isMasterChan建立用于返回选取结果的信道,然后创建选举管理器,正如我们平时的选举有选举委员会一样,zookeeper的选举也需要选举管理器。然后开辟协程来进行选举:go electionManager.Run()
,除了每次启动集群时会选举主节点之外,还需要监视主节点,如果主节点出问题了,需要立刻选举出一个新的主节点。下面的isMaster是判断是否是主节点,是通过信道的返回值来判断的。然后下面的for循环,就是不断从管道中读取选举结果,是否成功,如果成功表示集群可以正常运作了,就可以实现具体的业务逻辑了,如果没选举成功就只能一直等待了。
四、创建选举管理器
func NewElectionManager(zkConfig *ZookeeperConfig, isMaster chan bool) *ElectionManager {
electionManager := &ElectionManager{
nil,
zkConfig,
isMaster,
}
electionManager.initConnection()
return electionManager
}
创建选举管理器比较简单,就是输入一些配置信息,来初始化连接,然后将初始化后的选举管理器传递出去。
五、初始化Zookeeper连接
在创建选举管理器中用到了初始化Zookeeper连接的方法,实现如下:
func (electionManager *ElectionManager) initConnection() error {
if !electionManager.isConnected() {
conn, connChan, err := zk.Connect(electionManager.ZKConfig.Servers, time.Second*5)
if err != nil {
return err
}
for {
isConnected := false
select {
case connEvcent := <-connChan:
if connEvcent.State == zk.StateConnected {
isConnected = true
fmt.Println("zk连接成功")
}
case _ = <-time.After(time.Second * 3):
return errors.New("zk连接超时!")
}
if isConnected {
break
}
}
electionManager.ZKClientConn = conn
}
return nil
}
先是判断是否已经连接Zookeeper,如果连接有问题,或者没有连接的情况下就进行连接zookeeper。
下面来介绍这个zk.Connect,Connect 用于建立到 Zookeeper 服务器池的新连接。第一个参数是服务器集群地址,第二个参数是在失去与服务器的连接后当前会话仍被视为有效的时间量。 在会话超时之前,可以重新建立到不同服务器的连接并保持相同的会话。 这样可以维护和监控任何临时节点。
然后就是进入循环,不断去connChan取东西,当connChan是zk.StateConnected时,表示连接成功,就给isConnected赋值为true,如果3秒都未成功,则报错连接超时。当连接成功时跳出这个for循环,electionManager.ZKClientConn = conn
表示将获得的连接给选举管理器的连接。
六、判断是否连接Zookeeper
func (electionManager *ElectionManager) isConnected() bool {
if electionManager.ZKClientConn == nil {
return false
} else if electionManager.ZKClientConn.State() != zk.StateConnected {
return false
}
return true
}
在初始化Zookeeper连接时需要判断是否连接了Zookeeper,其实我们可以看连接是否为nil值来判断有没有连接,如果想判断连接是否有问题的话,我们最好用连接的State()方法,如果是zk.StateConnected,就表示连接成功了,如果是其他就表示连接异常,下面列举连接异常和成功的值。
StateUnknown State = -1
StateDisconnected State = 0
StateConnecting State = 1
StateAuthFailed State = 4
StateConnectedReadOnly State = 5
StateSaslAuthenticated State = 6
StateExpired State = -112
StateConnected = State(100)
StateHasSession = State(101)
可以看到,zk.StateConnected也就是100,所以这里可能可以将zk.StateConnected替换为100,我也不知道是否可行,你可以试一试~ StateUnknown也就是-1表示状态是未知的,StateDisconnected也就是0表示状态是未连接,其它我就不细说了。
七、选举的逻辑
func (electionManager *ElectionManager) Run() {
err := electionManager.electMaster()
if err != nil {
fmt.Println(err)
}
electionManager.watchMaster()
}
是不是很眼熟,这就是main函数中开辟协程运行的东西——选举。
很简单,直接进行选举选出master,然后对master节点进行监听。
八、选举逻辑
func (electionManager *ElectionManager) electMaster() error {
err := electionManager.initConnection()
if err != nil {
return err
}
isExist, _, err := electionManager.ZKClientConn.Exists(electionManager.ZKConfig.RootPath)
if err != nil {
return err
}
if !isExist {
path, err := electionManager.ZKClientConn.Create(electionManager.ZKConfig.RootPath,
nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
if electionManager.ZKConfig.RootPath != path {
return errors.New("创建的" + electionManager.ZKConfig.RootPath + " !=" + path)
}
}
masterPath := electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath
path, err := electionManager.ZKClientConn.Create(masterPath, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err == nil {
if path == masterPath {
fmt.Println("选举master成功")
electionManager.IsMaster <- true
} else {
return errors.New("创建的" + masterPath + "!=" + path)
}
} else {
//创建master节点失败
fmt.Println("选举master失败!", err)
electionManager.IsMaster <- false
}
return nil
}
要开始选举,必须要先连接Zookeeper,我们编写的initConnection()进行连接zk并可以通过返回值来判断连接是否有问题。
然后判断zk中是否存在根目录,如果没有,就创建根目录,根目录一般数据先不设置,flags是为0,要创建持久化节点,权限不进行控制。
来介绍一下这个Create()叭~
Create()有四个参数,第一个是要创建的路径,第二个参数是节点中的数据内容,第三个参数是节点类型参数,flag=0表示这是一个持久化的节点,第四个参数是权限。有下面这些权限。
PermRead = 1 << iota
PermWrite
PermCreate
PermDelete
PermAdmin
PermAll = 0x1f
使用zk.WorldACL(zk.PermAll)表示该节点没有权限限制
然后再拼接master地址,创建master节点,因为需要创建个临时节点,所以Create()中的第三个参数参数使用zk.FlagEphemeral表示创建临时节点。创建成功,表示选举master成功,哪个客户端节点创建了master,认为是选举哪个节点作为master节点。
创建成功后要给连接的isMaster写入true,不成功则写入false。
九、监听Master节点
func (electionManager *ElectionManager) watchMaster() error {
for {
children, state, childCh, err := electionManager.ZKClientConn.ChildrenW(electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath)
if err != nil {
fmt.Println("监听失败!", err)
}
fmt.Println("监听到子节点", children, state)
select {
case childEvent := <-childCh:
if childEvent.Type == zk.EventNodeDeleted {
fmt.Println("接收到znode的删除事件", childEvent)
fmt.Println("开始选举新的master...")
err = electionManager.electMaster()
if err != nil {
fmt.Println("选举新的master失败", err)
}
}
}
}
}
需要监听zookeeper根节点下的子节点,因为如果连接断开或对应的子znode被删除,则触发重新选举,所以需要监听目录下所有子节点。
ChildrenW()会返回监听事件,如果监听到子节点被删除,也就是监听到的事件类型是zk.EventNodeDeleted,就重新调用electMaster()进行选举。
看到这的宝宝,肯定都是真爱了,如果喜欢博主,记得点赞、关注、分享三连哦~
- 点赞
- 收藏
- 关注作者
评论(0)