【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo

举报
小生凡一 发表于 2021/12/10 00:55:09 2021/12/10
【摘要】 文章目录 写在前面1. WebSocket原理2. 具体流程2.1 定义类型2.2 进行连接2.2.1 服务器监听连接2.2.2 服务器监听断开连接2.2.3 用户连接服务器 2.3 写入2...

写在前面

这个项目是基于WebSocket + MongoDB + MySQL + Redis。
业务逻辑很简单,只是两人的聊天。

  • MySQL 用来存储用户基本信息
  • MongoDB 用来存放用户聊天信息
  • Redis 用来存储处理过期信息

github地址

https://github.com/CocaineCong/gin-chat-demo

1. WebSocket原理

WebSocket是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手。
握手成功后,数据就直接从TCP通道传输,与HTTP无关了。即:WebSocket分为握手和数据传输阶段。
即进行了HTTP握手 + 双工的TCP连接。

WebSocket 是一种在单个TCP连接上进行全双工通信的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在这里插入图片描述
如果只是想左图这样的不断发送http请求,轮询的效率是非常低,非常浪费资源,所以就有了websocket协议了,建立在 TCP 协议之上,服务器端的实现比较容易。

WebSocket协议一旦建立之后,互相沟通所消耗的请求头是很小的,服务器向客户端推送消息的功耗就小了。

2. 具体流程

2.1 定义类型

  • 发送消息的结构体
type SendMsg struct {
	Type int `json:"type"`
	Content string `json:"content"`
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 回复消息的结构体
type ReplyMsg struct {
	From 	string  `json:"from"`
	Code 	int 	`json:"code"`
	Content string  `json:"content"`
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 用户结构体
type Client struct {
	ID 		string
	SendID 	string
	Socket  *websocket.Conn
	Send chan []byte
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 广播类(包括广播内容和源用户)
type Broadcast struct {
	Client 	*Client
	Message []byte
	Type 	int
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 用户管理
type ClientManager struct {
	Clients 	map[string]*Client
	Broadcast 	chan *Broadcast
	Reply 		chan *Client
	Register 	chan *Client
	Unregister  chan *Client
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 信息转JSON (包括:发送者、接收者、内容)
type Message struct {
	Sender 		string 		`json:"sender,omitempty"`
	Recipient 	string 		`json:"recipient,omitempty"`
	Content 	string 		`json:"content,omitempty"`
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

2.2 进行连接

  • 定义一个管理Manager
var Manager  = ClientManager{
	Clients	 : 	make(map[string]*Client),   // 参与连接的用户,出于性能的考虑,需要设置最大连接数
	Broadcast:	make(chan *Broadcast),
	Register : 	make(chan *Client),
	Reply 	 : 	make(chan *Client),
	Unregister:	make(chan *Client),
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.2.1 服务器监听连接

用 for 不断进行监听查看哪个用户进入通道通信,对用户一旦有用户进来,就 Register 进行注册

for {
	case conn := <- Manager.Register:
		log.Printf("建立新连接: %v", conn.ID)
		Manager.Clients[conn.ID] = conn
		replyMsg := &ReplyMsg{
			Code:    e.WebsocketSuccess,
			Content: "已连接至服务器",
		}
		msg , _ := json.Marshal(replyMsg)
		_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.2.2 服务器监听断开连接

同样的,也可以用来对服务器和用户之间连接的断开。

	case conn := <-Manager.Unregister: // 断开连接
			log.Printf("连接失败:%v", conn.ID)
			if _, ok := Manager.Clients[conn.ID]; ok {
				replyMsg := &ReplyMsg{
					Code:    e.WebsocketEnd,
					Content: "连接已断开",
				}
				msg , _ := json.Marshal(replyMsg)
				_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
				close(conn.Send)
				delete(Manager.Clients, conn.ID)
			}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.2.3 用户连接服务器

我们采用的是gin框架,所以这里我们可以先引入路由

	r := gin.Default()
	r.Use(gin.Recovery(),gin.Logger())
	v1 := r.Group("/")
	{
		v1.GET("ping", func(c *gin.Context) {
			c.JSON(200,"SUCCESS")
		})
		v1.GET("ws",service.WsHandler)
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

再在service层创建一个handler处理

  • 读取两人的id
	uid:=c.Query("uid") // 自己的id
	toUid:=c.Query("toUid") // 对方的id

  
 
  • 1
  • 2
  • 升级ws协议
	conn, err := (&websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题
		return true
	}}).Upgrade(c.Writer, c.Request, nil) // 升级成ws协议

  
 
  • 1
  • 2
  • 3
  • 4
  • 创建用户实例
	client := &Client{
		ID 		: createId(uid,toUid),
		SendID	: createId(toUid,uid),
		Socket	: conn,
		Send	: make(chan []byte),
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 用户注册到用户管理上面
Manager.Register <- client

  
 
  • 1
  • 开通两个协程, 一个读,一个写
	go client.Read()
	go client.Write()

  
 
  • 1
  • 2

2.3 写入

2.3.1 定义类型

我们定义的接受类型是json形式,结构体如下
我们这里设计了几个type

  • type = 1 接受消息

  • type = 2 获取历史消息

type SendMsg struct {
	Type    int `json:"type"`
	Content string `json:"content"`
}

  
 
  • 1
  • 2
  • 3
  • 4

2.3.2 读取数据

先用 PongHandler 返回当前的 socket 对象

	c.Socket.PongHandler()
	sendMsg := new(SendMsg)
	// _,msg,_:=c.Socket.ReadMessage() // 不是json格式用这个
	err := c.Socket.ReadJSON(&sendMsg)  // json格式就用这个

  
 
  • 1
  • 2
  • 3
  • 4

2.3.3 接受消息

如果传过来的type=1的话,那么我们就可以先去redis上面查询一下当前有多少人进行了连接。

	r1 ,_ := cache.RedisClient.Get(c.ID).Result()
	r2 ,_ := cache.RedisClient.Get(c.SendID).Result()

  
 
  • 1
  • 2

如果有三个人在线上,并且没有接受消息的话,就拒绝访问。

	replyMsg := &ReplyMsg{
		Code:    e.WebsocketLimit,
		Content: "达到限制",
	}
	msg , _ := json.Marshal(replyMsg)
	_ = c.Socket.WriteMessage(websocket.TextMessage, msg)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果没有的话,就先记录到redis中进行缓存

	cache.RedisClient.Incr(c.ID)
	_ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()

  
 
  • 1
  • 2

之后,我们再进行广播消息

Manager.Broadcast <- &Broadcast{
	Client:c,
	Message:[]byte(sendMsg.Content),
}

  
 
  • 1
  • 2
  • 3
  • 4

2.3.3 获取历史消息

那这个时候我们传来的 type 就等于 2,Content就是时间戳

我们设置的话,是只保存三个月的,三个月过后我们就可以删除了。

	timeT, err := strconv.Atoi(sendMsg.Content) // 传送来时间
	if err != nil {
		timeT = 9999999
	}
	results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

这个FindManyMsg后面再说
返回前十条

	if len(results) > 10 {
		results = results[:10]
	}else if len(results) == 0{
		replyMsg := &ReplyMsg{
			Code:e.WebsocketEnd,
			Content:"到底了",
	}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

写入返回

	msg , _ := json.Marshal(replyMsg)
	_ = c.Socket.WriteMessage(websocket.TextMessage,msg)

  
 
  • 1
  • 2

2.4 读取

我们用一个for循环进行消息的读取。

如果有消息的话,就WriteMessage写下来。发送出去。

for{
	select {
	case message, ok := <-c.Send :
		if !ok {
			_=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{})
			return
		}
		log.Println(c.ID,"接受消息:",string(message))
		replyMsg := &ReplyMsg{
			Code:e.WebsocketSuccessMessage,
			Content:fmt.Sprintf("%s",string(message)),
		}
		msg , _ := json.Marshal(replyMsg)
		_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.5 插入与查询

2.5.1 插入数据

我们使用的是mongoDB进行消息的存储,MongoDB的插入非常简单,文档数据库,插入json格式即可。

  • 定义一个存储的数据类型
type Trainer struct {
	Content 	string `bson:"content"`   // 内容
	StartTime 	int64  `bson:"startTime"` // 创建时间
	EndTime 	int64  `bson:"endTime"`   // 过期时间
	Read 		uint   `bson:"read"`      // 已读
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 传入数据库,用户ID,内容,是否已读,过期时间
func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) {
	collection := conf.MongoDBClient.Database(database).Collection(id)
	comment := ws.Trainer{
		Content:   content,
		StartTime: time.Now().Unix(),
		EndTime:   time.Now().Unix() + expire,
		Read:      read,
	}
	_, err = collection.InsertOne(context.TODO(),comment)
	return
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.5.2 查询数据

MongoDB的查询也非常容易,按照json格式进行查询。

  • 定义一个存储对象的切片
var resultsMe []ws.Trainer

  
 
  • 1
  • 通过用户id查询所有的用户消息
idCollection := conf.MongoDBClient.Database(database).Collection(id)

  
 
  • 1
  • 根据传入的time 定义一个过滤器,进行这个时间内的查询。
filter := bson.M{"startTime": bson.M{"$lt": time}}

  
 
  • 1
  • 根据filter进行查询,然后再通过时间进行倒序排序,并且限定页数
sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter,
	options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find().
	SetLimit(int64(pageSize)))

  
 
  • 1
  • 2
  • 3
  • 把数据查询数据传入到resultsMe中
err = idTimeCurcor.All(context.TODO(), &resultsMe) 

  
 
  • 1

2.6 对方不在线

  • 广播信息
	case broadcast := <-Manager.Broadcast:
		message := broadcast.Message
		sendId := broadcast.Client.SendID
		flag := false // 默认对方不在线

  
 
  • 1
  • 2
  • 3
  • 4
  • 如果没有这个人的话就一直找就可以了
for id, conn := range Manager.Clients {
	if id != sendId {
		continue
	}
	select {
	case conn.Send <- message:
		flag = true
	default:
		close(conn.Send)
		delete(Manager.Clients, conn.ID)
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 还是找到的话

就可以当作已读信息,存储

if flag {
	log.Println("对方在线应答")
	replyMsg := &ReplyMsg{
		Code:    e.WebsocketOnlineReply,
		Content: "对方在线应答",
	}
	msg , err := json.Marshal(replyMsg)
	_ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
	err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month))
	if err != nil {
		fmt.Println("InsertOneMsg Err", err)
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 如果没有找到的话,就是未读消息了。
else {
	log.Println("对方不在线")
	replyMsg := ReplyMsg{
		Code:    e.WebsocketOfflineReply,
		Content: "对方不在线应答",
	}
	msg , err := json.Marshal(replyMsg)
	_ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
	err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month))
	if err != nil {
		fmt.Println("InsertOneMsg Err", err)
	}
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. 演示

  • 测试http连接

在这里插入图片描述

  • 进行ws连接,连接服务器

在这里插入图片描述

  • 当id=1上线,但是id=2没上线的时候发送消息
    在这里插入图片描述
  • 当id=2上线之后

在这里插入图片描述

  • 再次发消息,就是在线应答了

在这里插入图片描述

  • 这边就实时接受到消息了
    在这里插入图片描述
  • 获取历史信息

在这里插入图片描述

4. 源码地址

github地址

https://github.com/CocaineCong/gin-chat-demo

文章来源: blog.csdn.net,作者:小生凡一,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/weixin_45304503/article/details/121787022

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。