在客户端或者页面进行消息推送
1 前言
从服务推送消息到客户端,通常是件麻烦事,因为请求调用通常在客户端发起,而http协议为请求一次,响应一次或多次。
如果服务器有频繁推送需求,现在有大量的解决方法,请看之前写的服务推送http消息的10大方法。而如果需要客户端到服务的建立长期连接,可以通过websocket这类的长链接执行。
2.1 消息推送
我们将设计一个简单的推送系统,其数据保存在内存的键值对中,只要是订阅者,都可以收到消息。
我们之前创建了一个连接,用于连接服务。
然后我们创建了一个扫描器, 返回一个布尔状态,当服务一直存在时,我们可以接受数据并处理他们。
因此我们仍然使用tcp的方式订阅频道,并实现监控功能。
2.2 程序客户端:开启实时页面面板, 开始搜索和预定操作,选择是否需要监控
在启动预定模拟程序时,如果缓存服务不在本地localhost,那么需要指定缓存服务地址
.\monitor 192.168.3.1:6379
否则直接启动
.\monitor
需要指定是否开启实时监控,有提示如下
[WARNING] => 22:30:24 main.go:40: #########Start Monitor request...###########
将要进行30次查询和订票,是否启用监控后台 true 或 false:
选择 true, 表示需要开启监控,如果缓存服务地址输入错误,那么缓存服务连接失败,则直接退出
panic: 缓存连接失败:&net.OpError{Op:"dial", Net:"tcp", Source:net.Addr(nil), Addr:(*net.TCPAddr)(0xc0000bd320), Err:(*os.SyscallError)(0xc000052100)}
选择false, 表示不需要开启监控,直接发起搜索和预定操作。
2.3 程序客户端:控制台提示解读
在执行搜索和预定时,服务器将返回一系列消息,他们分别表示如下含义:
预定的信息提示
[INFO -]22:20:26 restparams.go:114: booked info: id=en00029
请求发起后,传输到哪里,以那种方式预定的
Request INFO -22:20:26 requests.go:129: POST <-- URL ->> : http://127.0.0.1:8080/resource/search ,request body: &map[id:[en00029]]
服务的响应信息提示:
"{\"code\":200,\"data\":{\"Visits\":0,\"created_at\":1672928426453,\"founded\":1672729697872,\"id\":\"en00029\",\"introduction\":\"new one to visitor.\",\"price\":10,\"title\":\"asia_owl_01\",\"url\":\"zoo.asdaliyun.com/oss/owl\"},\"message\":\"Success\"}"
一次预定完成的提示
Booked Done.
2.4 程序客户端: 监控信息
如果启用了监控服务,在每次预定成功后,将会返回当前的预定状态
首先是三个分隔行
============================================================================================
然后是返回的原始状态数据
monitor cache service msg length of :31 new book msg:{M:1672929371945 1672929370942:745:11010:en00028}
继续是状态信息解析,分别显示
总预定:745
总销售额:11010
新增预定:en00029
2.5 接口:实现服务消息推送到页面
我们将推送消息结构体定义如下,Message 为将要推送的消息通道,NewClients为新的客户端对象,
type ServiceMsg struct {
Message chan string
NewClients chan chan string
ClosedClients chan chan string
TotalClients map[chan string]bool
}
在新建服务消息监听对象时,初始化为空通道
ServiceMsgListen = &ServiceMsg{
Message: make(chan string),
NewClients: make(chan chan string),
ClosedClients: make(chan chan string),
TotalClients: make(map[chan string]bool),
}
在监听进程中,处理和删除全部客户端的请求,添加活跃客户端,并广播给全部活跃的客户端连接。
删除已被移除的客户端,它们已经被服务断开,或已经从客户端侧断开
select {
case client := <-stream.NewClients:
stream.TotalClients[client] = true
log.Printf(" registered " )
case client := <-stream.ClosedClients:
delete(stream.TotalClients, client)
log.Printf(" unregistered " )
case eventMsg := <-stream.Message:
for clientMessageChan := range stream.TotalClients {
clientMessageChan <- eventMsg
}
}
在推送服务中,初始化客户端通道,并负责处理服务端的关闭和客户端的关闭操作。 关闭
clientChan := make(ClientChan)
stream.NewClients <- clientChan
defer func() {
stream.ClosedClients <- clientChan
}()
go func() {
<-c.Done()
stream.ClosedClients <- clientChan
}()
c.Next()
在请求头中,不保留任何缓存,并且要求客户端保持连接不要断开
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
最后在主服务路由中,将消息绑定到客户端通道
GET("/board", func(ctx *gin.Context) {
Service(ctx, stream)
})
其中有新的消息到来时,放入到流服务的Message中
stream.Message <- PushMessage
并且在协程中处理消息的发送,从消息通道发送到流客户端
ctx.Stream(func(w io.Writer) bool {
if msg, ok := <-stream.Message; ok {
ctx.SSEvent("message", msg)
}
})
2.6 启动实时页面面板
下载示例程序
https://github.com/hahamx/examples/tree/main/service/v1_monitor/service_web_push.exe
在启动预定模拟程序时,如果缓存服务不在本地localhost,那么需要指定缓存服务地址
.\service_web_push.exe
mac或linux则直接启动
.\service_web_push
该服务将启动于8085端口,因此只需要在web页面(chrome)访问如下地址
http://127.0.0.1:8085/board
基础认证用户名密码为 admin/123456
需要指定是否开启实时监控,页面有提示如下
data:The Board started. Current Time Is 2022-11-06 20:02:22
选择客户端模拟发起预定请求,将有以下消息同步推送到web界面
data:booked: 25, sold: 370, current booked: en00029
data:booked: 26, sold: 385, current booked: en00028
data:booked: 27, sold: 405, current booked: en00030
其页面效果如下
3 小结
这一节实现了简单的广播消息推送服务,分别实现了从客户端接收 和 从web页面接收。
消息推送通常在IM应用中有广泛使用,在网站开始也有一些应用。 频繁的推送也造成骚扰。
- 点赞
- 收藏
- 关注作者
评论(0)