【Go语言实战】基于 WebSocket + MongoDB 的IM即时聊天Demo
写在前面
这个项目是基于WebSocket + MongoDB + MySQL + Redis。
业务逻辑很简单,只是两人的聊天。
MySQL
用来存储用户基本信息MongoDB
用来存放用户聊天信息Redis
用来存储处理过期信息
github地址
https://github.com/CocaineCong/gin-chat-demo
1. WebSocket原理
WebSocket
是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手。
握手成功后,数据就直接从TCP
通道传输,与HTTP
无关了。即:WebSocket
分为握手和数据传输阶段。
即进行了HTTP握手 + 双工的TCP连接。
WebSocket 是一种在单个TCP连接上进行全双工通信
的协议。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
如果只是想左图这样的不断发送http请求,轮询的效率是非常低,非常浪费资源,所以就有了websocket协议了,建立在 TCP 协议之上,服务器端的实现比较容易。
WebSocket协议一旦建立之后,互相沟通所消耗的请求头是很小的,服务器向客户端推送消息的功耗就小了。
2. 具体流程
2.1 定义类型
- 发送消息的结构体
type SendMsg struct {
Type int `json:"type"`
Content string `json:"content"`
}
- 1
- 2
- 3
- 4
- 回复消息的结构体
type ReplyMsg struct {
From string `json:"from"`
Code int `json:"code"`
Content string `json:"content"`
}
- 1
- 2
- 3
- 4
- 5
- 用户结构体
type Client struct {
ID string
SendID string
Socket *websocket.Conn
Send chan []byte
}
- 1
- 2
- 3
- 4
- 5
- 6
- 广播类(包括广播内容和源用户)
type Broadcast struct {
Client *Client
Message []byte
Type int
}
- 1
- 2
- 3
- 4
- 5
- 用户管理
type ClientManager struct {
Clients map[string]*Client
Broadcast chan *Broadcast
Reply chan *Client
Register chan *Client
Unregister chan *Client
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 信息转JSON (包括:发送者、接收者、内容)
type Message struct {
Sender string `json:"sender,omitempty"`
Recipient string `json:"recipient,omitempty"`
Content string `json:"content,omitempty"`
}
- 1
- 2
- 3
- 4
- 5
2.2 进行连接
- 定义一个管理
Manager
var Manager = ClientManager{
Clients : make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
Broadcast: make(chan *Broadcast),
Register : make(chan *Client),
Reply : make(chan *Client),
Unregister: make(chan *Client),
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
2.2.1 服务器监听连接
用 for 不断进行监听查看哪个用户进入通道通信,对用户一旦有用户进来,就 Register 进行注册
for {
case conn := <- Manager.Register:
log.Printf("建立新连接: %v", conn.ID)
Manager.Clients[conn.ID] = conn
replyMsg := &ReplyMsg{
Code: e.WebsocketSuccess,
Content: "已连接至服务器",
}
msg , _ := json.Marshal(replyMsg)
_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
2.2.2 服务器监听断开连接
同样的,也可以用来对服务器和用户之间连接的断开。
case conn := <-Manager.Unregister: // 断开连接
log.Printf("连接失败:%v", conn.ID)
if _, ok := Manager.Clients[conn.ID]; ok {
replyMsg := &ReplyMsg{
Code: e.WebsocketEnd,
Content: "连接已断开",
}
msg , _ := json.Marshal(replyMsg)
_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
close(conn.Send)
delete(Manager.Clients, conn.ID)
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
2.2.3 用户连接服务器
我们采用的是gin框架
,所以这里我们可以先引入路由
r := gin.Default()
r.Use(gin.Recovery(),gin.Logger())
v1 := r.Group("/")
{
v1.GET("ping", func(c *gin.Context) {
c.JSON(200,"SUCCESS")
})
v1.GET("ws",service.WsHandler)
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
再在service层
创建一个handler
处理
- 读取两人的id
uid:=c.Query("uid") // 自己的id
toUid:=c.Query("toUid") // 对方的id
- 1
- 2
- 升级ws协议
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题
return true
}}).Upgrade(c.Writer, c.Request, nil) // 升级成ws协议
- 1
- 2
- 3
- 4
- 创建用户实例
client := &Client{
ID : createId(uid,toUid),
SendID : createId(toUid,uid),
Socket : conn,
Send : make(chan []byte),
}
- 1
- 2
- 3
- 4
- 5
- 6
- 用户注册到用户管理上面
Manager.Register <- client
- 1
- 开通两个协程, 一个读,一个写
go client.Read()
go client.Write()
- 1
- 2
2.3 写入
2.3.1 定义类型
我们定义的接受类型是json形式,结构体如下
我们这里设计了几个type
-
type = 1 接受消息
-
type = 2 获取历史消息
type SendMsg struct {
Type int `json:"type"`
Content string `json:"content"`
}
- 1
- 2
- 3
- 4
2.3.2 读取数据
先用 PongHandler
返回当前的 socket
对象
c.Socket.PongHandler()
sendMsg := new(SendMsg)
// _,msg,_:=c.Socket.ReadMessage() // 不是json格式用这个
err := c.Socket.ReadJSON(&sendMsg) // json格式就用这个
- 1
- 2
- 3
- 4
2.3.3 接受消息
如果传过来的type=1
的话,那么我们就可以先去redis上面查询一下当前有多少人进行了连接。
r1 ,_ := cache.RedisClient.Get(c.ID).Result()
r2 ,_ := cache.RedisClient.Get(c.SendID).Result()
- 1
- 2
如果有三个人在线上,并且没有接受消息的话,就拒绝访问。
replyMsg := &ReplyMsg{
Code: e.WebsocketLimit,
Content: "达到限制",
}
msg , _ := json.Marshal(replyMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
- 1
- 2
- 3
- 4
- 5
- 6
如果没有的话,就先记录到redis
中进行缓存
cache.RedisClient.Incr(c.ID)
_ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()
- 1
- 2
之后,我们再进行广播消息
Manager.Broadcast <- &Broadcast{
Client:c,
Message:[]byte(sendMsg.Content),
}
- 1
- 2
- 3
- 4
2.3.3 获取历史消息
那这个时候我们传来的 type 就等于 2
,Content就是时间戳
了
我们设置的话,是只保存三个月的,三个月过后我们就可以删除了。
timeT, err := strconv.Atoi(sendMsg.Content) // 传送来时间
if err != nil {
timeT = 9999999
}
results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)
- 1
- 2
- 3
- 4
- 5
这个FindManyMsg
后面再说
返回前十条
if len(results) > 10 {
results = results[:10]
}else if len(results) == 0{
replyMsg := &ReplyMsg{
Code:e.WebsocketEnd,
Content:"到底了",
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
写入返回
msg , _ := json.Marshal(replyMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage,msg)
- 1
- 2
2.4 读取
我们用一个for循环进行消息的读取。
如果有消息的话,就WriteMessage
写下来。发送出去。
for{
select {
case message, ok := <-c.Send :
if !ok {
_=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{})
return
}
log.Println(c.ID,"接受消息:",string(message))
replyMsg := &ReplyMsg{
Code:e.WebsocketSuccessMessage,
Content:fmt.Sprintf("%s",string(message)),
}
msg , _ := json.Marshal(replyMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
2.5 插入与查询
2.5.1 插入数据
我们使用的是mongoDB进行消息的存储,MongoDB的插入非常简单,文档数据库,插入json格式即可。
- 定义一个存储的数据类型
type Trainer struct {
Content string `bson:"content"` // 内容
StartTime int64 `bson:"startTime"` // 创建时间
EndTime int64 `bson:"endTime"` // 过期时间
Read uint `bson:"read"` // 已读
}
- 1
- 2
- 3
- 4
- 5
- 6
- 传入数据库,用户ID,内容,是否已读,过期时间
func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) {
collection := conf.MongoDBClient.Database(database).Collection(id)
comment := ws.Trainer{
Content: content,
StartTime: time.Now().Unix(),
EndTime: time.Now().Unix() + expire,
Read: read,
}
_, err = collection.InsertOne(context.TODO(),comment)
return
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
2.5.2 查询数据
MongoDB的查询也非常容易,按照json格式进行查询。
- 定义一个存储对象的切片
var resultsMe []ws.Trainer
- 1
- 通过用户id查询所有的用户消息
idCollection := conf.MongoDBClient.Database(database).Collection(id)
- 1
- 根据传入的
time
定义一个过滤器,进行这个时间内的查询。
filter := bson.M{"startTime": bson.M{"$lt": time}}
- 1
- 根据
filter
进行查询,然后再通过时间进行倒序排序
,并且限定页数
。
sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter,
options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find().
SetLimit(int64(pageSize)))
- 1
- 2
- 3
- 把数据查询数据传入到resultsMe中
err = idTimeCurcor.All(context.TODO(), &resultsMe)
- 1
2.6 对方不在线
- 广播信息
case broadcast := <-Manager.Broadcast:
message := broadcast.Message
sendId := broadcast.Client.SendID
flag := false // 默认对方不在线
- 1
- 2
- 3
- 4
- 如果没有这个人的话就一直找就可以了
for id, conn := range Manager.Clients {
if id != sendId {
continue
}
select {
case conn.Send <- message:
flag = true
default:
close(conn.Send)
delete(Manager.Clients, conn.ID)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 还是找到的话
就可以当作已读信息,存储
if flag {
log.Println("对方在线应答")
replyMsg := &ReplyMsg{
Code: e.WebsocketOnlineReply,
Content: "对方在线应答",
}
msg , err := json.Marshal(replyMsg)
_ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month))
if err != nil {
fmt.Println("InsertOneMsg Err", err)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 如果没有找到的话,就是未读消息了。
else {
log.Println("对方不在线")
replyMsg := ReplyMsg{
Code: e.WebsocketOfflineReply,
Content: "对方不在线应答",
}
msg , err := json.Marshal(replyMsg)
_ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg)
err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month))
if err != nil {
fmt.Println("InsertOneMsg Err", err)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
3. 演示
- 测试http连接
- 进行ws连接,连接服务器
- 当id=1上线,但是id=2没上线的时候发送消息
- 当id=2上线之后
- 再次发消息,就是在线应答了
- 这边就实时接受到消息了
- 获取历史信息
4. 源码地址
github地址
https://github.com/CocaineCong/gin-chat-demo
文章来源: blog.csdn.net,作者:小生凡一,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/weixin_45304503/article/details/121787022
- 点赞
- 收藏
- 关注作者
评论(0)