【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册

举报
小生凡一 发表于 2022/07/05 00:05:53 2022/07/05
【摘要】 文章目录 写在前面1. 服务实例定义2. 服务实例注册3. 使用示例 写在前面 本文采用的 ETCD 版本库是 go.etcd.io/etcd/client/v3 采用的 gRPC ...

写在前面

本文采用的 ETCD 版本库是 go.etcd.io/etcd/client/v3
采用的 gRPC 版本库是 google.golang.org/grpc

在Go语言的RPC框架中,gRPC 是比较原生的,并没有集成 ETCD 服务发现的集成,需要我们去稍微封装一下。而像 micro 框架这种封装性比较好的就有集成 ETCD、consul 等等的服务发现功能,就直接调用就行了。

本文实战例子源码在 https://github.com/CocaineCong/gRPC-todoList 各个模块下的discovery中

在这里插入图片描述

1. 服务实例定义

  • 定义我们所需要注入进ETCD的服务结构体
type Server struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`    // 地址
	Version string `json:"version"` // 版本
	Weight  int64  `json:"weight"`  // 权重
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Name:名字为服务的名字(用来进行服务的发现)
Addr:服务的地址(存储服务地址)
Version:服务的版本(方便服务的版本迭代)
Weight:服务的权重(后续用来降级熔断)

  • 定义服务名字前缀的函数
func BuildPrefix(server Server) string {
	if server.Version == "" {
		return fmt.Sprintf("/%s/", server.Name)
	}
	return fmt.Sprintf("/%s/%s/", server.Name, server.Version)
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 定义注册的地址函数
func BuildRegisterPath(server Server) string {
	return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)
}

  
 
  • 1
  • 2
  • 3
  • 将值反序列化成一个注册 Server 服务
func ParseValue(value []byte) (Server, error) {
	server := Server{}
	if err := json.Unmarshal(value, &server); err != nil {
		return server, err
	}

	return server, nil
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 分割路径,后续用作 Server 地址的更新
func SplitPath(path string) (Server, error) {
	server := Server{}
	strs := strings.Split(path, "/")
	if len(strs) == 0 {
		return server, errors.New("invalid path")
	}
	server.Addr = strs[len(strs)-1]

	return server, nil
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 判断这个服务地址是否已经存在,防止服务访问冲突
func Exist(l []resolver.Address, addr resolver.Address) bool {
	for i := range l {
		if l[i].Addr == addr.Addr {
			return true
		}
	}

	return false
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 移除服务
func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr.Addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2. 服务实例注册

  • 定义服务实例的实例,用来存储全部的实例信息,并且维持各个服务之间的执行,防止宕机等情况
type Register struct {
	EtcdAddrs   []string
	DialTimeout int

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *logrus.Logger
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 创建一个注册对象
func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 注册服务到 ETCD 中
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		// 判断服务地址的正确性
		return nil, errors.New("invalid ip address")
	}
	// 对服务进行注册
	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo  // 服务信息的注册
	r.srvTTL = ttl 		 // 服务的存活时间

	if err = r.register(); err != nil {
		return nil, err
	}
	// 初始化一个切片来判断这个服务连接是否关闭
	r.closeCh = make(chan struct{}) 
	// 异步进行心跳检测
	go r.keepAlive()

	return r.closeCh, nil
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

这里我们要先说明一个名词:租约

ETCD的 Lease 租约,它类似 TTL(Time To Live)用于 etcd 客户端与服务端之间进行活性检测

在到达 TTL 时间之前,etcd 服务端不会删除相关租约上绑定的键值对;超过 TTL 时间,则会删除。因此我们需要在到达 TTL 时间之前续租,以实现客户端与服务端之间的保活。

func (r *Register) register() error {
	//设置超时时间,访问etcd有超时控制
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	// 注册一个新的租约
	leaseResp, err := r.cli.Grant(ctx, r.srvTTL) 
	if err != nil {
		return err
	}
	
	// 赋值租约的ID
	r.leasesID = leaseResp.ID

	// 对这个 cli 进行心跳检测
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil {
		return err
	}
	
	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}
	// 将服务写到 ETCD 中
	_, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))

	return err
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 关闭服务连接
func (r *Register) Stop() {
	r.closeCh <- struct{}{}
}

  
 
  • 1
  • 2
  • 3
  • 删除节点
func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo))
	return err
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 存活检测
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)

	for {
		select {
		case <-r.closeCh: // 是否存在这个服务
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed, error: ", err)
			}
			// 撤销租约
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed, error: ", err)
			}
		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed, error: ", err)
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed, error: ", err)
				}
			}
		}
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 获取注册服务的信息
func (r *Register) GetServerInfo() (Server, error) {
	resp, err := r.cli.Get(context.Background(), BuildRegisterPath(r.srvInfo))
	if err != nil {
		return r.srvInfo, err
	}

	server := Server{}
	if resp.Count >= 1 {
		if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil {
			return server, err
		}
	}

	return server, err
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3. 使用示例

  • 服务注册
etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())

  
 
  • 1
  • 定义一个Node存放服务信息
userNode := discovery.Server{
		Name: viper.GetString("server.domain"),
		Addr: grpcAddress,
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 注册
if _, err := etcdRegister.Register(userNode, 10); err != nil {
	panic(fmt.Sprintf("start server failed, err: %v", err))
}

  
 
  • 1
  • 2
  • 3

文章来源: blog.csdn.net,作者:小生凡一,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/weixin_45304503/article/details/125591718

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。