监听缓存通知以同步数据
1 简介
通过缓存的数据通知消息,更新mysql数据库是一个不错的选择,首先需要开启缓存redis的通知开关。
Redis有Key空间通知功能,利用该功能可以监听Redis中的数据变动,当Redis中的数据发生变化时,通知Go程序去更新MySQL。
-
实现步骤:
启用Redis的Key空间通知。 Go程序通过订阅Redis发布的事件,监听数据变化。 当检测到Redis中数据发生变化时,更新MySQL中的相应数据。
2 实例
示例代码:
package main
var ctx = context.Background()
func subscribeAndSync(db *gorm.DB, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "__keyevent@0__:set")
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
log.Println("Error receiving message:", err)
continue
}
fmt.Println("Received message:", msg.Payload)
// 获取Redis键值
key := msg.Payload
value, err := rdb.Get(ctx, key).Result()
if err != nil {
log.Println("Error getting value from Redis:", err)
continue
}
// 更新MySQL
err = db.Exec("UPDATE your_table SET your_column = ? WHERE your_key = ?", value, key).Error
if err != nil {
log.Println("Error updating MySQL:", err)
}
}
}
func main() {
// 连接MySQL
dsn := "user:password@tcp(127.0.0.1:3306)/dbname"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal("Failed to connect to MySQL:", err)
}
// 连接Redis
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 开始订阅并同步
subscribeAndSync(db, rdb)
}
3 批量订阅
通过正则表达式模式匹配来订阅一组 keys 是最常见的方式。例如,PSUBSCRIBE 可以用来订阅符合某种模式的 key 空间事件。
-
- 使用模式订阅 (PSUBSCRIBE)
Redis 的模式订阅功能,可以使用通配符来订阅多个 key 的事件。假设需要订阅多个 key,它们遵循某种命名规则,可以通过通配符来订阅相关事件。
PSubscribe 用于订阅频道模式,而不是直接对 key 进行匹配。
在 Redis 的Key空间通知中,你不能直接用 PSUBSCRIBE 针对特定 key(如 user:*)来订阅 key 事件。
通过模式订阅来监听 key 事件,并在代码中手动过滤特定的 key 前缀。
__keyevent@<db>__:<event>:例如 __keyevent@0__:set 监听数据库 0 中的 set 事件。
__keyspace@<db>__:<key>:例如 __keyspace@0__:user:123 监听数据库 0 中 user:123 这个具体 key 的所有事件。
你不能直接用 PSubscribe 对多个 key 前缀进行模式订阅,
但你可以订阅特定的事件(如 set, del等),然后在接收到事件时手动过滤 key 前缀。
手动过滤 key 前缀这里是一个示例版本,允许你监听所有 set 或 del 事件,并通过代码过滤特定前缀(如 user: 和 product:)的 keys。
以 user: 开头的 keys,可以通过 PSUBSCRIBE user:* 来批量订阅这些 keys 的空间通知。
var ctx = context.Background()
// 同步 Redis 和 MySQL 的函数
func syncRedisToMySQL(db *gorm.DB, rdb *redis.Client, key string) {
value, err := rdb.Get(ctx, key).Result()
if err != nil {
log.Println("Error getting value from Redis:", err)
return
}
// 更新MySQL
err = db.Exec("UPDATE your_table SET your_column = ? WHERE your_key = ?", value, key).Error
if err != nil {
log.Println("Error updating MySQL:", err)
}
}
func subscribeAndSync(db *gorm.DB, rdb *redis.Client) {
// 订阅所有 set 和 del 事件
pubsub := rdb.PSubscribe(ctx, "__keyevent@0__:set", "__keyevent@0__:del")
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
log.Println("Error receiving message:", err)
continue
}
// 打印接收到的消息
fmt.Println("Received message:", msg.Channel, msg.Payload)
// 手动过滤特定的 key 前缀
if strings.HasPrefix(msg.Payload, "user:") || strings.HasPrefix(msg.Payload, "product:") {
fmt.Println("Processing key:", msg.Payload)
syncRedisToMySQL(db, rdb, msg.Payload)
} else {
fmt.Println("Skipping key:", msg.Payload)
}
}
}
func main() {
// 连接 MySQL
dsn := "user:password@tcp(127.0.0.1:3306)/dbname"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal("Failed to connect to MySQL:", err)
}
// 连接 Redis
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 启动订阅并同步
subscribeAndSync(db, rdb)
}
订阅特定事件:PSubscribe(ctx, “keyevent@0:set”, “keyevent@0:del”) 订阅数据库 0 中所有 set 和 del 事件。
过滤特定 key:在 ReceiveMessage 中,通过 strings.HasPrefix 来过滤 user: 和 product: 开头的 keys,然后针对这些 key 进行处理和同步。
- 启用 Redis Key 空间通知
在 Redis 配置中,你需要确保启用了 Key 空间通知功能:
确保 Redis 启用了 Key空间通知:
CONFIG SET notify-keyspace-events KEA
K:启用 key 事件通知。
E:监听所有事件。
A:所有 key 的事件。
通过这种方式,你可以批量监听多个 key 前缀的变化,同时避免直接订阅多个特定的 key。
或者在 redis.conf 中设置:
notify-keyspace-events Ex
这里的 Ex 表示启用过期和所有类型的 key 事件通知。
如果你使用的是 Redis CLI,也可以动态启用通知:
CONFIG SET notify-keyspace-events Ex
模式匹配,你可以使用通配符来匹配一批 keys:
user:*: 订阅所有以 user: 开头的 keys。
product:*: 订阅所有以 product: 开头的 keys。
*: 订阅所有 key 的事件(不推荐,除非确实需要)。
通过这种模式订阅方法,你可以监听符合命名规则的多个 key 的变化。
- 点赞
- 收藏
- 关注作者
评论(0)