高性能消息中间件 NSQ 解析-应用实践

举报
aoho 发表于 2021/03/14 20:58:17 2021/03/14
【摘要】 Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。 安装使用在官网(https://nsq.io/o...

Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。

我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。

安装使用

在官网(https://nsq.io/overview/quick_start.html) 下载对应的二进制可执行文件。

# 启动nsqlookupd
$ nsqlookupd
# 启动 nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
# 启动 nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161

# 创建topic,发送消息
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
# 启动nsq_to_file
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
# 发布消息到 nsqd
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

在本地按照上述步骤就可以跑起来了。

创建生产者

安装好 nsq 的几个服务之后,我们来实现基于 nsq 的生产和消费示例。首先是创建生产者:

package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nsqio/go-nsq"
)

func main() {
  config := nsq.NewConfig()
  p, err := nsq.NewProducer("127.0.0.1:4150", config)

  if err != nil {
    log.Panic(err)
  }

  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("num-%d", i)
    log.Println("Pub:" + msg)
    err = p.Publish("testTopic", []byte(msg))
    if err != nil {
      log.Panic(err)
    }
    time.Sleep(time.Second * 1)
  }

  p.Stop()
}

生产者的逻辑比较简单,基于 nsq 官方提供的 github.com/nsqio/go-nsq包,通过调用,循环写 1000 个字符+数字,即 num-n 的形式,通过 p.Publish 发送到消息队列中,等待消费。

消费者

接着,我们创建消费者:consumer.go 来消费刚刚生产的消息。

package main

import (
  "log"
  "sync"

  "github.com/nsqio/go-nsq"
)

func main() {
  wg := &sync.WaitGroup{}
  wg.Add(1000)

  config := nsq.NewConfig()
  c, _ := nsq.NewConsumer("testTopic", "ch", config)
  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
    log.Printf("Got a message: %s", message.Body)
    wg.Done()
    return nil
  }))

  // 1.直连nsqd
  // err := c.ConnectToNSQD("127.0.0.1:4150")

  // 2.通过 nsqlookupd 服务发现
  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
  if err != nil {
    log.Panic(err)
  }
  wg.Wait()
}

可通过两种方式与 nsqd 连接:

  • 直连 nsqd,适用于单机(standalone)版;
  • 通过 nsqlookupd 服务发现,适用于集群(cluster)版;

消费消息的动作,主要逻辑就是打印出来,实际业务中需要进行其他处理。

运行结果

依次启动生产者和消费者的服务,可以分别看到如下的输出结果:

$go run producer.go

2020/12/28 20:29:51 Pub:num-0
2020/12/28 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
2020/12/28 20:29:52 Pub:num-1
2020/12/28 20:29:53 Pub:num-2
2020/12/28 20:29:54 Pub:num-3
2020/12/28 20:29:55 Pub:num-4
2020/12/28 20:29:56 Pub:num-5
2020/12/28 20:29:57 Pub:num-6
2020/12/28 20:29:58 Pub:num-7
2020/12/28 20:29:59 Pub:num-8
2020/12/28 20:30:00 Pub:num-9
2020/12/28 20:30:01 Pub:num-10

$ go run consumer.go

2020/12/28 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2020/12/28 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2020/12/28 20:30:08 Got a message: num-0
2020/12/28 20:30:08 Got a message: num-1
2020/12/28 20:30:08 Got a message: num-2
2020/12/28 20:30:08 Got a message: num-3
2020/12/28 20:30:08 Got a message: num-4
2020/12/28 20:30:08 Got a message: num-5
2020/12/28 20:30:08 Got a message: num-6
2020/12/28 20:30:08 Got a message: num-7
2020/12/28 20:30:08 Got a message: num-8
2020/12/28 20:30:08 Got a message: num-9
2020/12/28 20:30:08 Got a message: num-10

通过如上的示例,我们已经成功地实现 NSQ 的应用。下面我们将解析 NSQ 的几个核心部分。

小结

本文主要介绍 nsq 的安装使用,下载好可执行文件之后,依次启动 nsqlookupd、nsqd、nsqadmin 几个服务。接着我们基于官方提供的客户端 API 包实现了生产消费模型的案例。通过简单的案例,我们能够对 nsq 的安装和基本使用有一个了解。

下一篇文章,将会具体分析 nsq 实现的细节。

推荐阅读

高性能消息中间件 NSQ 解析-整体介绍

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。