利用maxwell读取mysql binlog来同步数据库

应用场景:在工作中需要同步两个机房间的mysql数据库,但是不想通过使用类似mysql自带的主从同步这样的技术,这里做的同步仅仅是将远程数据库同步下来。原理上如同mysql自带的replication技术,同样也是对于mysql自带的binlog的读写。
通过搜索google有些开源的解决方案可以使用,本文介绍的是使用一款基于java开发的软件maxwell来实现的。

maxwell是一个由Java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入Kafka,Kinesis,RabbitMQ,Google Cloud Pub / Sub或Redis(Pub / Sub或LPUSH)。(以上内容摘自maxwell官网)。可以想象,有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等。与canal(ali)相比,更加轻量

maxwell还提供以下功能:

1.使用SELECT * FROM table 的方式做全量数据初始化
2.支持主库发生failover后,自动恢复binlog位置
3.对数据进行分区,解决数据倾斜的问题
4.伪装成mysql从库,接收binlog

简单的介绍就到这里,看下怎么做吧
首先maxwell是基于对mysql binlog的读取来实现主要功能的,因此必须打开mysql binlog

vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1

重启mysql后可通过如下命令查看

    show variables like '%log_bin%'

这里我通过docker来按照maxwell,并实现相关功能。首先是获取docker镜像


docker pull zendesk/maxwell:latest
docker run -d --name=maxwell --link mysql_master:mysql_master zendesk/maxwell bin/maxwell --user=maxwell     --password=Kitche931743  --host=mysql_master  --output_row_query=true --filter='exclude:*.*,include:zzdzzb.*,include:biddingdata.*' --producer=rabbitmq --rabbitmq_host=xx --rabbitmq_user=root --rabbitmq_pass=xxx --rabbitmq_virtual_host=maxwell --rabbitmq_exchange=maxwell.some --rabbitmq_exchange_type=topic --rabbitmq_exchange_durable=true --rabbitmq_exchange_autodelete=false --rabbitmq_routing_key_template=zzdzzb.sync

参数解释:
user:mysql用户名
password:mysql密码
host:mysql服务器的地址
output_row_query:输出原始的SQL语句
filter:对数据库的和表的排除或者包含操作
producer:产生消息类型rabbitmq、kafka等
rabbitmq_host:rabbitmq的地址
rabbitmq_virtual_host: rabbitmq的虚拟地址,可以在rabbitmq的控制台中设置
rabbitmq_exchange: rabbitmq的交换机
rabbitmq_exchange_type:rabbitmq交换机类型
rabbitmq_exchange_durable:是否重用
rabbitmq_exchange_autodelete:是否自动删除
rabbitmq_routing_key_template:rabbitmq的交换魔板
有关rabbitmq的设置和基本概念可以参照其他相关文章

以上命令在rabbitmq配置正确的情况下能够启动基本的maxwell 服务器

全量初始化:

 maxwell-bootstrap --user='maxwell' --password='Kitche931743' --host=mysql_master --database=zzdzzb --table=test_person --log_level info  --client_id maxwell
 

将zzdzzb数据库的test_person表放入全量同步队列,这要求在服务器上实现建立maxwell数据库和相关表结构

客户端:
采用python实现客户端,代码如下:

import pika
import json
import pymysql
def exec_sql(database,sql):
db = pymysql.connect("192.168.1.21", "root", "xxx", database)
print(sql)
cursor = db.cursor()
effect_row = cursor.execute(sql)
db.commit()
db.close()

def exec_sql_presql(database,sql,data):
print(sql)
db = pymysql.connect("192.168.1.21", "root", "xxxx", database)
cursor = db.cursor()
effect_row = cursor.execute(sql,data)

print('effect row:%d',effect_row)
db.commit()
db.close()

credentials = pika.PlainCredentials('root','Kitche931743')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'xxxx',5672,'maxwell',credentials))
channel = connection.channel()

def execute_preparesql(tablename,data,database):
fields = data.keys()
values = []
prepare = []
for field in fields:
    val =  data[field]
    if val is not None:
        values.append(val)
        if isinstance(val,str):
            prepare.append("%s")
        elif isinstance(val,int):
            prepare.append('%s')
        elif isinstance(val,float):
            prepare.append('%s')
    else:
        values.append(None)
        prepare.append("%s")

sqlstr = 'insert into ' + tablename + '(' + ','.join(fields) + ') value(' + ','.join(prepare) + ')'
exec_sql_presql(database,sqlstr, tuple(values))

def callback(ch, method, properties, body):
bodystr = body.decode('utf8')
maxwellDic = json.loads(bodystr)
type = maxwellDic['type']
database = maxwellDic['database']
tablename = maxwellDic['table']
print('type:' + type)
if type == 'bootstrap-insert':
    execute_preparesql(tablename,maxwellDic['data'],database)
elif type == 'bootstrap-complete':
    sqlstr = ''
elif type == 'bootstrap-start':
    sqlstr = ''
else:
    if maxwellDic.__contains__('query'):
        sqlstr = maxwellDic["query"]
    else:
        sqlstr = ''
        print(bodystr)
    if database == 'zzdzzb' or database == 'biddingdata':
        if sqlstr != '':
            exec_sql(database, sqlstr)

#print(sqlstr)
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                  queue='zzdzzb.sync'
                  # no_ack=True
                  )

Lokie博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论
  • 本博客使用免费开源的 laravel-bjyblog v5.5.1.1 搭建 © 2014-2018 lokie.wang 版权所有 ICP证:沪ICP备18016993号
  • 联系邮箱:kitche1985@hotmail.com