在 Golang 中使用 NATS 的 HTTP over Message Broker

举报
Rolle 发表于 2023/12/21 22:38:05 2023/12/21
【摘要】 分布式服务之间基于消息的通信并非易事,因为响应目标并不明显。发送响应的惯用方式是具有第二个队列的异步请求/响应。第二个队列可以是扇出主题,所有请求发布者都订阅其中,也可以是临时队列,该队列是为每条消息创建的,将在收到响应后删除。(可选)可以使用第 3 个队列将响应传播给其他参与者:如果我们的架构不完全支持服务网格,那么使用异步请求/响应很重要。一个典型的例子是在 Kubernetes 中运行...

分布式服务之间基于消息的通信并非易事,因为响应目标并不明显。发送响应的惯用方式是具有第二个队列的异步请求/响应。

image.png

第二个队列可以是扇出主题,所有请求发布者都订阅其中,也可以是临时队列,该队列是为每条消息创建的,将在收到响应后删除。(可选)可以使用第 3 个队列将响应传播给其他参与者:

image.png

如果我们的架构不完全支持服务网格,那么使用异步请求/响应很重要。一个典型的例子是在 Kubernetes 中运行迷你服务。Kubernetes 是运行微服务的著名编排器,但运行多个具有不同配置的微服务很复杂。在微服务中部署服务降低了复杂性,但引入了一个新问题:很难访问短期服务(内部 worker 和 jobs)或单例服务,因为 Kubernetes Service 负载均衡器不支持它,请参阅示例:

image.png

此问题有几种解决方案,例如:

  • Kubernetes 服务网格
  • 服务注册表(例如:Consul)
  • 使用消息代理进行异步请求/响应

本文介绍异步请求/响应。

本文的第二部分解释了消息传递如何成为 HTTP 客户端-服务器通信(包括 REST)的传输层。也可以与 HTTP 客户端和服务器中间件集成。如果 HTTP 客户端和服务器存根是从 OpenAPI 规范生成的,则说明的解决方案也支持它。

Message Broker 客户端库的适配器

使用依赖关系注入 (DI) 来实现 SOLID 的依赖关系反转原则 (DIP),应为客户端和服务器端定义接口,这些接口由消息代理客户端库适配器实现。

软件架构应该使用接口进行规划,其中的实现可以通过简单的方式进行更改。

image.png

接口定义对双方来说都很简单:


type Request struct {
 Queue   string              `json:"queue"`
 Header  map[string][]string `json:"header"`
 Payload []byte              `json:"payload"`
}

type Response struct {
 Header  map[string][]string `json:"header"`
 Payload []byte              `json:"payload"`
 Status  int                 `json:"status"`
 Error   string              `json:"error"`
}

type MsgTransporter interface {
 MsgReqResp(ctx context.Context, req Request) (*Response, error)
}

所有消息代理都支持发布/订阅消息传递。一些消息代理客户端库本身支持异步请求/响应。

适配器

NATS 是一个现代著名的微服务消息代理。NATS Go 客户端库本身支持异步请求/响应,因此在示例中选择了 NATS。

NATS 核心支持井消息主题,但不完全支持传统队列。客户端库或多或少地隐藏了这个缺点,因此必须牢记此信息,请参阅队列组。

客户端

请求/响应教程显示了请求/响应调用的简单程度。让我们看看客户端代码:


nc, err := nats.Connect("demo.nats.io")
(...)
msg, err := nc.Request("time", nil, time.Second)
(...)
nc.Close()

超时可以用 context.Context 处理,只需选择另一个函数即可。NATS 客户端库也支持类似 HTTP 标头的键值对。最后,the RequestMsqWithContext 是最好的函数,其中 Msg.Subject 、 Msh.Header 和 Msg.Data 字段很有用,请看它的声明:


func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)


RequestMsgWithContext 用法示例,其中 用作 Msg.Subject 队列名称:


 respMsg, err := c.conn.RequestMsgWithContext(ctx, &nats.Msg{
  Subject: queue,
  Header:  header,
  Data:    payload,
 })

在后台,NATS 客户端库为响应创建一个临时队列,并使用标头和有效负载将其包装。

服务器端

本教程在服务器端使用 SubscribeSync 函数,但它的行为类似于扇出主题,将消息传递给所有订阅者。例如,它可用于收集所有订阅者的状态。对于点对点通信,是正确的 QueueSubscribe 选项,它仅将消息传递给一个订阅者


nc, err := nats.Connect("demo.nats.io")
(...)
err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
    (...)
    err = m.Respond([]byte("Done"))
    (...)
})

不支持 Respond 标头,但 RespondMsg 支持,请参阅其声明:

func (m *Msg) RespondMsg(msg *Msg) error

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。