在 Golang 中使用 NATS 的 HTTP over Message Broker
分布式服务之间基于消息的通信并非易事,因为响应目标并不明显。发送响应的惯用方式是具有第二个队列的异步请求/响应。
第二个队列可以是扇出主题,所有请求发布者都订阅其中,也可以是临时队列,该队列是为每条消息创建的,将在收到响应后删除。(可选)可以使用第 3 个队列将响应传播给其他参与者:
如果我们的架构不完全支持服务网格,那么使用异步请求/响应很重要。一个典型的例子是在 Kubernetes 中运行迷你服务。Kubernetes 是运行微服务的著名编排器,但运行多个具有不同配置的微服务很复杂。在微服务中部署服务降低了复杂性,但引入了一个新问题:很难访问短期服务(内部 worker 和 jobs)或单例服务,因为 Kubernetes Service 负载均衡器不支持它,请参阅示例:
此问题有几种解决方案,例如:
- Kubernetes 服务网格
- 服务注册表(例如:Consul)
- 使用消息代理进行异步请求/响应
本文介绍异步请求/响应。
本文的第二部分解释了消息传递如何成为 HTTP 客户端-服务器通信(包括 REST)的传输层。也可以与 HTTP 客户端和服务器中间件集成。如果 HTTP 客户端和服务器存根是从 OpenAPI 规范生成的,则说明的解决方案也支持它。
Message Broker 客户端库的适配器
使用依赖关系注入 (DI) 来实现 SOLID 的依赖关系反转原则 (DIP),应为客户端和服务器端定义接口,这些接口由消息代理客户端库适配器实现。
软件架构应该使用接口进行规划,其中的实现可以通过简单的方式进行更改。
接口定义对双方来说都很简单:
type Request struct {
Queue string `json:"queue"`
Header map[string][]string `json:"header"`
Payload []byte `json:"payload"`
}
type Response struct {
Header map[string][]string `json:"header"`
Payload []byte `json:"payload"`
Status int `json:"status"`
Error string `json:"error"`
}
type MsgTransporter interface {
MsgReqResp(ctx context.Context, req Request) (*Response, error)
}
所有消息代理都支持发布/订阅消息传递。一些消息代理客户端库本身支持异步请求/响应。
适配器
NATS 是一个现代著名的微服务消息代理。NATS Go 客户端库本身支持异步请求/响应,因此在示例中选择了 NATS。
NATS 核心支持井消息主题,但不完全支持传统队列。客户端库或多或少地隐藏了这个缺点,因此必须牢记此信息,请参阅队列组。
客户端
请求/响应教程显示了请求/响应调用的简单程度。让我们看看客户端代码:
nc, err := nats.Connect("demo.nats.io")
(...)
msg, err := nc.Request("time", nil, time.Second)
(...)
nc.Close()
超时可以用 context.Context 处理,只需选择另一个函数即可。NATS 客户端库也支持类似 HTTP 标头的键值对。最后,the RequestMsqWithContext 是最好的函数,其中 Msg.Subject 、 Msh.Header 和 Msg.Data 字段很有用,请看它的声明:
func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)
RequestMsgWithContext 用法示例,其中 用作 Msg.Subject 队列名称:
respMsg, err := c.conn.RequestMsgWithContext(ctx, &nats.Msg{
Subject: queue,
Header: header,
Data: payload,
})
在后台,NATS 客户端库为响应创建一个临时队列,并使用标头和有效负载将其包装。
服务器端
本教程在服务器端使用 SubscribeSync 函数,但它的行为类似于扇出主题,将消息传递给所有订阅者。例如,它可用于收集所有订阅者的状态。对于点对点通信,是正确的 QueueSubscribe 选项,它仅将消息传递给一个订阅者
nc, err := nats.Connect("demo.nats.io")
(...)
err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
(...)
err = m.Respond([]byte("Done"))
(...)
})
不支持 Respond 标头,但 RespondMsg 支持,请参阅其声明:
func (m *Msg) RespondMsg(msg *Msg) error
- 点赞
- 收藏
- 关注作者
评论(0)