【Go语言实战】 (11) go-micro微服务 实现简单备忘录 (下) | 备忘录模块
写在前面
这一章节我们继续前一章的内容,将备忘录模块完善,我们将使用
RabbitMQ
作为消息队列去创建备忘录
1. RabbitMQ创建备忘录
1.1 导入配置
导入配置
[rabbitmq]
RabbitMQ = amqp
RabbitMQUser = guest
RabbitMQPassWord = guest
RabbitMQHost = localhost
RabbitMQPort = 5672
- 1
- 2
- 3
- 4
- 5
- 6
加载配置
func LoadRabbitMQ(file *ini.File) {
RabbitMQ = file.Section("rabbitmq").Key("RabbitMQ").String()
RabbitMQUser = file.Section("rabbitmq").Key("RabbitMQUser").String()
RabbitMQPassWord = file.Section("rabbitmq").Key("RabbitMQPassWord").String()
RabbitMQHost = file.Section("rabbitmq").Key("RabbitMQHost").String()
RabbitMQPort = file.Section("rabbitmq").Key("RabbitMQPort").String()
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
连接RabbitMQ
// MQ rabbitMQ链接单例
var MQ *amqp.Connection
// 初始化rabbitMQ链接
func RabbitMQ(connString string) {
conn, err := amqp.Dial(connString)
if err != nil {
panic(err)
}
MQ = conn
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
1.2 proto
- task/services/protos
1.2.1 taskModels.proto
定义了task的proto模型
syntax="proto3";
package services;
option go_package ="./;protos";
message TaskModel{
//@inject_tag: json:"Id" form:"Id"
uint64 Id = 1;
//@inject_tag: json:"Uid" form:"Uid"
uint64 Uid = 2;
//@inject_tag: json:"Title" form:"Title"
string Title = 3;
//@inject_tag: json:"Content" form:"Content"
string Content = 4;
//@inject_tag: json:"StartTime" form:"StartTime"
int64 StartTime = 5;
//@inject_tag: json:"EndTime" form:"EndTime"
int64 EndTime = 6;
//@inject_tag: json:"Status" form:"Status"
int64 Status = 7;
//@inject_tag: json:"CreateTime" form:"CreateTime"
int64 CreateTime = 8;
//@inject_tag: json:"UpdateTime" form:"UpdateTime"
int64 UpdateTime = 9;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
执行protoc生成pb文件
protoc --proto_path=. --micro_out=. --go_out=. taskModels.proto
- 1
1.2.2 taskService.proto
定义了taskRequest,task的请求参数。
定义了TaskListResponse,task列表的响应参数。
定义了TaskDetailResponse,task列表的详细信息。
定义了TaskService,都是定义一些增删改查的服务。
syntax="proto3";
package services;
import "taskModels.proto";
option go_package = "./;protos";
message TaskRequest{
//@inject_tag: json:"Id" form:"Id"
uint64 Id = 1;
//@inject_tag: json:"Uid" form:"Uid"
uint64 Uid = 2;
//@inject_tag: json:"Title" form:"Title"
string Title = 3;
//@inject_tag: json:"Content" form:"Content"
string Content = 4;
//@inject_tag: json:"StartTime" form:"StartTime"
int64 StartTime = 5;
//@inject_tag: json:"EndTime" form:"EndTime"
int64 EndTime = 6;
//@inject_tag: json:"Status" form:"Status"
int64 Status = 7;
// @inject_tag: json:"Start" form:"Start" uri:"Start"
uint32 Start = 8;
// @inject_tag: json:"Limit" form:"Limit" uri:"Limit"
uint32 Limit = 9;
}
message TaskListResponse{
repeated TaskModel TaskList=1;
// @inject_tag: json:"Count"
uint32 Count=2;
}
message TaskDetailResponse{
TaskModel TaskDetail=1;
}
service TaskService{
rpc CreateTask(TaskRequest) returns(TaskDetailResponse);
rpc GetTasksList(TaskRequest) returns(TaskListResponse);
rpc GetTask(TaskRequest) returns(TaskDetailResponse);
rpc UpdateTask(TaskRequest) returns(TaskDetailResponse);
rpc DeleteTask(TaskRequest) returns(TaskDetailResponse);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
执行protoc生成pb文件
protoc --proto_path=. --micro_out=. --go_out=. taskService.proto
- 1
1.3 写入数据
- task/core/taskService.go
我们在这个go文件中将数据写入RabbitMQ当中。
- 连接通道
ch, err := model.MQ.Channel()
if err != nil {
err = errors.New("rabbitMQ err:" + err.Error())
return err
}
- 1
- 2
- 3
- 4
- 5
- 声明通道队列
q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
if err != nil {
err = errors.New("rabbitMQ err:" + err.Error())
return err
}
- 1
- 2
- 3
- 4
- 5
- 将请求的参数序列化,发布到队列中
body, _ := json.Marshal(req)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})
- 1
- 2
- 3
- 4
- 5
- 6
1.4 读取数据
- mq-server/services/task.go
从RabbitMQ中接收数据信息再写入数据库中
- 打开Channel
ch, err := model.MQ.Channel()
- 1
- 从
task_queue
通道中获取消息
q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
if err != nil {
panic(err)
}
- 1
- 2
- 3
- 4
name:队列名称;
durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;
autoDelete:是否自动删除;
noWait:是否非阻塞,true为是,不等待RMQ返回信息;
args:参数,传nil即可;
exclusive:是否设置排他
消息ACK保证了消息不会丢失,但是当rabbitMQ Server停止(不是consumer 挂掉)的时候,我们的所有消息都会丢失。针对这种情况,我们先确保消息队列的持久化,设置消息队列的durable选项为true
- 公平分派消息
err = ch.Qos(1, 0, false)
if err != nil {
panic(err)
}
- 1
- 2
- 3
- 4
设置Qos,设置预取大小prefetch,当prefetch=1时,表示在没收到consumer的ACK消息之前,只会为其consumer分派一个消息。
- 读出数据
msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
- 1
- 从通道中读出数据
将通道的信息,反系列化,然后在数据库中创建。
go func() {
for d := range msgs {
var p model.Task
err := json.Unmarshal(d.Body, &p)
if err != nil {
panic(err)
}
fmt.Println("d.Body",string(d.Body))
model.DB.Create(&p)
log.Printf("Done")
_ = d.Ack(false) // 确认消息,必须为false
}
}()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
2. 备忘录其他操作
构造一个服务
type TaskService struct {
}
- 1
- 2
- 3
2.1 获取备忘录列表
传入的参数:上下文信息,请求参数,响应参数。
func (*TaskService) GetTasksList(ctx context.Context, req *services.TaskRequest, res *services.TaskListResponse) error {
if req.Limit == 0 {
req.Limit = 6
}
//在数据库查找值
var productData []model.Task
var count uint32
err := model.DB.Offset(req.Start).Limit(req.Limit).Where("uid=?", req.Uid).Find(&productData).Error
if err != nil {
err = errors.New("mysql err:" + err.Error())
return err
}
err = model.DB.Model(&model.Task{}).Where("uid=?", req.Uid).Count(&count).Error
if err != nil {
err = errors.New("mysql err:" + err.Error())
return err
}
//序类化备忘录列表
var taskRes []*services.TaskModel
for _, item := range productData {
taskRes = append(taskRes, BuildTask(item))
}
//序列化后的结果赋给response
res.TaskList = taskRes
res.Count = count
return nil
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
2.2 获取备忘录详情
func (*TaskService) GetTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
//在数据库查找值
productData := model.Task{}
err := model.DB.First(&productData, req.Id).Error
if err != nil {
err = errors.New("mysql err:" + err.Error())
return err
}
//序类化商品
productRes := BuildTask(productData)
//序列化后的结果赋给response
res.TaskDetail = productRes
return nil
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
2.3 更新备忘录
func (*TaskService) UpdateTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
//在数据库查找值
taskData := model.Task{}
err := model.DB.Model(model.Task{}).Where("id = ? AND uid = ?",req.Id,req.Uid).First(&taskData).Error
if err != nil {
err = errors.New("mysql err:" + err.Error())
return err
}
//将要更新的数据赋值给结构体
taskData.Title = req.Title
taskData.Status = int(req.Status)
taskData.Content = req.Content
//update
err = model.DB.Save(&taskData).Error
if err != nil {
err = errors.New("mysql err:" + err.Error())
return err
}
//序列化后的结果赋给response
res.TaskDetail = BuildTask(taskData)
return nil
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
2.4 注册到etcd中
- 注册etcd
etcdReg := etcd.NewRegistry(
registry.Addrs("127.0.0.1:2379"),
)
- 1
- 2
- 3
- 得到微服务实例
// 1. 得到微服务实例
microService := micro.NewService(
micro.Name("rpcTaskService"), // 设置微服务名字,用来访问的
micro.Address("127.0.0.1:8083"),
micro.Registry(etcdReg),
)
- 1
- 2
- 3
- 4
- 5
- 6
- 初始化
microService.Init()
- 1
- 服务注册
将用户服务注册到etcd中
_ = services.RegisterTaskServiceHandler(microService.Server(), new(core.TaskService))
- 1
- 启动微服务
_ = microService.Run()
- 1
查看etcd中http://localhost:8080/etcdkeeper/
是否有该模块的注册信息
3. 接入网关
3.1 接入路由
- api-gateway/weblib/handlers
//备忘录服务
authed.GET("tasks", handlers.GetTaskList)
authed.POST("task", handlers.CreateTask)
authed.GET("task/:id", handlers.GetTaskDetail)
authed.DELETE("task/:id", handlers.DeleteTask)
authed.PUT("task/:id", handlers.UpdateTask)
- 1
- 2
- 3
- 4
- 5
- 6
3.2 编写接口(创建备忘录为例子)
注意这是一个多用户的备忘录,所以我们要确保的是创建到该用户的管理下的备忘录中。
所以我们就需要用到用户的id,所以就从Authorization中取出来。
func CreateTask(ginCtx *gin.Context) {
var taskReq services.TaskRequest
PanicIfTaskError(ginCtx.Bind(&taskReq))
//从gin.keys取出服务实例
claim,_ := util.ParseToken(ginCtx.GetHeader("Authorization"))
taskReq.Uid = uint64(claim.Id)
taskService := ginCtx.Keys["taskService"].(services.TaskService)
taskRes, err := taskService.CreateTask(context.Background(), &taskReq)
PanicIfTaskError(err)
ginCtx.JSON(200, gin.H{"data": taskRes.TaskDetail})
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
3.3 测试
- 创建备忘录
- 展示用户备忘录
- 修改备忘录
文章来源: blog.csdn.net,作者:小生凡一,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/weixin_45304503/article/details/122301707
- 点赞
- 收藏
- 关注作者
评论(0)