在 Golang 的六边形架构中通过优雅关闭来掌握 gRPC 服务器
探索 gRPC——一个高性能的 RPC 框架。
想象一下,有一天你的服务崩溃或突然关闭,以及后果:资源泄漏、交易不完整和整个微服务生态系统的混乱。这个问题的解决方案是什么?
在这篇博客中,我将深入探讨如何使用 Golang 在六边形架构的范围内实现 gRPC 服务器。了解 gRPC 的优点、忽略关闭挂钩的陷阱,以及服务中干净、优雅的关闭艺术。
为什么我们更喜欢 gRPC 进行微服务开发?
有许多关于介绍 gRPC 主题的博客文章,我们可以轻松地搜索和探索它。今天,我不会详细介绍,而只是谈谈它带来的突出优势,这也是我们想要使用它的动力。
微服务开发需要一种在效率、灵活性和可扩展性方面表现出色的通信协议。这就是为什么 gRPC 是我们的首选:
HTTP/2 的效率:利用 HTTP/2 同时多路复用请求,最大限度地减少延迟。
- 紧凑的序列化:采用协议缓冲区实现紧凑和快速的数据序列化。
- 与语言无关:支持多种编程语言,提供实现的灵活性。
- 双向流式处理:通过双向流式处理支持促进实时通信。
- 自动代码生成:通过自动生成各种语言的 API 代码来简化开发。
- 自记录 API:使用自记录 gRPC API 确保清晰的文档。
- 强类型和代码生成:通过强类型和自动代码生成减少集成错误。
- 互操作性和生态系统:利用丰富的生态系统,与各种工具和技术无缝集成。
在为微服务模型选择了合适的框架之后,我们继续讨论今天博客文章的重要部分,即如何解决优雅关闭服务的问题。
为什么正常关闭服务很重要?
突然停止服务可能会导致一系列问题,例如将事情做成一半,浪费资源,甚至可能弄乱一些数据。平稳关闭可确保服务完成其工作,归还借用的内容,并且离开时不会给整个系统造成混乱。
防止数据损坏
突然终止服务可能会导致事务不完整,从而可能导致数据损坏。正常关机可确保正在进行的进程正确完成,从而保持数据的完整性。
避免资源泄漏
如果在突然关闭期间未能释放获取的资源,可能会导致资源泄漏。这可能会占用本应用于其他任务(如管理服务连接、数据库连接等)的资源,从而影响系统的整体性能。
保持通信完整性
微服务通常依赖于无缝通信。突然关闭可能会使连接挂起,从而中断服务之间的信息流。正常关机有助于避免通信中断,并确保整个系统的稳定性。
完成正在进行的任务
启动关闭时,服务可能正处于重要任务的中间。正常退出允许这些任务完成,防止任何可能导致错误的剩余未完成操作。
确保可预测的系统行为
可预测性是微服务的关键。正常关闭提供可预测的退出策略,允许其他服务等待并适应更改。这有助于保持整体系统稳定性。
最大限度减少停机影响
在需要重启或更新服务的场景中,正常关闭可最大程度地减少停机影响。服务可以正常脱机并恢复,而不会对整个系统造成中断。
以上几个方面足以证明,优雅地关闭服务非常重要。那么,如何以正确的方式关闭gRPC服务呢?在本博客的下一部分中,我们将介绍一些简单的策略和代码示例,以展示如何在不引起任何问题的情况下正常关闭实现 gRPC 服务。
在六边形体系结构中实现 gRPC 服务器
在下面的文件中,我添加了文件夹以明确 http 和 gRPC 协议的结构。
├── Dockerfile
├── api
│ ├── buf.yaml
│ ├── user_service.pb.go
│ ├── user_service.proto
│ └── user_service_grpc.pb.go
├── buf.gen.yaml
├── buf.work.yaml
├── build_proto.sh
├── cmd
│ ├── grpc
│ │ └── runner.go
│ └── http
│ └── runner.go
├── conf
│ └── app.yaml
├── go.mod
├── go.sum
├── internal
│ ├── controller
│ │ ├── grpc
│ │ │ └── controller.go
│ │ └── http
│ │ └── controller.go
│ ├── core
│ │ ├── common
│ │ │ ├── router
│ │ │ │ └── router.go
│ │ │ └── utils
│ │ │ ├── converter.go
│ │ │ ├── datetime.go
│ │ │ └── logger.go
│ │ ├── config
│ │ │ └── config.go
│ │ ├── dto
│ │ │ └── user.go
│ │ ├── entity
│ │ │ └── error_code
│ │ │ └── error_code.go
│ │ ├── model
│ │ │ ├── request
│ │ │ │ └── request.go
│ │ │ └── response
│ │ │ ├── response.go
│ │ │ └── sign_up.go
│ │ ├── port
│ │ │ ├── repository
│ │ │ │ ├── db.go
│ │ │ │ └── user.go
│ │ │ └── service
│ │ │ └── user.go
│ │ ├── server
│ │ │ ├── grpc
│ │ │ │ ├── grpc_server.go
│ │ │ └── http
│ │ │ └── http_server.go
│ │ └── service
│ │ ├── user.go
│ │ └── user_test.go
│ └── infra
│ ├── config
│ │ └── config.go
│ └── repository
│ ├── db.go
│ └── user.go
├── schema
│ └── schema.sql
└── script
└── run.sh
为了展示 Hexagonal 架构的灵活性并适应变化,我们只需要编写一个额外的 gRPC 控制器(主适配器)和 gRPC 服务器(核心)来处理来自客户端的请求。无需修改核心服务和基础结构。
首先,我们开始实现 gRPC 服务器。
构建 gRPC 服务器
- Prepare Protobuf file 准备 Protobuf 文件
若要设置 gRPC 服务器,必须为我们的服务创建一个 Protobuf 文件。这些服务的设计在下面的 Protobuf 文件中进行了概述:
// ./api/user_service.proto
syntax = "proto3";
package protobuf.user.service;
option go_package = "user/proto";
enum ErrorCode {
EC_UNKNOWN = 0;
SUCCESS = 1;
INVALID_REQUEST = 2;
DUPLICATE_USER = 3;
}
message SignUpRequest{
string user_name = 1;
string password = 2;
}
message SignUpResponse{
bool status = 1;
ErrorCode error_code = 2;
string error_message = 3;
string display_name = 4;
}
service UserService{
rpc SignUp(SignUpRequest) returns (SignUpResponse);
}
我使用 Buf 将 Protobuf 文件生成为 Golang 代码文件。我们运行命令从上面的 proto 文件生成到 Golang 代码后,我们就有了代码文件:
- user_service.pb.go:包含生成的消息代码,在 protobuf 文件中定义的枚举,例如 SignUpRequest、SignUpResponse、ErrorCode。
// ./api/user_service.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc (unknown)
// source: user_service.proto
package proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ErrorCode int32
const (
ErrorCode_EC_UNKNOWN ErrorCode = 0
ErrorCode_SUCCESS ErrorCode = 1
ErrorCode_INVALID_REQUEST ErrorCode = 2
ErrorCode_DUPLICATE_USER ErrorCode = 3
)
..... Detail code in my github repo
- user_service_grpc.pb.go:包含 gRPC 服务器和客户端生成的代码,例如 UserServiceClient、UserServiceServer。
// ./api/user_service_grpc.pb.go
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc (unknown)
// source: user_service.proto
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// UserServiceClient is the client API for UserService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserServiceClient interface {
SignUp(ctx context.Context, in *SignUpRequest, opts ...grpc.CallOption) (*SignUpResponse, error)
}
..... Detail code in my github repo
- gRPC 服务器实施指南
// ./internal/core/server/grpc/grpc_server.go
import (
"io"
...
)
type GRPCServer interface {
Start(serviceRegister func(server *grpc.Server))
io.Closer
}
type gRPCServer struct {
grpcServer *grpc.Server
config config.GrpcServerConfig
}
GRPCServer 接口概述了 gRPC 服务器所需的基本方法。启动功能,用于启动具有服务注册功能的服务器。此外,GRPCServer 接口已扩展为嵌入 io.Closer 接口,指示 gRPC 服务器现在应遵循 Close 方法。
gRPCServer 结构封装了 gRPC 服务器实例和配置详细信息。它充当 GRPCServer 接口的实现。
// ./internal/core/server/grpc/grpc_server.go
func NewGrpcServer(config config.GrpcServerConfig) (GRPCServer, error) {
options, err := buildOptions(config)
if err != nil {
return nil, err
}
server := grpc.NewServer(options...)
return &gRPCServer{
config: config,
grpcServer: server,
}, err
}
此函数创建 gRPC 服务器的新实例,并使用从 buildOptions 函数获取的配置和选项对其进行初始化。
// ./internal/core/server/grpc/grpc_server.go
func buildOptions(config config.GrpcServerConfig) ([]grpc.ServerOption, error) {
return []grpc.ServerOption{
grpc.KeepaliveParams(buildKeepaliveParams(config.KeepaliveParams)),
grpc.KeepaliveEnforcementPolicy(buildKeepalivePolicy(config.KeepalivePolicy)),
}, nil
}
buildOptions 函数根据提供的配置构造 gRPC 服务器选项。ServerOption 提供了许多其他选项,例如 keepalive、TLS、拦截器、缓冲区读/写等,我们将在下一篇博客中讨论它们。
buildKeepalivePolicy 和 buildKeepaliveParams 定义 gRPC 服务器中 keepalive 功能的行为。让我们分解每个函数:
// ./internal/core/server/grpc/grpc_server.go
func buildKeepalivePolicy(config keepalive.EnforcementPolicy) keepalive.EnforcementPolicy {
return keepalive.EnforcementPolicy{
MinTime: config.MinTime * time.Second,
PermitWithoutStream: config.PermitWithoutStream,
}
}
// in package keepalive file of grpc bibrary
// EnforcementPolicy is used to set keepalive enforcement policy on the
// server-side. Server will close connection with a client that violates this
// policy.
type EnforcementPolicy struct {
// MinTime is the minimum amount of time a client should wait before sending
// a keepalive ping.
MinTime time.Duration // The current default value is 5 minutes.
// If true, server allows keepalive pings even when there are no active
// streams(RPCs). If false, and client sends ping when there are no active
// streams, server will send GOAWAY and close the connection.
PermitWithoutStream bool // false by default.
}
// ./internal/core/server/grpc/grpc_server.go
func buildKeepaliveParams(config keepalive.ServerParameters) keepalive.ServerParameters {
return keepalive.ServerParameters{
MaxConnectionIdle: config.MaxConnectionIdle * time.Second,
MaxConnectionAge: config.MaxConnectionAge * time.Second,
MaxConnectionAgeGrace: config.MaxConnectionAgeGrace * time.Second,
Time: config.Time * time.Second,
Timeout: config.Timeout * time.Second,
}
}
// in package keepalive file of grpc bibrary
// ServerParameters is used to set keepalive and max-age parameters on the
// server-side.
type ServerParameters struct {
// MaxConnectionIdle is a duration for the amount of time after which an
// idle connection would be closed by sending a GoAway. Idleness duration is
// defined since the most recent time the number of outstanding RPCs became
// zero or the connection establishment.
MaxConnectionIdle time.Duration // The current default value is infinity.
// MaxConnectionAge is a duration for the maximum amount of time a
// connection may exist before it will be closed by sending a GoAway. A
// random jitter of +/-10% will be added to MaxConnectionAge to spread out
// connection storms.
MaxConnectionAge time.Duration // The current default value is infinity.
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after
// which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
// If set below 1s, a minimum value of 1s will be used instead.
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
Timeout time.Duration // The current default value is 20 seconds.
}
Start 方法通过创建 TCP 侦听器、注册服务并开始为传入请求提供服务来初始化服务器。
// ./internal/core/server/grpc/grpc_server.go
func (g gRPCServer) Start(serviceRegister func(server *grpc.Server)) {
grpcListener, err := net.Listen("tcp", ":"+strconv.Itoa(int(g.config.Port)))
if err != nil {
zap.L().Fatal("failed to start grpc server", zap.Any("err", err))
}
serviceRegister(g.grpcServer)
zap.L().Info("start grpc server success ", zap.Any("endpoint", grpcListener.Addr()))
if err := g.grpcServer.Serve(grpcListener); err != nil {
zap.L().Fatal("failed to grpc server serve", zap.Any("err", err))
}
}
Stop 方法触发 gRPC 服务器的正常关闭,确保在终止之前完成正在进行的请求。它还会阻止服务器在此过程中接受新的连接或请求。
// ./internal/core/server/grpc/grpc_server.go
func (g gRPCServer) Close() error {
g.grpcServer.GracefulStop()
return nil
}
在上面的代码中,我利用 Uber-go Zap 库进行服务日志记录。我将在下一篇博客,我探讨该库的功能和用法,增强 gRPC 服务器的日志记录功能。
现在,让我们继续讨论本文中的另一个重要主题:实现正常关闭。
构建正常关机
我们生成了 AddShutdownHook 函数来处理服务的正常关闭。下面是代码的细分:
// ./internal/core/server/shutdown_hook.go
func AddShutdownHook(closers ...io.Closer) {
zap.L().Info("listening signals...")
c := make(chan os.Signal, 1)
signal.Notify(
c, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM,
)
<-c
zap.L().Info("graceful shutdown...")
for _, closer := range closers {
if err := closer.Close(); err != nil {
zap.L().Error("failed to stop closer", zap.Any("err", err))
}
}
zap.L().Info("completed graceful shutdown")
if err := zap.L().Sync(); err != nil {
if !errors.Is(err, syscall.ENOTTY) {
log.Printf("failed to flush logger err=%v\n", err)
}
}
}
以下是对关键组件的分析:
信号处理:
- 该函数首先创建一个通道 (c) 来接收信号。
- 它注册通道以侦听特定信号(例如,os.中断,系统调用。SIGINT) 使用信号。通知功能。
我们监听信号:
- 操作系统。Interrup(或系统调用。SIGINT):中断进程的信号,通常由终端中的 Ctrl+C 触发。
- 系统调用。SIGHUP:历史上在终端关闭时使用,现在通常用于重新加载配置。
- 系统调用。SIGINT:类似于 os。中断 ,当用户使用 Ctrl+C 中断进程时,操作系统发送。
- 系统调用。SIGQUIT:用于在终止之前创建核心转储的信号,由 Ctrl+\ 触发。
- 系统调用。SIGTERM:通用终止信号,允许进程在关闭之前执行清理。
正常关机触发:
- 该功能等待通道 (<-c) 上接收到信号,然后继续正常关断。
关闭资源:
- 这是重要的一步。然后,它遍历提供的 io.关闭实例(关闭器)并调用其 Close 方法。
记录器同步:
- 最后,它会尝试同步记录器,以确保在程序终止之前刷新所有日志。
- 当然,我们负责实现 io。更紧密的界面,用于释放服务器、数据库连接、计划/后台作业、缓存、队列、文件以及外部/内部客户端的连接、临时数据、内存等资源。这确保了适当的清理和资源释放,有助于实现管理良好且高效的应用程序生命周期。在此示例中,我提供了 gRPC 服务器关闭和数据库关闭机制。
构建 grpc 控制器
- 与 HTTP 控制器类似,实现 gRPC 控制器(主适配器)对于处理传入的 gRPC 请求至关重要。在提供的 Go 代码中,userController 充当管理与用户相关的 gRPC 操作的主要适配器。此控制器有助于 gRPC 请求和内部表示形式之间的转换。
// ./internal/controller/grpc/controller.go
package grpc
import (
"context"
proto "user-service/api"
"user-service/internal/core/entity/error_code"
"user-service/internal/core/model/request"
"user-service/internal/core/model/response"
"user-service/internal/core/port/service"
)
var errorCodeMapper = map[error_code.ErrorCode]proto.ErrorCode{
error_code.Success: proto.ErrorCode_SUCCESS,
error_code.InternalError: proto.ErrorCode_EC_UNKNOWN,
error_code.InvalidRequest: proto.ErrorCode_INVALID_REQUEST,
error_code.DuplicateUser: proto.ErrorCode_DUPLICATE_USER,
}
type userController struct {
userService service.UserService
}
func NewUserController(userService service.UserService) proto.UserServiceServer {
return &userController{
userService: userService,
}
}
func (u userController) SignUp(
ctx context.Context, request *proto.SignUpRequest,
) (*proto.SignUpResponse, error) {
resp := u.userService.SignUp(u.newSignUpRequest(request))
return u.newSignUpResponse(resp)
}
func (u userController) newSignUpRequest(protoRequest *proto.SignUpRequest) *request.SignUpRequest {
return &request.SignUpRequest{
Username: protoRequest.GetUserName(),
Password: protoRequest.GetPassword(),
}
}
func (u userController) newSignUpResponse(resp *response.Response) (
*proto.SignUpResponse, error,
) {
if !resp.Status {
return &proto.SignUpResponse{
Status: resp.Status,
ErrorCode: u.mapErrorCode(resp.ErrorCode),
ErrorMessage: resp.ErrorMessage,
}, nil
}
data := resp.Data.(response.SignUpDataResponse)
return &proto.SignUpResponse{
Status: resp.Status,
ErrorCode: u.mapErrorCode(resp.ErrorCode),
ErrorMessage: resp.ErrorMessage,
DisplayName: data.DisplayName,
}, nil
}
func (u userController) mapErrorCode(errCode error_code.ErrorCode) proto.ErrorCode {
code, existed := errorCodeMapper[errCode]
if existed {
return code
}
return proto.ErrorCode_EC_UNKNOWN
}
我们需要实现一些组件:
- 内部错误代码映射到特定于 gRPC 的代码。
- 实现 proto。UserServiceServer。
- 注册函数:处理用户注册 gRPC 请求。
- 在 gRPC 和核心服务格式之间进行转换。
构建运行器
- 最后,此主函数启动 gRPC 服务器,并使用 AddShutdownHook 函数进行正常关闭。
// ./cmd/grpc/runner.go
package main
import (
"log"
"go.uber.org/zap"
googleGrpc "google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
proto "user-service/api"
grpcCtrl "user-service/internal/controller/grpc"
"user-service/internal/core/config"
"user-service/internal/core/server"
"user-service/internal/core/server/grpc"
"user-service/internal/core/service"
infraConf "user-service/internal/infra/config"
"user-service/internal/infra/repository"
)
func main() {
// Initialize logger
logger, _ := zap.NewProduction()
undo := zap.ReplaceGlobals(logger)
defer undo()
// Initialize the database connection
db, err := repository.NewDB(
infraConf.DatabaseConfig{
Driver: "mysql",
Url: "user:password@tcp(127.0.0.1:3306)/your_database_name?charset=utf8mb4&parseTime=true&loc=UTC&tls=false&readTimeout=3s&writeTimeout=3s&timeout=3s&clientFoundRows=true",
ConnMaxLifetimeInMinute: 3,
MaxOpenConns: 10,
MaxIdleConns: 1,
},
)
if err != nil {
log.Fatalf("failed to new database err=%s\n", err.Error())
}
// Create the UserRepository
userRepo := repository.NewUserRepository(db)
// Create the UserService
userService := service.NewUserService(userRepo)
// Create the UserController
userController := grpcCtrl.NewUserController(userService)
// Create the gRPC server
grpcServer, err := grpc.NewGrpcServer(
config.GrpcServerConfig{
Port: 9090,
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 100,
MaxConnectionAge: 7200,
MaxConnectionAgeGrace: 60,
Time: 10,
Timeout: 3,
},
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 10,
PermitWithoutStream: true,
},
},
)
if err != nil {
log.Fatalf("failed to new grpc server err=%s\n", err.Error())
}
// Start the gRPC server
go grpcServer.Start(
func(server *googleGrpc.Server) {
proto.RegisterUserServiceServer(server, userController)
},
)
// Add shutdown hook to trigger closer resources of service
server.AddShutdownHook(grpcServer, db)
}
与上一篇博客中的 HTTP 运行器相比,gRPC 的实现涉及其他步骤。其中包括初始化 Zap 记录器、设置 gRPC 服务器、注册 userController 以及使用 AddShutdownHook 函数。在提供的示例中,我们确保正确释放 gRPC 服务器和数据库的资源。
我们提到了数据库的关闭功能。
关闭数据库可防止来自服务的新查询或连接。此外,它还确保允许在关闭之前完成服务器上已开始处理的所有查询。
package sql
// Close closes the database and prevents new queries from starting.
// Close then waits for all queries that have started processing on the server
// to finish.
//
// It is rare to Close a DB, as the DB handle is meant to be
// long-lived and shared between many goroutines.
func (db *DB) Close() error {
....
}
确保在关闭后正确释放服务中组件的资源不仅是必要的,而且对于整个系统的运行状况和可靠性也至关重要。这种做法有助于优雅地处理关闭程序,防止潜在问题并确保整个系统的运行生命周期更加顺畅。
总结
在本文中,我们深入探讨了 gRPC 服务器的实现,提供了有关实现正常关闭的包容性指南。我们探讨了利用 gRPC 服务器的优势以及实现优雅关闭机制的重要性。此外,我提供了 gRPC 服务和关闭钩子机制的代码示例,可以在我的 github 中完整代码源。这种方法不仅提高了服务器的运行效率,而且确保了平稳可靠的关机过程,有助于系统的整体稳健性。
- 点赞
- 收藏
- 关注作者
评论(0)