python获取数据库更新消息(原文链接:https://www.cnblogs.com/zhangjianhua/p/8080538.html)

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 17:17   1223   0

1.软件安装

[root@mha-maxscale-1 ~]# pip install mysql-replication

2.MySQL授权

 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'192.168.3.%' IDENTIFIED BY '123456';

3.Binlog要满足如下条件

MySQL>root@(none) 09:53:38>show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)

MySQL>show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

MySQL>show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| binlog_row_image | FULL |
+------------------+-------+
1 row in set (0.00 sec)

4.示例代码

[root@mha-maxscale-1 script]# cat mysql-replication.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import sys
import json

def main():
mysql_settings = {'host': '192.168.3.130',
'port': 3306, 'user': 'replicator', 'passwd': '123456'}
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=101,
blocking=True,
only_schemas=['zow'],
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
resume_stream=True,
log_file='mysql-bin.000013', log_pos=6197)

for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["values"] = dict(row["values"].items())
event = dict(event.items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["before_values"] = dict(row["before_values"].items())
event["after_values"] = dict(row["after_values"].items())
event = dict(event.items())
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["values"] = dict(row["values"].items())
event = dict(event.items())
print json.dumps(event)
sys.stdout.flush()

stream.close()


if __name__ == "__main__":
main()

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP