Python:mysql-replication监控MySQL的binlog变动

举报
彭世瑜 发表于 2021/08/14 00:15:13 2021/08/14
【摘要】 Github: https://github.com/noplay/python-mysql-replication 设置同步账号权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456'; # 刷新权限 flush...

Github: https://github.com/noplay/python-mysql-replication

设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

  
 
  • 1
  • 2
  • 3
  • 4

参考
利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结

安装

pip install mysql-replication

  
 
  • 1

代码示例

# -*- coding: utf-8 -*-

import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
)


class DateEncoder(json.JSONEncoder): """ 自定义类,解决报错: TypeError: Object of type 'datetime' is not JSON serializable """ def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': '123456'
}


def main(): # 实例化binlog 流对象 stream = BinLogStreamReader( connection_settings=mysql_settings, server_id=100,  # slave标识,唯一 blocking=True,  # 阻塞等待后续事件 # 设定只监控写操作:增、删、改 only_events=[ DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent ] ) for binlogevent in stream: # binlogevent.dump()  # 打印所有信息 for row in binlogevent.rows: # 打印 库名 和 表名 event = {"schema": binlogevent.schema, "table": binlogevent.table} if isinstance(binlogevent, DeleteRowsEvent): event["action"] = "delete" event["data"] = row["values"] elif isinstance(binlogevent, UpdateRowsEvent): event["action"] = "update" event["data"] = row["after_values"]  # 注意这里不是values elif isinstance(binlogevent, WriteRowsEvent): event["action"] = "insert" event["data"] = row["values"] print(json.dumps(event, cls=DateEncoder)) # sys.stdout.flush() # stream.close()  # 如果使用阻塞模式,这行多余了


if __name__ == '__main__': main()
"""
输出数据格式
{ "schema": "demo", # 数据库名 "table": "student",  # 表名 "action": "update",  # 动作 insert、delete、update "data": { # 数据,里边包含所有字段 "id": 26, "name": "haha", "age": 34, "update_time": "2019-06-06 16:59:06", "display": 0 }
}
"""


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96

文章来源: pengshiyu.blog.csdn.net,作者:彭世瑜,版权归原作者所有,如需转载,请联系作者。

原文链接:pengshiyu.blog.csdn.net/article/details/91047757

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。