C/S服务实时双工通信

举报
码乐 发表于 2024/02/21 17:42:12 2024/02/21
【摘要】 2 简介这一节我们实现服务监听客户端,服务处理程序。一句话概括如下:当用户登录到指定页面后,用户保持在线状态时与后端接口建立一个长链接.有相关事件发生时,而该用户参与(订阅)了相关的事件,那么就推送消息给用户。 2.1 处理消息首先我们创建一个路由组 e = gin.Default() router = e.Group("/")并且初始化一个消息更新器,这用于将指定的http...

2 简介

这一节我们实现服务监听客户端,服务处理程序。

一句话概括如下:

当用户登录到指定页面后,用户保持在线状态时与后端接口建立一个长链接.
有相关事件发生时,而该用户参与(订阅)了相关的事件,那么就推送消息给用户。

2.1 处理消息

首先我们创建一个路由组

  e      = gin.Default()
  router = e.Group("/")

并且初始化一个消息更新器,这用于将指定的http链接升级到websocket 链接。 并发地使用upgrader的方法是安全的。

其中包括以下指定参数: handshaketimeout,握手超时时间,用于指定握手完成的持续时间。

ReadBufferSize 和 WriteBufferSize 指定 I/O 缓冲区的bytes格式大小。如果一个缓冲器大小为0,然后这个缓冲器被http服务分配和使用。

这个I/O缓冲器大小将不会对消息的发生和接收做任何限制。

其Upgrade 方法更新 HTTP 服务链接到 websocket协议链接,这个响应头被包括在到客户端更新请求的响应消息中。

在指定cookies中(Set-Cookie)使用这些响应头. 为了指定子协议支持这个服务,直接设置 Upgrader.Subprotocols。 如果更新失败,这个方法 Upgrade 回复给客户端一个 HTTP错误响应。

       upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		Subprotocols:    []string{"json"},
	}

写缓冲池 是一个为写操作准备的缓冲器池。如果这个值没有被设置,那么写缓冲器将被分配到链接,伴随整个链接的持续时间。

一个池最常被使用在跨越大量链接的写大量数据的操作。 WriteBufferPool BufferPool

应用程序应该使用一个单独的池,为了每个独特的写缓冲大小。

当链接发生错误时,但是没有没有指定 error 类型,那么http.Error 将被使用做为 http 的响应。

如果请求的源中的 头协议可以被接收,CheckOrigin将返回 true。 如果这个函数为 nil,那么一个安全的默认值 false 将被返回。

这表示此时请求头中的 Origin 是存在的,并且原始主机与请求头中的host 不匹配。

一个CheckOrigin函数应该小心地验证这些请求源,以阻止cross-site 伪造攻击。 CheckOrigin

启用压缩指定,如果这个服务需要对每个消息进行压缩,RFC7692 则设置它为true. EnableCompression
但是设置该参数为true,并不能保证压缩将被支持,现在仅在没有上下文接管时被支持。

由此,我们建立了以json为第二协议的ws连接。 这将 按順序指定服務器支持的协议。 Subprotocols []string

如果此字段不為零,則upgrader方法将从携带子协议的客戶端請求列表中选择,使用协议选择此列表中的第一个匹配項。

如果沒有匹配的,則不需要协议协商。(Sec-Websocket-Protocol 标头不包含在握手响应中)。

子协议返回这些客户端 ,在Sec-Websocket-Protocol 头中的请求子协议。

   Subprotocols(r *http.Request) []string

然后我们封装一个接口,处理服务器发送到客户端的消息:

    func MessageHandler(ctx *gin.Context) { 

		writer := ctx.Writer
		reader := ctx.Request
		conn, err := upgrader.Upgrade(writer, reader, nil)
		if err != nil {
			panic(err)
		}

		writer.Header().Set("Content-Type", "application/json")
		writer.Header().Set("Sec-WebSocket-Protocol", "json")

		client := pubsub.NewWsClient(conn)

		Ps = Ps.AddClient(client) 

		go client.Read()
		go client.Write() 
    }
    
    GET("/message", MessageHandler)

在初始化加载函数中,加载静态页面文件路径到根路径

   e.StaticFile("/", "./static/index.html")

最后启动我们的服务:

    monitorPort := ":3002"
	fmt.Printf("board monitor:%v\n", monitorPort)
	e.Run(monitorPort)

2.2 接收消息

当连接成功后,这里的服务器将返回一个消息如下:

     Welcome to ws client.
     connected to server:ws://localhost:3002/message

在html的页面脚本中,我们创建一个连接名称为socket,

       let addr = 'ws://localhost:3002/message';
       let socket = new WebSocket(addr);

然后加载到浏览器,这样我们可以使用浏览器 console 控制台给服务器接口发消息。
例如

  socket.send('{"action":"subscribe","topic":"init"}')

这将触发服务器订阅规则,使得该客户端订阅 init 事件的全部消息。

当我们从另一个客户端 发一条消息到这个 名称为init 的 topic 时,订阅了这个主题的全部客户端都将收到消息。这已经很类似与IM的房间概念了。
在矩阵协议中,每个服务器都由无数的房间组成,全部服务器构成一个矩阵。

   socket.send('{"action":"publish","topic":"init", "message":"this is from earth."}') 

其订阅者客户端效果如下

 Welcome to ws client. 
 connected to server:ws://localhost:3002/message 
 new message from ws://localhost:3002/message : Subscribe success. 
 new message from ws://localhost:3002/message : {"action":"publish","topic":"init", "message":"this is from earth."}

3 小结

我们完成了客户端到服务器的连接,并且支持客户端与服务的交互,一些客户端订阅主题消息,因此这些客户端可以接收到或不能接收到消息。

参考连接:

互联网档案馆
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。