高性能消息中间件 NSQ 解析-nsqd 实现细节介绍
【摘要】 我们在 前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用。从本篇开始将会结合源码介绍 nsq 的实现细节。nsq 中单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收这个 topic 所有消息的副本,从而实现多播分发,而 channel 上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡。 入口函数首先看下 nsqd 的入...
我们在 前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用。从本篇开始将会结合源码介绍 nsq 的实现细节。
nsq 中单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收这个 topic 所有消息的副本,从而实现多播分发,而 channel 上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡。
入口函数
首先看下 nsqd 的入口函数:
//位于 apps/nsqd/main.go:26
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}
func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start() error {
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
...
}
通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqd 实例。
配置项初始化
初始化配置项(opts, cfg),加载历史数据(nsqd.LoadMetadata)、持久化最新数据(nsqd.PersistMetadata),然后开启协程,进入 nsqd.Main() 主函数。
// 位于 apps/nsqd/main.go:64
options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd
err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}
go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()
接着就是初始化 tcpServer, httpServer, httpsServer,然后循环监控队列信息(n.queueScanLoop)、节点信息管理(n.lookupLoop)、统计信息(n.statsdLoop)输出。
// 位于 nsqd/nsqd.go:262
n.tcpServer.nsqd = n
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
处理请求
分别处理 tcp/http 请求,开启 handler 协程进行并发处理,其中 newHTTPServer 注册路由采用了 Decorate 装饰器模式(后面会进一步解析);
// 位于 nsqd/http.go:44
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
// v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
http-Decorate 路由分发
// 位于 internal/protocol/tcp_server.go:22
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
return fmt.Errorf("listener.Accept() error - %s", err)
}
break
}
go handler.Handle(clientConn)
}
如上的实现为 tcp-handler 处理的主要代码。
tcp 解析协议
tcp 解析 V2 协议,走内部协议封装的 prot.IOLoop(conn) 进行处理;
// 位于 nsqd/tcp.go:34
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
消息生成与消费
通过内部协议进行 p.Exec(执行命令)、p.Send(发送结果),保证每个 nsqd 节点都能正确的进行消息生成与消费,一旦上述过程有 error 都会被捕获处理,确保分布式投递的可靠性。
// 位于 nsqd/protocol_v2.go:79
params := bytes.Split(line, separatorBytes)
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
nsqd 也会同时开启 tcp 和 http 服务,两个服务都可以提供给生产者和消费者,http 服务还提供给 nsqadmin 获取该 nsqd 本地 topic 和 channel 信息。
小结
本文主要介绍 nsqd,总的来说 nsqd 的实现并不复杂。nsqd 是一个守护进程,负责接收(生产者 producer )、排队(最小堆实现)、投递(消费者 consumer )消息给客户端。nsqd 可以独立运行,但通常是由 nsqlookupd 实例所在集群配置的。
下一篇文章,将会具体分析 nsq 中其他模块实现的细节。
推荐阅读
订阅最新文章,欢迎关注我的公众号
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)