Python:mysql-replication监控MySQL的binlog变动

举报
彭世瑜 发表于 2021/08/14 00:15:13 2021/08/14
2k+ 0 0
【摘要】 Github: https://github.com/noplay/python-mysql-replication 设置同步账号权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456'; # 刷新权限 flush...

Github: https://github.com/noplay/python-mysql-replication

设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

  
 

参考
利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结

安装

pip install mysql-replication

  
 

代码示例

# -*- coding: utf-8 -*-

import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
)
class DateEncoder(json.JSONEncoder): """ 自定义类,解决报错: TypeError: Object of type 'datetime' is not JSON serializable """ def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj)
# 配置数据库信息
mysql_settings = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': '123456'
}
def main(): # 实例化binlog 流对象 stream = BinLogStreamReader( connection_settings=mysql_settings, server_id=100,  # slave标识,唯一 blocking=True,  # 阻塞等待后续事件 # 设定只监控写操作:增、删、改 only_events=[ DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent ] ) for binlogevent in stream: # binlogevent.dump()  # 打印所有信息 for row in binlogevent.rows: # 打印 库名 和 表名 event = {"schema": binlogevent.schema, "table": binlogevent.table} if isinstance(binlogevent, DeleteRowsEvent): event["action"] = "delete" event["data"] = row["values"] elif isinstance(binlogevent, UpdateRowsEvent): event["action"] = "update" event["data"] = row["after_values"]  # 注意这里不是values elif isinstance(binlogevent, WriteRowsEvent): event["action"] = "insert" event["data"] = row["values"] print(json.dumps(event, cls=DateEncoder)) # sys.stdout.flush() # stream.close()  # 如果使用阻塞模式,这行多余了
if __name__ == '__main__': main()
"""
输出数据格式
{ "schema": "demo", # 数据库名 "table": "student",  # 表名 "action": "update",  # 动作 insert、delete、update "data": { # 数据,里边包含所有字段 "id": 26, "name": "haha", "age": 34, "update_time": "2019-06-06 16:59:06", "display": 0 }
}
"""
  
 

文章来源: pengshiyu.blog.csdn.net,作者:彭世瑜,版权归原作者所有,如需转载,请联系作者。

原文链接:pengshiyu.blog.csdn.net/article/details/91047757

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。