【Go语言实战】 (17) gRPC 集成 ETCD 进行服务发现
【摘要】
文章目录
写在前面1. 构造发现结构体2. 实现机制
写在前面
上一篇文章 gRPC集成ETCD进行服务注册 中,我们已经知道了如何对集合ETCD进行注册。这一次我们继续下一步,对ETCD...
写在前面
上一篇文章 gRPC集成ETCD进行服务注册 中,我们已经知道了如何对集合ETCD进行注册。这一次我们继续下一步,对ETCD进行服务的发现。
1. 构造发现结构体
- 构造服务发现结构体
type Resolver struct {
schema string
EtcdAddrs []string
DialTimeout int
closeCh chan struct{}
watchCh clientv3.WatchChan
cli *clientv3.Client
keyPrifix string
srvAddrsList []resolver.Address
cc resolver.ClientConn
logger *logrus.Logger
}
- 创建一个基于ETCD的Resolver
func NewResolver(etcdAddrs []string, logger *logrus.Logger) *Resolver {
return &Resolver{
schema: schema,
EtcdAddrs: etcdAddrs,
DialTimeout: 3,
logger: logger,
}
}
2. 实现机制
- 开始进行服务发现机制
func (r *Resolver) start() (chan<- struct{}, error) {
var err error
r.cli, err = clientv3.New(clientv3.Config{
Endpoints: r.EtcdAddrs,
DialTimeout: time.Duration(r.DialTimeout) * time.Second,
})
if err != nil {
return nil, err
}
resolver.Register(r)
r.closeCh = make(chan struct{})
if err = r.sync(); err != nil {
return nil, err
}
go r.watch()
return r.closeCh, nil
}
- watch机制的实现
func (r *Resolver) watch() {
ticker := time.NewTicker(time.Minute)
r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())
for {
select {
case <-r.closeCh:
return
case res, ok := <-r.watchCh:
if ok {
r.update(res.Events)
}
case <-ticker.C:
if err := r.sync(); err != nil {
r.logger.Error("sync failed", err)
}
}
}
}
- 更新节点操作
func (r *Resolver) update(events []*clientv3.Event) {
for _, ev := range events {
var info Server
var err error
switch ev.Type {
case clientv3.EventTypePut:
info, err = ParseValue(ev.Kv.Value)
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
if !Exist(r.srvAddrsList, addr) {
r.srvAddrsList = append(r.srvAddrsList, addr)
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
}
case clientv3.EventTypeDelete:
info, err = SplitPath(string(ev.Kv.Key))
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr}
if s, ok := Remove(r.srvAddrsList, addr); ok {
r.srvAddrsList = s
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
}
}
}
}
- 同步获取所有地址信息
func (r *Resolver) sync() error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
if err != nil {
return err
}
r.srvAddrsList = []resolver.Address{}
for _, v := range res.Kvs {
info, err := ParseValue(v.Value)
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
r.srvAddrsList = append(r.srvAddrsList, addr)
}
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
return nil
}
文章来源: blog.csdn.net,作者:小生凡一,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/weixin_45304503/article/details/125592233
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)