go语言 grpc 拦截器
拦截器
gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。
在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。
在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:
- 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
- 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
- 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
- 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。
服务端拦截器
一元拦截器
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用指定方法
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
结果
2023/12/07 14:52:55 ======= [Server Interceptor] /hello.Greeter/SayHello
2023/12/07 14:52:55 Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55 Post Proc Message : message:"Hello world"
流拦截器
流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
log.Printf("Recved %v", req.GetName())
// 具体返回多少个response根据业务逻辑调整
for i := 0; i < 10; i++ {
// 通过 send 方法不断推送数据
err := stream.Send(&pb.HelloReply{})
if err != nil {
log.Fatalf("Send error:%v", err)
return err
}
}
return nil
}
type wrappedStream struct {
// 包装器流
grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 前置处理
log.Println("====== [Server Stream Interceptor] ", info.FullMethod)
// 包装器流调用 流RPC
err := handler(srv, newWrappedStream(ss))
if err != nil {
log.Printf("RPC failed with error %v", err)
}
return err
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 开启rpc
s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Printf("service listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
结果
GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor] /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)
客户端拦截器
一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 前置处理逻辑
log.Println("Method : " + method)
// 调用invoker 执行远程方法
err := invoker(ctx, method, req, reply, cc, opts...)
// 后置处理逻辑
log.Println(reply)
return err
}
func main() {
flag.Parse()
// 与服务建立连接.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 创建指定服务的客户端
c := pb.NewGreeterClient(conn)
// 连接服务器并打印出其响应。
ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒
defer cancel()
// 调用指定方法
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
结果
2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ClientStream.SendMsg(m)
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
return &wrappedStream{s}
}
func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 前置处理逻辑
log.Println("======= [Client Interceptor] ", method)
// 调用streamer 来获取客户端流
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(s), nil
}
func main(){
// 注册拦截器到客户端流
conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 调用客户端流RPC方法
searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
for {
searchOrder, err := searchStream.Recv()
if err == io.EOF {
log.Print("EOF")
break
}
if err == nil {
log.Print("Search Result : ", searchOrder)
}
}
}
结果
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束
多个拦截器
在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题
func chainUnaryClientInterceptors(cc *ClientConn) {
interceptors := cc.dopts.chainUnaryInts
if cc.dopts.unaryInt != nil {
interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
}
var chainedInt UnaryClientInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
}
}
cc.dopts.unaryInt = chainedInt
}
当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。
如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var (
chainHandler grpc.UnaryInvoker
curI int
)
chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
if curI == lastI {
return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
}
curI++
err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
curI--
return err
}
return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
}
}
...
}
代码仓库
https://github.com/onenewcode/mygrpc.git
也可以直接下载绑定的资源。
- 点赞
- 收藏
- 关注作者
评论(0)