使用 Golang 进行长时间轮询!

举报
Rolle 发表于 2023/12/28 17:47:28 2023/12/28
【摘要】 让我们想象一下,我们正在创建一个简单的多人棋盘游戏。玩家不会经常移动,但我们希望确保他们立即收到新的更新。你可能会说这是显而易见的,我们应该使用 WebSocket。你基本上是对的,但让我们先探索其他选项。 以恒定的时间间隔检索数据!这很简单。玩家每秒检索一次数据。但是如果我们这样做,我们将收到许多无用的请求。无论如何,让我们创建一个简单的项目,允许用户收听并向其他人广播消息。想象一下,这些...

让我们想象一下,我们正在创建一个简单的多人棋盘游戏。玩家不会经常移动,但我们希望确保他们立即收到新的更新。你可能会说这是显而易见的,我们应该使用 WebSocket。你基本上是对的,但让我们先探索其他选项。

以恒定的时间间隔检索数据!

这很简单。玩家每秒检索一次数据。但是如果我们这样做,我们将收到许多无用的请求。无论如何,让我们创建一个简单的项目,允许用户收听并向其他人广播消息。想象一下,这些信息就像棋盘游戏中的动作。玩家 1 使用了他的终极牌!玩家 2 将他的经纪人移动到该坐标。在本教程中,我们将保持它非常简单。


// CappedQueue stores items in FIFO order, but it has a capacity and will delete older items if capacity is reached.
type CappedQueue[T any] struct {
   items    []T
   lock     *sync.RWMutex
   capacity int
}

func NewCappedQueue[T any](capacity int) *CappedQueue[T] {
   return &CappedQueue[T]{
      items:    make([]T, 0, capacity),
      lock:     new(sync.RWMutex),
      capacity: capacity,
   }
}

func (q *CappedQueue[T]) Append(item T) {
   q.lock.Lock()
   defer q.lock.Unlock()

   if l := len(q.items); l == 0 {
      q.items = append(q.items, item)
   } else {
      to := q.capacity - 1
      if l < q.capacity {
         to = l
      }
      q.items = append([]T{item}, q.items[:to]...)
   }
}

func (q *CappedQueue[T]) Copy() []T {
   q.lock.RLock()
   defer q.lock.RUnlock()

   copied := make([]T, len(q.items))
   for i, item := range q.items {
      copied[i] = item
   }
   return copied
}

首先,我创建了一个简单的结构来保存最新消息。如果容量已满,它将删除较旧的邮件。


type SendMessageRequest struct {
   Message string `json:"message"`
}

func main() {
   q := queue.NewCappedQueue[string](10)
   e := echo.New()
   e.GET("updates", func(c echo.Context) error {
      return c.JSON(200, q.Copy())
   })
   e.POST("send", func(c echo.Context) error {
      var request SendMessageRequest
      if err := c.Bind(&request); err != nil {
         return c.String(400, fmt.Sprintf("Bad request: %v", err))
      }
      q.Append(request.Message)
      return c.JSON(201, "I've sent your request.")
   })
   e.Logger.Fatal(e.Start(":8000"))
}

正如你所看到的,我用 Echo 创建了一个简单的 HTTP 路由器,并使用队列向其他用户发送消息。如果我们向 /send 端点发送消息,其他用户可以通过调用 /updates 来获取该消息。让我们试试吧。

现在,每个客户端必须每隔几秒钟调用一次 /updates,以确保每个人都几乎可以立即看到消息。但是,让我们计算一下如果我们在生产中使用此解决方案会发生什么。

  • 如果我们有十个在线用户在思考,他们一分钟没有采取任何行动。我们每秒发出十个无用的请求!

  • 如果他们在玩呢?然后我们每分钟大约有十个动作。但服务器每秒接收大约 600 个请求。其中590个没用!

让我们用长轮询来解决这些问题!

等到你有新的更新给我!

长轮询相当简单。简而言之,这意味着在进行更新之前,我们不会回答/updates请求。为此,我们需要确保客户端的超时时间过高。就像 60 秒一样。如果我们没有任何新的更新,我们将不会回答该请求,我们将保持开放状态,直到我们获得新的更新。


type Update struct {
   CreatedAt int64
   Message   string
}
...
func main() {
   q := queue.NewCappedQueue[Update](10)
   ...
   e.GET("updates", func(c echo.Context) error {
      lastUpdate := c.QueryParam("lastUpdate")
      lastUpdateUnix, _ := strconv.ParseInt(lastUpdate, 10, 64)
      var updates []Update
      for {
         updates = pkg.Filter(q.Copy(), func(update Update) bool {
            return update.CreatedAt > lastUpdateUnix
         })
         if len(updates) != 0 {
            break
         }
         select {
         case <-c.Request().Context().Done():
         case <-time.After(time.Second):
         }
      }
      return c.JSON(200, updates)
   })
   e.POST("send", func(c echo.Context) error {
      ...
      q.Append(Update{
         CreatedAt: time.Now().Unix(),
         Message:   request.Message,
      })
      ...
   })
   e.Logger.Fatal(e.Start(":8000"))
}

这似乎令人困惑,但让我们来看看它。

  • 我创建了 Update 类型,它允许我们保存每条消息的创建时间。

  • 如果客户端向我们发送了上次收到的消息的时间戳,我们将过滤消息并仅保留新消息。我们循环执行此操作,以确保如果没有新的更新,我们将等待并重试,直到收到新消息。

建立 TCP 连接的成本很高。我们不需要每秒建立一个新的TCP连接。

客户端不需要每秒发出一个新请求。打开 I/O 会占用大量 CPU,如果我们每秒发出一个请求,较旧的 I/O 会过热。

我们在没有将新技术引入生产的情况下解决了这些问题。

我们还有什么问题?

  • 该应用程序不是实时的!我们有大约 500 毫秒的延迟。

  • 我们每秒检查一次新的更新,这可能会给数据库带来压力。

我们可以使用 pub-sub,比如 Redis,来解决一些提到的问题。如果我们在发布订阅频道上收听,则无需每秒检查一次新的更新。在用户创建新更新后,每个请求都会收到通知,并向客户端显示新的更新。


type PubSub struct {
   channels []chan struct{}
   lock     *sync.RWMutex
}

func NewPubSub() *PubSub {
   return &PubSub{
      channels: make([]chan struct{}, 0),
      lock:     new(sync.RWMutex),
   }
}

func (p *PubSub) Subscribe() (<-chan struct{}, func()) {
   p.lock.Lock()
   defer p.lock.Unlock()

   c := make(chan struct{}, 1)
   p.channels = append(p.channels, c)
   return c, func() {
      p.lock.Lock()
      defer p.lock.Unlock()

      for i, channel := range p.channels {
         if channel == c {
            p.channels = append(p.channels[:i], p.channels[i+1:]...)
            close(c)
            return
         }
      }
   }
}

func (p *PubSub) Publish() {
   p.lock.RLock()
   defer p.lock.RUnlock()

   for _, channel := range p.channels {
      channel <- struct{}{}
   }
}


...
func main() {
   q := queue.NewCappedQueue[Update](10)
   ps := pubsub.NewPubSub()
   ...
   e.GET("updates", func(c echo.Context) error {
      lastUpdate := c.QueryParam("lastUpdate")
      lastUpdateUnix, _ := strconv.ParseInt(lastUpdate, 10, 64)
      getUpdates := func() []Update {
         return pkg.Filter(q.Copy(), func(update Update) bool {
            return update.CreatedAt > lastUpdateUnix
         })
      }
      // show it to user if we already have an update
      if updates := getUpdates(); len(updates) > 0 {
         return c.JSON(200, updates)
      }
      ch, close := ps.Subscribe()
      defer close()

      select {
      case <-ch:
         return c.JSON(200, getUpdates())
      case <-c.Request().Context().Done():
         return c.String(http.StatusRequestTimeout, "timeout")
      }
   })

   e.POST("send", func(c echo.Context) error {
      ...
      ps.Publish()
      return c.JSON(201, "I've sent your request.")
   })
    ...
}

在某些情况下,我们可以使用长轮询而不是套接字。它为我们提供了一个相当简单的通信协议。如果我们没有太多更新(比如群聊!),并且立即更新对我们来说至关重要,那么长轮询是一个不错的选择。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。