Go + gRPC 实现流模式通信

举报
yd_254179665 发表于 2025/07/22 15:12:32 2025/07/22
【摘要】 安装 gRPC 和 Protocol Buffers 编译器# 安装 Protocol Buffers 编译器sudo apt-get install protobuf-compiler# 安装 gRPC Go 插件go get -u google.golang.org/grpcgo get -u google.golang.org/protobuf/cmd/protoc-gen-gogo ...

安装 gRPC 和 Protocol Buffers 编译器

# 安装 Protocol Buffers 编译器
sudo apt-get install protobuf-compiler

# 安装 gRPC Go 插件
go get -u google.golang.org/grpc
go get -u google.golang.org/protobuf/cmd/protoc-gen-go
go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc

创建一个 service.proto 文件,定义三种流模式的服务:

syntax = "proto3";

package example;

service StreamService {
  // 客户端流模式
  rpc ClientStream (stream ClientRequest) returns (ClientResponse) {}

  // 服务端流模式
  rpc ServerStream (ServerRequest) returns (stream ServerResponse) {}

  // 双向流模式
  rpc BidirectionalStream (stream BidirectionalRequest) returns (stream BidirectionalResponse) {}
}

message ClientRequest {
  string message = 1;
}

message ClientResponse {
  string message = 1;
}

message ServerRequest {
  string message = 1;
}

message ServerResponse {
  string message = 1;
}

message BidirectionalRequest {
  string message = 1;
}

message BidirectionalResponse {
  string message = 1;
}

使用 Protocol Buffers 编译器生成 Go 代码:

protoc --go_out=. --go-grpc_out=. service.proto

创建一个 server.go 文件,实现 gRPC 服务:

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "path/to/your/service"
)

type server struct {
	pb.UnimplementedStreamServiceServer
}

func (s *server) ClientStream(stream pb.StreamService_ClientStreamServer) error {
	var messages []string
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.ClientResponse{Message: "Received: " + strings.Join(messages, ", ")})
		}
		if err != nil {
			return err
		}
		messages = append(messages, req.Message)
	}
}

func (s *server) ServerStream(req *pb.ServerRequest, stream pb.StreamService_ServerStreamServer) error {
	for i := 0; i < 5; i++ {
		if err := stream.Send(&pb.ServerResponse{Message: "Response " + strconv.Itoa(i)}); err != nil {
			return err
		}
	}
	return nil
}

func (s *server) BidirectionalStream(stream pb.StreamService_BidirectionalStreamServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := stream.Send(&pb.BidirectionalResponse{Message: "Echo: " + req.Message}); err != nil {
			return err
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterStreamServiceServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

创建一个 client.go 文件,实现 gRPC 客户端:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	pb "path/to/your/service"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewStreamServiceClient(conn)

	// 客户端流模式
	clientStream, err := client.ClientStream(context.Background())
	if err != nil {
		log.Fatalf("could not create client stream: %v", err)
	}
	for i := 0; i < 5; i++ {
		if err := clientStream.Send(&pb.ClientRequest{Message: "Message " + strconv.Itoa(i)}); err != nil {
			log.Fatalf("could not send message: %v", err)
		}
	}
	resp, err := clientStream.CloseAndRecv()
	if err != nil {
		log.Fatalf("could not receive response: %v", err)
	}
	log.Printf("ClientStream response: %v", resp.Message)

	// 服务端流模式
	serverStream, err := client.ServerStream(context.Background(), &pb.ServerRequest{Message: "Hello"})
	if err != nil {
		log.Fatalf("could not create server stream: %v", err)
	}
	for {
		resp, err := serverStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("could not receive message: %v", err)
		}
		log.Printf("ServerStream response: %v", resp.Message)
	}

	// 双向流模式
	bidirectionalStream, err := client.BidirectionalStream(context.Background())
	if err != nil {
		log.Fatalf("could not create bidirectional stream: %v", err)
	}
	go func() {
		for i := 0; i < 5; i++ {
			if err := bidirectionalStream.Send(&pb.BidirectionalRequest{Message: "Message " + strconv.Itoa(i)}); err != nil {
				log.Fatalf("could not send message: %v", err)
			}
			time.Sleep(1 * time.Second)
		}
		bidirectionalStream.CloseSend()
	}()
	for {
		resp, err := bidirectionalStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("could not receive message: %v", err)
		}
		log.Printf("BidirectionalStream response: %v", resp.Message)
	}
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。