如何在EasyCVR中实现NSQ延时推流技术?

举报
TSINGSEE青犀视频 发表于 2021/11/18 16:57:59 2021/11/18
【摘要】 EasyCVR 使用 NSQ 时,希望延时 60s,消费端才能够收到对应的消息,因此我们本文主要是调研是否有该功能的过程,我们主要使用 DeferredPublish 方法实现。

EasyCVR 各模块之间进行消息通信时,需要一款消息中间件进行消息的传输和发送。在调研各种 MQ 中间件后,确定采用 NSQ 来进行编译。

EasyCVR 使用 NSQ 时,希望延时 60s,消费端才能够收到对应的消息,因此我们本文主要是调研是否有该功能的过程,我们主要使用 DeferredPublish 方法实现,方法代码如下:

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

func main() {
   config := nsq.NewConfig()
   // 1. 向 nsqd 的 tcp 端口发送消息,因此进行对应的配置
   producer, err := nsq.NewProducer("127.0.0.1:4154", config)
   if err != nil {
      log.Fatal(err)
   }

   messageBody := []byte("hello world delay")
   topicName := "topic2"

   // 2. 同步推流到 nspd, 同步推流代表等待 nspd 的响应,如果发送失败返回错误。
   // PublishAsync 代表是异步推送消息给 nspd,发送完消息后立刻返回
   err = producer.DeferredPublish(topicName, 60*time.Second, messageBody)
   fmt.Println("发送消息时间为", time.Now().Format(consts.TimeFormat))
   if err != nil {
      log.Fatal(err)
   }

   /*sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan*/

   // 3. 停止生产者,一般在停止服务,停止进程的时候需要调用
   producer.Stop()
}

在 14:06:45 开始发送了一个消息。

消费者在 60s 后收到消息,14:07:46 收到对应的消息。

经过代码确认,延时消息的发送是在 nsqd 中进行实现的,延时推流功能已经实现。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。