1.软件安装
[root@mha-maxscale-1 ~]# pip install mysql-replication
2.MySQL授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'192.168.3.%' IDENTIFIED BY '123456';
3.Binlog要满足如下条件
MySQL>root@(none) 09:53:38>show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)
MySQL>show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
MySQL>show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| binlog_row_image | FULL  |
+------------------+-------+
1 row in set (0.00 sec)
4.示例代码
[root@mha-maxscale-1 script]# cat mysql-replication.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, ) import sys import json
def main():
    """Stream row-change binlog events from MySQL and print each row as JSON.

    Opens a BinLogStreamReader against the hard-coded server, restricted to
    the 'zow' schema and to insert/update/delete row events, starting at
    binlog file 'mysql-bin.000013', position 6197. For every affected row one
    JSON object is written to stdout with the keys "schema", "table",
    "log_pos", "action" and either "values" (insert/delete) or
    "before_values"/"after_values" (update).

    The reader is created with blocking=True, so the loop is expected to run
    until the process is interrupted or the connection fails (see the
    python-mysql-replication documentation for the exact semantics).
    """
    mysql_settings = {
        'host': '192.168.3.130',
        'port': 3306,
        'user': 'replicator',
        'passwd': '123456',
    }

    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=101,
        blocking=True,
        only_schemas=['zow'],
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        resume_stream=True,
        log_file='mysql-bin.000013',
        log_pos=6197)

    # try/finally: the loop normally ends through an exception (Ctrl-C,
    # dropped connection), and the reader must still be closed in that case.
    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {
                    "schema": binlogevent.schema,
                    "table": binlogevent.table,
                    "log_pos": binlogevent.packet.log_pos,
                }

                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["values"] = dict(row["values"])
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"])
                    event["after_values"] = dict(row["after_values"])
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"])

                # NOTE(review): column values may be objects json cannot
                # encode natively (presumably datetime/Decimal for temporal
                # and DECIMAL columns - confirm); default=str falls back to
                # their string form instead of raising TypeError.
                # print(...) with one argument behaves the same on Python 2
                # and Python 3.
                print(json.dumps(event, default=str))
                # Flush per row so a consumer reading a pipe sees events
                # immediately.
                sys.stdout.flush()
    finally:
        stream.close()
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__": main()
|