分布式学习十四:协调任务
【摘要】 分布式协调/通知服务mysql备份数据时,我们会通过读取binlog方式备份,但是如果当从服务器宕机时,则备份就会停止,我们可以通过zookeeper实现分布式协调备份主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.实现原理在zookeeper节点结构如下:test└── customBackUp └── tasks 任务列表 └─...
分布式协调/通知服务
mysql备份数据时,我们会通过读取binlog方式备份,但是如果当从服务器宕机时,则备份就会停止,我们可以通过zookeeper实现分布式协调备份
主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.
实现原理
在zookeeper节点结构如下:
test
└── customBackUp
└── tasks 任务列表
└── task01 任务
├── instance 服务实例列表
│ └── server_1_00000001 有序/临时 节点
├── lastCommit 最后提交id
└── status 当前状态
复制
server进程:
1:判断tasks是否存在 task01 任务
2:如果不存在则初始化 task01 任务的节点列表
monitor进程:
1:监听tasks所有任务下的 status 节点,进行监控报警
task进程
1:多台服务初始化之后,先获取指定任务列表的节点数据(task01)
2:在instance中注册自己的有序/临时节点
3:注册完成之后,判断instance自己的节点是否为最小的,如果是,则节点状态为 "主服务"
4:如果不是最小的,则节点状态为:"从服务"
5:主服务进行处理数据,将status状态更新为"running",并将处理的进度id保留到 lastCommit 中
6:从服务进行监听instance节点列表,当主服务断线后,临时节点将会被删除,从而触发监听
7:从服务将status改为"stop"状态,重新进行判断节点是否最小
8:重复3-7
完整架构图解
简单实现代码
package main
import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"os"
"strconv"
"strings"
"time"
)
var serverId int = 1
type instanceStatus int
var (
StatusRunning instanceStatus = 1
StatusStandby instanceStatus = -1
)
func main() {
serverId, _ = strconv.Atoi(os.Args[1])
conn, _, err := zk.Connect([]string{"127.0.0.1:20005"}, time.Second*10)
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("serverId:%v start.",serverId))
//task path
path := "/customBackUp/tasks/task01"
//check path exits
exists, _, err := conn.Exists(path)
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("check task node exist."))
//if path not exits,create path
if exists == false {
err := createPath(conn, path)
if err != nil {
panic(err)
}
err = createPath(conn, path+"/instance/")
if err != nil {
panic(err)
}
err = createPath(conn, path+"/lastCommit/")
if err != nil {
panic(err)
}
err = createPath(conn, path+"/status/")
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("create task node"))
}
//register task(create a node sequence and ephemeral path)
registerInstanceNodePath := path + "/instance/" + "server_" + strconv.Itoa(serverId)
createPath, err := conn.Create(registerInstanceNodePath, []byte{}, zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("register instance node: %v",createPath))
start_worker:
status, err := checkInstanceStatus(conn, createPath, path+"/instance")
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("current server status: %v",status))
if status == StatusRunning {
//status need to update to running
_, err := conn.Set(path+"/status", []byte("1"), -1)
if err != nil {
panic(err)
}
logWithTime(fmt.Sprintf("current server handle task..."))
handleTask(conn, path+"/lastCommit")
} else if status == StatusStandby {
logWithTime(fmt.Sprintf("current server watch node..."))
watchTaskNode(conn, path+"/instance")
//status need to update to stop
_, err := conn.Set(path+"/status", []byte("0"), -1)
if err != nil {
panic(err)
}
goto start_worker
}
}
func handleTask(conn *zk.Conn, commitPath string) {
//get commitId
bytes, _, err := conn.Get(commitPath)
if err != nil {
panic(err)
}
str := string(bytes)
id, _ := strconv.Atoi(str)
for {
id++
str := strconv.Itoa(id)
_, err := conn.Set(commitPath, []byte(str), -1)
if err != nil {
panic(err)
}
fmt.Printf("[%v]serverId(%v),commitId:%v \n", time.Now().Format("2006-01-02 15:04:05"), serverId, id)
time.Sleep(time.Second * 5)
}
}
func watchTaskNode(conn *zk.Conn, watchPath string) {
_, _, events, err := conn.ChildrenW(watchPath)
if err != nil {
panic(err)
}
fmt.Printf("event change: %v", <-events)
return
}
func checkInstanceStatus(conn *zk.Conn, nodeName string, path string) (status instanceStatus, err error) {
currentId := getInstanceNodeId(nodeName)
minId := currentId
//get all child nodes of task Instance node
nodeArr, _, err := conn.Children(path)
if err != nil {
return 0, err
}
for _, v := range nodeArr {
nodeId := getInstanceNodeId(v)
if nodeId <= minId {
minId = nodeId
}
}
if minId == currentId {
return StatusRunning, nil
} else {
return StatusStandby, nil
}
}
func getInstanceNodeId(nodeName string) int {
//nodeanme='/customBackUp/tasks/task01/instance/server_10000000001'
//only need to intercept the last 8 digits
id := nodeName[len(nodeName)-8:]
intId, _ := strconv.Atoi(id)
return intId
}
func createPath(conn *zk.Conn, path string) (err error) {
strArr := strings.Split(path, "/")
var node string
for _, str := range strArr {
if str == "" {
continue
}
node = node + "/" + str
exists, _, err := conn.Exists(node)
if err != nil {
return errors.New(err.Error())
}
if exists {
continue
} else {
_, err = conn.Create(node, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return errors.New(err.Error())
}
}
}
return err
}
func logWithTime(log string) {
fmt.Printf("%v %v\n", time.Now().Format("2006-01-02 15:04:05"), log)
}
复制
运行工作图:
注意:此代码部分逻辑缺失,例如:
1:发布任务的task进程没有体现
2:监控任务的monitor没有体现
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)