Go + gRPC 实现流模式通信
【摘要】 安装 gRPC 和 Protocol Buffers 编译器。# 安装 Protocol Buffers 编译器:sudo apt-get install protobuf-compiler;# 安装 gRPC Go 插件:go get -u google.golang.org/grpc;go get -u google.golang.org/protobuf/cmd/protoc-gen-go;go ...
安装 gRPC 和 Protocol Buffers 编译器
# 安装 Protocol Buffers 编译器
sudo apt-get install protobuf-compiler
# 安装 gRPC Go 插件
go get -u google.golang.org/grpc
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
创建一个 service.proto 文件,定义三种流模式的服务:
syntax = "proto3";

package example;

// go_package tells protoc-gen-go / protoc-gen-go-grpc which Go import path the
// generated code belongs to; without it the Go plugins cannot determine the
// import path and code generation fails. The value is a placeholder that
// matches the `pb "path/to/your/service"` import used by server.go and
// client.go -- replace all of them with your real module path.
option go_package = "path/to/your/service";
// StreamService declares one RPC for each of the three gRPC streaming modes.
service StreamService {
// Client streaming: the client sends a stream of ClientRequest messages and
// the server answers with a single ClientResponse.
rpc ClientStream (stream ClientRequest) returns (ClientResponse) {}
// Server streaming: the client sends a single ServerRequest and the server
// answers with a stream of ServerResponse messages.
rpc ServerStream (ServerRequest) returns (stream ServerResponse) {}
// Bidirectional streaming: both sides send a stream of messages
// (BidirectionalRequest from the client, BidirectionalResponse from the server).
rpc BidirectionalStream (stream BidirectionalRequest) returns (stream BidirectionalResponse) {}
}
// ClientRequest is one element of the request stream of the ClientStream RPC.
message ClientRequest {
// Text payload carried by this request.
string message = 1;
}
// ClientResponse is the single reply returned by the ClientStream RPC.
message ClientResponse {
// Text payload carried by the reply.
string message = 1;
}
// ServerRequest is the single request sent to the ServerStream RPC.
message ServerRequest {
// Text payload carried by the request.
string message = 1;
}
// ServerResponse is one element of the response stream of the ServerStream RPC.
message ServerResponse {
// Text payload carried by this response.
string message = 1;
}
// BidirectionalRequest is one element of the request stream of the
// BidirectionalStream RPC.
message BidirectionalRequest {
// Text payload carried by this request.
string message = 1;
}
// BidirectionalResponse is one element of the response stream of the
// BidirectionalStream RPC.
message BidirectionalResponse {
// Text payload carried by this response.
string message = 1;
}
使用 Protocol Buffers 编译器生成 Go 代码:
protoc --go_out=. --go-grpc_out=. service.proto
创建一个 server.go 文件,实现 gRPC 服务:
package main
import (
	"context"
	"io"
	"log"
	"net"
	"os"
	"os/signal"
	"strconv"
	"strings"

	"google.golang.org/grpc"
	pb "path/to/your/service"
)
// server is the StreamService implementation registered with the gRPC server
// in main. It carries no state of its own.
type server struct {
// Embedded type from the generated pb package (not shown here); presumably it
// supplies default "unimplemented" handlers for forward compatibility --
// confirm against the generated code.
pb.UnimplementedStreamServiceServer
}
// ClientStream handles the client-streaming RPC. It keeps reading requests
// from stream, collecting each request's Message, until Recv reports io.EOF
// (the client has finished sending). It then replies exactly once with
// "Received: " followed by the collected messages joined by ", ".
// Any other Recv error is returned unchanged.
func (s *server) ClientStream(stream pb.StreamService_ClientStreamServer) error {
	var received []string
	for {
		in, err := stream.Recv()
		switch {
		case err == io.EOF:
			// End of the client's stream: send the single summary response.
			summary := "Received: " + strings.Join(received, ", ")
			return stream.SendAndClose(&pb.ClientResponse{Message: summary})
		case err != nil:
			return err
		}
		received = append(received, in.Message)
	}
}
// ServerStream handles the server-streaming RPC. It ignores the content of
// req and sends five responses, "Response 0" through "Response 4", on stream.
// The first Send error aborts the loop and is returned; otherwise it returns
// nil once all responses have been sent.
func (s *server) ServerStream(req *pb.ServerRequest, stream pb.StreamService_ServerStreamServer) error {
	const responseCount = 5
	for n := 0; n < responseCount; n++ {
		out := &pb.ServerResponse{Message: "Response " + strconv.Itoa(n)}
		if err := stream.Send(out); err != nil {
			return err
		}
	}
	return nil
}
// BidirectionalStream handles the bidirectional-streaming RPC as an echo
// service: every request read from stream is answered immediately with a
// response whose Message is "Echo: " plus the request's Message. It returns
// nil when Recv reports io.EOF (the client closed its send side) and returns
// any other Recv or Send error unchanged.
func (s *server) BidirectionalStream(stream pb.StreamService_BidirectionalStreamServer) error {
	for {
		in, err := stream.Recv()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		echo := &pb.BidirectionalResponse{Message: "Echo: " + in.Message}
		if sendErr := stream.Send(echo); sendErr != nil {
			return sendErr
		}
	}
}
// main starts the gRPC server: it listens on TCP port 50051, registers the
// StreamService implementation and serves until the process receives an
// interrupt signal, at which point the server is stopped gracefully so that
// in-flight RPCs can finish. Failure to listen or to serve is fatal.
func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterStreamServiceServer(s, &server{})

	// ctx is cancelled on the first interrupt (Ctrl-C).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	go func() {
		<-ctx.Done()
		// Restore default signal handling so a second interrupt terminates
		// the process immediately if graceful shutdown takes too long.
		stop()
		s.GracefulStop()
	}()

	// Serve blocks until the listener fails or the server is stopped.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
创建一个 client.go 文件,实现 gRPC 客户端:
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"strconv"
	"time"

	"google.golang.org/grpc"
	pb "path/to/your/service"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
// 客户端流模式
clientStream, err := client.ClientStream(context.Background())
if err != nil {
log.Fatalf("could not create client stream: %v", err)
}
for i := 0; i < 5; i++ {
if err := clientStream.Send(&pb.ClientRequest{Message: "Message " + strconv.Itoa(i)}); err != nil {
log.Fatalf("could not send message: %v", err)
}
}
resp, err := clientStream.CloseAndRecv()
if err != nil {
log.Fatalf("could not receive response: %v", err)
}
log.Printf("ClientStream response: %v", resp.Message)
// 服务端流模式
serverStream, err := client.ServerStream(context.Background(), &pb.ServerRequest{Message: "Hello"})
if err != nil {
log.Fatalf("could not create server stream: %v", err)
}
for {
resp, err := serverStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("could not receive message: %v", err)
}
log.Printf("ServerStream response: %v", resp.Message)
}
// 双向流模式
bidirectionalStream, err := client.BidirectionalStream(context.Background())
if err != nil {
log.Fatalf("could not create bidirectional stream: %v", err)
}
go func() {
for i := 0; i < 5; i++ {
if err := bidirectionalStream.Send(&pb.BidirectionalRequest{Message: "Message " + strconv.Itoa(i)}); err != nil {
log.Fatalf("could not send message: %v", err)
}
time.Sleep(1 * time.Second)
}
bidirectionalStream.CloseSend()
}()
for {
resp, err := bidirectionalStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("could not receive message: %v", err)
}
log.Printf("BidirectionalStream response: %v", resp.Message)
}
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)