tRPC-Go 框架 04:客户端开发——Proxy、Selector、超时与重试
tRPC-Go 框架 04:客户端开发——Proxy、Selector、超时与重试
服务调用是分布式系统的常态。本篇从调用方视角出发,讲清楚 tRPC-Go 客户端的开发模式:ClientProxy 怎么用、target 怎么配、超时重试怎么加、并发调用怎么写。 
一、ClientProxy
trpc create 生成的客户端骨架:
type GreeterClientProxy interface {
Hello(ctx context.Context, req *HelloReq, opts ...client.Option) (*HelloRsp, error)
}
func NewGreeterClientProxy(opts ...client.Option) GreeterClientProxy { ... }
最朴素调用:
proxy := pb.NewGreeterClientProxy(
client.WithTarget("ip://127.0.0.1:8000"),
)
rsp, err := proxy.Hello(ctx, &pb.HelloReq{Msg: "World"})
注意 opts 在 New 与 Hello 中都可传,后者会覆盖前者——常用于按调用粒度调超时/重试。
二、Target:寻址语法
target 决定"调谁":
<schema>://<service>?<key=value>&<key=value>
| schema | 用途 | 示例 |
|---|---|---|
ip |
直连 | ip://127.0.0.1:8000 |
dns |
DNS | dns://api.example.com:80 |
polaris |
北极星 | polaris://trpc.app.user |
cl5 |
腾讯 CL5 | cl5://12345:67890 |
consul / etcd |
自定义 | 通过插件注册 |
参数示例:
polaris://trpc.app.user?namespace=Production&version=v2
target 可以放在配置文件里:
client:
service:
- name: trpc.app.user.User
target: polaris://trpc.app.user.User
protocol: trpc
timeout: 1000
filter: [retry, debuglog]
代码中只需:
proxy := pb.NewUserClientProxy()
// 自动从配置中加载 target/protocol/timeout/filter
生产推荐:调用参数尽量走配置,代码只调用,部署时改 yaml。
三、协议
tRPC 客户端可选多种协议:
client.WithProtocol("trpc") // 默认
client.WithProtocol("http")
client.WithProtocol("grpc")
切到 HTTP 时还可以用 JSON:
client.WithSerializationType(codec.SerializationTypeJSON)
四、超时设置
超时有三个层级,优先级从高到低:
单次调用 opt > Service 级配置 > 全局默认
// 1. 全局
client.DefaultClient = client.New() // 默认 1s
// 2. 配置文件
service:
timeout: 1000
// 3. 单次
proxy.Hello(ctx, req, client.WithTimeout(500*time.Millisecond))
配合 context:
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
proxy.Hello(ctx, req)
ctx 的 deadline 会与 client 配置中取较小值。
五、重试
启用 retry filter:
client:
filter: [retry]
service:
- name: trpc.app.user
retry: 2
或代码注册:
filter.Register("retry", retryFilter, nil)
实际生产中更推荐自定义重试 filter:
func retryFilter(maxRetry int) filter.ClientFilter {
return func(ctx context.Context, req, rsp any, next filter.ClientHandleFunc) error {
var err error
for i := 0; i <= maxRetry; i++ {
err = next(ctx, req, rsp)
if err == nil {
return nil
}
if !shouldRetry(err) || ctx.Err() != nil {
return err
}
time.Sleep(backoff(i))
}
return err
}
}
func shouldRetry(err error) bool {
code := errs.Code(err)
// 仅对网络/超时错误重试
return code == errs.RetClientNetErr || code == errs.RetClientTimeout
}
func backoff(n int) time.Duration {
base := 50 * time.Millisecond
max := 2 * time.Second
d := base << n
if d > max { d = max }
jitter := time.Duration(rand.Int63n(int64(d) / 4))
return d + jitter
}
重要:写操作非幂等时禁止重试。
六、负载均衡选择
client:
service:
- name: trpc.app.user
target: polaris://trpc.app.user
loadbalance: weighted_random # weighted_random / round_robin / consistent_hash / p2c
或代码:
proxy.Hello(ctx, req, client.WithBalancerName("p2c"))
按需用一致性哈希:
proxy.Hello(ctx, req,
client.WithBalancerName("consistent_hash"),
client.WithKey(userID),
)
七、节点过滤与路由
通过 metadata 选定一组节点:
proxy.Hello(ctx, req,
client.WithCalleeSetName("set.shanghai.1"),
client.WithCalleeContainerName("prod"),
)
或在 Polaris 中配置路由规则,按 header.x-uin 走特定 set。
八、并发调用
8.1 errgroup 并发批量调用
import "golang.org/x/sync/errgroup"
func (s *svc) Aggregate(ctx context.Context, ids []int64) ([]*User, error) {
users := make([]*User, len(ids))
g, gctx := errgroup.WithContext(ctx)
for i, id := range ids {
i, id := i, id
g.Go(func() error {
rsp, err := s.userProxy.GetUser(gctx, &pb.GetUserReq{Id: id})
if err != nil { return err }
users[i] = rsp.User
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return users, nil
}
8.2 限制并发数
sem := make(chan struct{}, 10) // 最多 10 并发
for _, id := range ids {
sem <- struct{}{}
go func(id int64) {
defer func() { <-sem }()
proxy.GetUser(ctx, &pb.GetUserReq{Id: id})
}(id)
}
8.3 部分失败容忍
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make([]*User, len(ids))
for i, id := range ids {
i, id := i, id
g.Go(func() error {
rsp, err := s.userProxy.GetUser(gctx, req)
if err != nil {
log.WithContext(gctx).Warnf("get user %d failed: %v", id, err)
return nil // 容忍单个失败
}
results[i] = rsp.User
return nil
})
}
g.Wait()
九、透传字段(trans-info)
调用时传 metadata:
proxy.Hello(ctx, req,
client.WithMetaData("x-trace-id", []byte("xxxxxx")),
client.WithMetaData("x-channel", []byte("ios")),
)
服务端读取:
msg := trpc.Message(ctx)
md := msg.ServerMetaData()
trace := md["x-trace-id"]
十、Mock 测试
trpc create 自动生成的 helloworld_mock.go 可在测试中替换 proxy:
func TestSvc_DoSomething(t *testing.T) {
ctrl := gomock.NewController(t)
mockUser := pb.NewMockUserClientProxy(ctrl)
mockUser.EXPECT().
GetUser(gomock.Any(), &pb.GetUserReq{Id: 1}).
Return(&pb.GetUserRsp{User: &pb.User{Name: "Tom"}}, nil)
s := NewMyService(mockUser)
rsp, err := s.DoSomething(context.Background(), 1)
assert.NoError(t, err)
assert.Equal(t, "Tom", rsp.Name)
}
十一、调用其他协议(HTTP / gRPC)
11.1 调用 HTTP+JSON
proxy := pb.NewUserClientProxy(
client.WithTarget("ip://api.example.com"),
client.WithProtocol("http"),
client.WithSerializationType(codec.SerializationTypeJSON),
)
11.2 调用 gRPC
proxy := pb.NewUserClientProxy(
client.WithTarget("ip://127.0.0.1:9090"),
client.WithProtocol("grpc"),
)
服务端只要也是 grpc 协议或 gRPC 服务即可互通。
十二、连接池
tRPC 默认按 target 复用连接池,参数可调:
client:
service:
- name: trpc.app.user
conn_type: long # long(默认) / short
max_idle: 100
idle_timeout: 60s
短连接极少使用,除非访问外部不可控网关。
十三、与服务端开发的对照表
| 概念 | 服务端 | 客户端 |
|---|---|---|
| 入口 | s.Serve() |
proxy.Method(ctx, req) |
| Filter | server.filter |
client.filter |
| 错误 | return errs.Error |
errs.Code(err) 解析 |
| 超时 | 配置接收上限 | 配置发起上限 |
| LB | 不关心 | loadbalance 配置 |
十四、客户端的"自我修养"
在生产环境使用 tRPC 客户端,请永远做到:
- ✅ 设置合理超时(永远不要无限等待);
- ✅ 重试只用于幂等接口;
- ✅ 启用 debuglog(开发期)/ 监控 filter(生产期);
- ✅ 不要每次调用都
Newproxy(连接池缓存); - ✅ context 透传贯穿全链路(trace-id、超时、取消信号)。
十五、小结
- ClientProxy 是与服务端约定的契约接口;
- target 用
<schema>://service?args寻址; - 超时三层级:调用 > 配置 > 默认;
- 重试要看错误类型 + 幂等性 + 退避;
- mock 让单测彻底脱离网络。
下一篇是 tRPC-Go 系列收官:生产级实践——配置、日志、监控、可观测性。
- 点赞
- 收藏
- 关注作者
评论(0)