使用 Golang 进行长时间轮询!
让我们想象一下,我们正在创建一个简单的多人棋盘游戏。玩家不会经常移动,但我们希望确保他们立即收到新的更新。你可能会说这是显而易见的,我们应该使用 WebSocket。你基本上是对的,但让我们先探索其他选项。
以恒定的时间间隔检索数据!
这很简单。玩家每秒检索一次数据。但是如果我们这样做,我们将收到许多无用的请求。无论如何,让我们创建一个简单的项目,允许用户收听并向其他人广播消息。想象一下,这些信息就像棋盘游戏中的动作。玩家 1 使用了他的终极牌!玩家 2 将他的经纪人移动到该坐标。在本教程中,我们将保持它非常简单。
// CappedQueue stores items in FIFO order, but it has a capacity and will delete older items if capacity is reached.
type CappedQueue[T any] struct {
items []T
lock *sync.RWMutex
capacity int
}
func NewCappedQueue[T any](capacity int) *CappedQueue[T] {
return &CappedQueue[T]{
items: make([]T, 0, capacity),
lock: new(sync.RWMutex),
capacity: capacity,
}
}
func (q *CappedQueue[T]) Append(item T) {
q.lock.Lock()
defer q.lock.Unlock()
if l := len(q.items); l == 0 {
q.items = append(q.items, item)
} else {
to := q.capacity - 1
if l < q.capacity {
to = l
}
q.items = append([]T{item}, q.items[:to]...)
}
}
func (q *CappedQueue[T]) Copy() []T {
q.lock.RLock()
defer q.lock.RUnlock()
copied := make([]T, len(q.items))
for i, item := range q.items {
copied[i] = item
}
return copied
}
首先,我创建了一个简单的结构来保存最新消息。如果容量已满,它将删除较旧的邮件。
type SendMessageRequest struct {
Message string `json:"message"`
}
func main() {
q := queue.NewCappedQueue[string](10)
e := echo.New()
e.GET("updates", func(c echo.Context) error {
return c.JSON(200, q.Copy())
})
e.POST("send", func(c echo.Context) error {
var request SendMessageRequest
if err := c.Bind(&request); err != nil {
return c.String(400, fmt.Sprintf("Bad request: %v", err))
}
q.Append(request.Message)
return c.JSON(201, "I've sent your request.")
})
e.Logger.Fatal(e.Start(":8000"))
}
正如你所看到的,我用 Echo 创建了一个简单的 HTTP 路由器,并使用队列向其他用户发送消息。如果我们向 /send 端点发送消息,其他用户可以通过调用 /updates 来获取该消息。让我们试试吧。
现在,每个客户端必须每隔几秒钟调用一次 /updates,以确保每个人都几乎可以立即看到消息。但是,让我们计算一下如果我们在生产中使用此解决方案会发生什么。
-
如果我们有十个在线用户在思考,他们一分钟没有采取任何行动。我们每秒发出十个无用的请求!
-
如果他们在玩呢?然后我们每分钟大约有十个动作。但服务器每秒接收大约 600 个请求。其中590个没用!
让我们用长轮询来解决这些问题!
等到你有新的更新给我!
长轮询相当简单。简而言之,这意味着在进行更新之前,我们不会回答/updates请求。为此,我们需要确保客户端的超时时间过高。就像 60 秒一样。如果我们没有任何新的更新,我们将不会回答该请求,我们将保持开放状态,直到我们获得新的更新。
type Update struct {
CreatedAt int64
Message string
}
...
func main() {
q := queue.NewCappedQueue[Update](10)
...
e.GET("updates", func(c echo.Context) error {
lastUpdate := c.QueryParam("lastUpdate")
lastUpdateUnix, _ := strconv.ParseInt(lastUpdate, 10, 64)
var updates []Update
for {
updates = pkg.Filter(q.Copy(), func(update Update) bool {
return update.CreatedAt > lastUpdateUnix
})
if len(updates) != 0 {
break
}
select {
case <-c.Request().Context().Done():
case <-time.After(time.Second):
}
}
return c.JSON(200, updates)
})
e.POST("send", func(c echo.Context) error {
...
q.Append(Update{
CreatedAt: time.Now().Unix(),
Message: request.Message,
})
...
})
e.Logger.Fatal(e.Start(":8000"))
}
这似乎令人困惑,但让我们来看看它。
-
我创建了 Update 类型,它允许我们保存每条消息的创建时间。
-
如果客户端向我们发送了上次收到的消息的时间戳,我们将过滤消息并仅保留新消息。我们循环执行此操作,以确保如果没有新的更新,我们将等待并重试,直到收到新消息。
建立 TCP 连接的成本很高。我们不需要每秒建立一个新的TCP连接。
客户端不需要每秒发出一个新请求。打开 I/O 会占用大量 CPU,如果我们每秒发出一个请求,较旧的 I/O 会过热。
我们在没有将新技术引入生产的情况下解决了这些问题。
我们还有什么问题?
-
该应用程序不是实时的!我们有大约 500 毫秒的延迟。
-
我们每秒检查一次新的更新,这可能会给数据库带来压力。
我们可以使用 pub-sub,比如 Redis,来解决一些提到的问题。如果我们在发布订阅频道上收听,则无需每秒检查一次新的更新。在用户创建新更新后,每个请求都会收到通知,并向客户端显示新的更新。
type PubSub struct {
channels []chan struct{}
lock *sync.RWMutex
}
func NewPubSub() *PubSub {
return &PubSub{
channels: make([]chan struct{}, 0),
lock: new(sync.RWMutex),
}
}
func (p *PubSub) Subscribe() (<-chan struct{}, func()) {
p.lock.Lock()
defer p.lock.Unlock()
c := make(chan struct{}, 1)
p.channels = append(p.channels, c)
return c, func() {
p.lock.Lock()
defer p.lock.Unlock()
for i, channel := range p.channels {
if channel == c {
p.channels = append(p.channels[:i], p.channels[i+1:]...)
close(c)
return
}
}
}
}
func (p *PubSub) Publish() {
p.lock.RLock()
defer p.lock.RUnlock()
for _, channel := range p.channels {
channel <- struct{}{}
}
}
...
func main() {
q := queue.NewCappedQueue[Update](10)
ps := pubsub.NewPubSub()
...
e.GET("updates", func(c echo.Context) error {
lastUpdate := c.QueryParam("lastUpdate")
lastUpdateUnix, _ := strconv.ParseInt(lastUpdate, 10, 64)
getUpdates := func() []Update {
return pkg.Filter(q.Copy(), func(update Update) bool {
return update.CreatedAt > lastUpdateUnix
})
}
// show it to user if we already have an update
if updates := getUpdates(); len(updates) > 0 {
return c.JSON(200, updates)
}
ch, close := ps.Subscribe()
defer close()
select {
case <-ch:
return c.JSON(200, getUpdates())
case <-c.Request().Context().Done():
return c.String(http.StatusRequestTimeout, "timeout")
}
})
e.POST("send", func(c echo.Context) error {
...
ps.Publish()
return c.JSON(201, "I've sent your request.")
})
...
}
在某些情况下,我们可以使用长轮询而不是套接字。它为我们提供了一个相当简单的通信协议。如果我们没有太多更新(比如群聊!),并且立即更新对我们来说至关重要,那么长轮询是一个不错的选择。
- 点赞
- 收藏
- 关注作者
评论(0)