【深入浅出etcd系列】4. 客户端
绪论
etcd的v2可以完全使用rest方式访问,v3则也可以通过部署一个grpc-gateway实现rest访问。但是一般来说,我们代码中操作etcd的读写还是会通过etcd提供的client来做。client 屏蔽了etcd server多节点访问的负载均衡问题,v3的的client采用grpc client可以维持长连接,断链自动重连。k8s的storage也是封装了etcd的client给上层提供了一个Storage的API出去的。
理解client的工作原理对理解k8s的持久化存储功能和定位k8s与连接etcd相关的问题大有帮助,因此本篇专门分析etcd client v2和v3的代码。梳理etcd client与server建立连接和访问的流程。
clientv2
v2开发的api完全是rest方式的,因此客户端的代码也都是通过rest访问server的。代码比较简单,我们首先介绍整体的框架和设计思路,然后分创建和发送请求两部分讲述clientv2的代码流程。
总体框架
首先我们看Client接口对外提供哪些操作:
type Client interface { // 同步更新client中保存的etcd集群的节点配置。etcd集群的节点是可以动态的增删的,这个操作可以同步更新client侧的节点配置情况。 Sync(context.Context) error // 自动同步,一般会在一个协程里面运行,周期性地调用Sync函数。 AutoSync(context.Context, time.Duration) error // 返回当前服务端集群的节点client url列表。因为etcd的集群节点可以动态增删,这个列表可能跟初始传入的列表不一样。 Endpoints() []string // 设置服务端集群的节点client url列表。 SetEndpoints(eps []string) error // 获取当前etcd服务节点和整个集群的etcd版本号。 GetVersion(ctx context.Context) (*version.Versions, error) //这个接口提供了一个方法 Do(),就是发送http请求的。客户端的增删改查操作都是调用这个Do函数实现的。 httpClient } type httpClient interface { Do(context.Context, httpAction) (*http.Response, []byte, error) }
client包中提供了这个接口的几种实现,如果直接调用client包的New函数创建client,会生成一个httpClusterClient 结构的对象。这个对象有一个工厂方法,创建针对单个节点的http client。如果直接使用Client接口操作etcd,虽然已经屏蔽了多节点的路由选择问题,但是对每种操作构造http请求依然比较复杂,因此client包中另外还有两个接口KeysAPI、MembersAPI,封装了对key-value的操作和对member的操作。整体的类图如下图所示:
如此封装以后,客户端的操作变得非常简单,下面的代码是官方提供的v2操作示例:
package main import ( "log" "time" "golang.org/x/net/context" "github.com/coreos/etcd/client" ) func main() { cfg := client.Config{ Endpoints: []string{"http://127.0.0.1:2379"}, Transport: client.DefaultTransport, // set timeout per request to fail fast when the target endpoint is unavailable HeaderTimeoutPerRequest: time.Second, } c, err := client.New(cfg) if err != nil { log.Fatal(err) } kapi := client.NewKeysAPI(c) // set "/foo" key with "bar" value log.Print("Setting '/foo' key with 'bar' value") resp, err := kapi.Set(context.Background(), "/foo", "bar", nil) if err != nil { log.Fatal(err) } else { // print common key info log.Printf("Set is done. Metadata is %q\n", resp) } // get "/foo" key's value log.Print("Getting '/foo' key value") resp, err = kapi.Get(context.Background(), "/foo", nil) if err != nil { log.Fatal(err) } else { // print common key info log.Printf("Get is done. Metadata is %q\n", resp) // print value log.Printf("%q key has %q value\n", resp.Node.Key, resp.Node.Value) } }
可以看到,通过client.New工厂可以简单地创建Client,传入client即可创建KeysAPI。便可以对etcd进行数据的读写操作了。
实现细节
对于client的实现细节,最关键的两点是:负载均衡和Watch机制,下面我们将分开讲述这两部分的细节。
1. 负载均衡
etcdv2负载均衡其实非常简单,它是从一个pinned的地址开始,遍历所有的节点,直到请求成功或者遍历了所有的服务器。pinned是一个状态变量,如果某次请求成功了,就会把成功返回的服务端节点地址设置为pinned。如此,下一个请求会首先尝试这个pinned。
可能有人会有疑问,如果所有客户端都是按照相同的方式传入服务端节点的url。那么只要第一个节点没有挂,那岂不是所有的流量都导到第一个节点上了吗?实际不是的,我们看到httpClusterClient有一个随机数生成变量rand,这个是在初始化和每次Sync重新获取节点以后打乱节点顺序用的。所以,不会出现所有流量都聚集在第一个节点的问题了。当然,client还提供了一种leader节点优先的路由选择模式,请求首先尝试leader,leader不成功尝试其他节点。
2. Watch机制
客户端调用KeyAPI的Watcher函数,可以获得一个Watcher,连续调用Watcher的Next函数,可以获取**的key值的修改,获得最新修改的value。Next的逻辑是循环调用client的Do函数,传入一个wait参数,服务端在看到wait参数的时候,会hold住连接,直到key上有数据更新,或者时间超过了wait定义的时间,就返回。client从返回的数据中解析出Response,如果Response是空的,就继续下一趟循环。如果获取到了value,先把nextWait的等待index加1。表示等待下一个index的数据。
服务端的处理,如果在请求url中设置了stream=true,那么就会一直hold住连接,发送更新,直到超过了wait定义的超时时间。否则一旦有更新就立即返回。实际上,前者就是长连接的方式,后者即是长轮询的方式。而client端的代码中,是没有在url中设置stream=true的,并且从处理逻辑看也不是长连接的方式,而是长轮询的方式。k8s apiserver对外提供的watch机制是长连接的,而apiserver向后端etcd v2的watch则是采用长轮询的方式。
client v3
相对于v2来说,v3由于默认采用grpc作为client和server之间的通信方式,并且提供的api也丰富很多,因此,client v3相对v2更复杂。
rpc服务列表
rpc服务都定义在etcdserver/etcdserverpb/rpc.proto文件中,grpc框架根据proto文件生成rpc.pb.go文件,go文件中定义了grpc的接口、注册函数和client代码。
KV: 用于client对数据增删改查操作。
Range:获取在一个范围内的key的数据。
Put:更新一个key的数据。
DeleteRange:删除一个范围内的key的数据。
Txn:在一个事务中执行多个请求。
Compact:压缩历史数据。
Watch:**数据变化,单个请求可以**一个范围的key的变化。
Watch: **函数。
Lease:租约相关操作,实现TTL的。可以将key绑在一个租约上,这样key的生命周期将跟租约绑定,一旦租约过期,key值自动删除。
LeaseGrant:创建一个租约。
LeaseRevoke:废除一个租约,绑在这个租约上的key也被删除。
LeaseKeepAlive:刷新租约的TTL,保持租约不过期。
LeaseTimeToLive:获取租约信息。
Cluster: 集群操作,包括集群成员节点的增删改查。
MemberAdd: 添加节点。
MemberRemove: 删除节点。
MemberUpdate: 更新节点信息。
MemberList: 获取所偶节点。
Maintenance:运维接口,包括获取状态、告警、快照,后端数据库碎片整理等。
Alarm:获取告警信息。
Status: 获取状态信息。
Defragment:后端数据库碎片整理。
Hash:获取本地KV状态的hash值,测试用的。
Snapshot: 获取后端数据快照。
Auth:认证相关操作,包括使能、去使能认证,增删用户,角色操作等等。
AuthEnable:使能认证。
AuthDisable:去使能认证。
Authenticate:执行认证。
UserAdd:添加用户。
UserGet:获取用户信息。
UserList:获取所有用户。
UserDelete:删除用户。
UserChangePassword:修改用户密码。
UserGrantRole:将某角色赋予用户。
UserRevokeRole: 收回用户的角色。
RoleAdd: 添加角色。
RoleGet:获取角色信息。
RoleList:获取所有角色。
RoleDelete: 删除角色。
RoleGrantPermission:给角色赋予权限。
RoleRevokePermission:收回角色的权限。
范例代码
对于使用者来说,v3的client sdk使用并没有更复杂,甚至因为v3的Client是一个struct,组合了KV、Cluster、Lease、Watcher等众多接口,相当于一个门面。对客户端统一就呈现Client,使得客户端代码显得更加整洁。
实现细节
client在grpc的生成的client基础上,还有很多封装的代码,其中我们重点关注负载均衡功能和重试机制。
1. 负载均衡
v3因为采用了grpc,其负载均衡机制和v2完全不同。grpc本身提供了一个Balancer的扩展接口,客户端可以扩展这个接口定制自己的负载均衡功能。etcd v3的client就是定制了这个接口实现负载均衡的。
etcd定义了一个simpleBalancer结构实现了这个接口,实现了自己的负载均衡逻辑。简单地说,etcd的负载均衡策略是:类似于client v2的pinned地址的功能,即集群中去找一个能连接的节点连接上,一旦连接上,就会把另外尝试建立的长连接给关掉,只留下一个地址的连接,这个地址也就是pinAddr。然后就用这个长连接发请求,除非连接出现问题,否则会一直维持这个长连接不变。它在第一次通知grpc地址列表的时候没有打乱顺序,传入的地址发起连接是并发的过程,所以谁先建立连接,就会一直用这个连接,直到把它用坏了为止。 做成这样一个策略,应该是为了节省TCP连接考虑。
simpleBalancer启动时,会启动一个协程,**当前连接的状况,然后做出行动(逻辑在updateNotifyLoop函数中)。如果是当前没有连接,downc就会被关闭,于是updateNotifyLoop函数中的select走到downc==nil的分支里,通知grpc集群所有地址,grpc就会对所有的地址发起连接。如果一旦有一个连接建立,grpc会调用Up()函数,Up()函数关闭upc通道。updateNotifyLoop的逻辑就会走到upc == nil的分支,这个分支会通知已经建立的连接(也就是pinAddr),grpc框架就会关闭其他多余的连接。
2. 错误重试
grpc本身有错误重试功能,如果你调用grpc的时候传入failFast是false,是几它就会对错误进行重试,重试的方式策略是:只有连接类型的错误才会去重试。重试就是去load balancer重新拿一个地址再次重试,但是我们在刚才负载均衡的策略中看到etcd client实现的load balancer并不适合直接拿连接,它是没有一个连接池在那里的。
因此etcd client的retry不是通过grpc自身的重试来实现的,而是它自己用一个装饰器把etcd 它的client保证成retry的client,retye的逻辑在retry.go的newRetryWrapper()返回的函数中。可以看到执行了rpc调用以后如果失败,要看失败类型。etcd自身的错误导致的失败,是不会重试的。还有就是grpc的错误码如果不是Unavailable(14),也不重试了。重试前会通过simpleBalancer.ConnectNotify()返回的通道等待负载均衡器重新创建好连接。
这里要强调的是,因为这种错误重试机制,使用etcd的client发起rpc请求时,建议在context中配置好超时时间,不然一直重试的请求会将程序阻塞不能往下运行。
- 点赞
- 收藏
- 关注作者
评论(0)