应用场景:在工作中需要同步两个机房间的mysql数据库,但是不想通过使用类似mysql自带的主从同步这样的技术,这里做的同步仅仅是将远程数据库同步下来。原理上如同mysql自带的replication技术,同样也是对于mysql自带的binlog的读写。
通过搜索google有些开源的解决方案可以使用,本文介绍的是使用一款基于java开发的软件maxwell来实现的。
maxwell是一个由Java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入Kafka,Kinesis,RabbitMQ,Google Cloud Pub / Sub或Redis(Pub / Sub或LPUSH)。(以上内容摘自maxwell官网)。可以想象,有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等。与canal(ali)相比,更加轻量
maxwell还提供以下功能:
1.使用SELECT * FROM table 的方式做全量数据初始化
2.支持主库发生failover后,自动恢复binlog位置
3.对数据进行分区,解决数据倾斜的问题
4.伪装成mysql从库,接收binlog
简单的介绍就到这里,看下怎么做吧
首先maxwell是基于对mysql binlog的读取来实现主要功能的,因此必须打开mysql binlog
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1
重启mysql后可通过如下命令查看
show variables like '%log_bin%'
这里我通过docker来按照maxwell,并实现相关功能。首先是获取docker镜像
docker pull zendesk/maxwell:latest
docker run -d --name=maxwell --link mysql_master:mysql_master zendesk/maxwell bin/maxwell --user=maxwell --password=Kitche931743 --host=mysql_master --output_row_query=true --filter='exclude:*.*,include:zzdzzb.*,include:biddingdata.*' --producer=rabbitmq --rabbitmq_host=xx --rabbitmq_user=root --rabbitmq_pass=xxx --rabbitmq_virtual_host=maxwell --rabbitmq_exchange=maxwell.some --rabbitmq_exchange_type=topic --rabbitmq_exchange_durable=true --rabbitmq_exchange_autodelete=false --rabbitmq_routing_key_template=zzdzzb.sync
参数解释:
user:mysql用户名
password:mysql密码
host:mysql服务器的地址
output_row_query:输出原始的SQL语句
filter:对数据库的和表的排除或者包含操作
producer:产生消息类型rabbitmq、kafka等
rabbitmq_host:rabbitmq的地址
rabbitmq_virtual_host: rabbitmq的虚拟地址,可以在rabbitmq的控制台中设置
rabbitmq_exchange: rabbitmq的交换机
rabbitmq_exchange_type:rabbitmq交换机类型
rabbitmq_exchange_durable:是否重用
rabbitmq_exchange_autodelete:是否自动删除
rabbitmq_routing_key_template:rabbitmq的交换魔板
有关rabbitmq的设置和基本概念可以参照其他相关文章
以上命令在rabbitmq配置正确的情况下能够启动基本的maxwell 服务器
全量初始化:
maxwell-bootstrap --user='maxwell' --password='Kitche931743' --host=mysql_master --database=zzdzzb --table=test_person --log_level info --client_id maxwell
将zzdzzb数据库的test_person表放入全量同步队列,这要求在服务器上实现建立maxwell数据库和相关表结构
客户端:
采用python实现客户端,代码如下:
import pika
import json
import pymysql
def exec_sql(database,sql):
db = pymysql.connect("192.168.1.21", "root", "xxx", database)
print(sql)
cursor = db.cursor()
effect_row = cursor.execute(sql)
db.commit()
db.close()
def exec_sql_presql(database,sql,data):
print(sql)
db = pymysql.connect("192.168.1.21", "root", "xxxx", database)
cursor = db.cursor()
effect_row = cursor.execute(sql,data)
print('effect row:%d',effect_row)
db.commit()
db.close()
credentials = pika.PlainCredentials('root','Kitche931743')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'xxxx',5672,'maxwell',credentials))
channel = connection.channel()
def execute_preparesql(tablename,data,database):
fields = data.keys()
values = []
prepare = []
for field in fields:
val = data[field]
if val is not None:
values.append(val)
if isinstance(val,str):
prepare.append("%s")
elif isinstance(val,int):
prepare.append('%s')
elif isinstance(val,float):
prepare.append('%s')
else:
values.append(None)
prepare.append("%s")
sqlstr = 'insert into ' + tablename + '(' + ','.join(fields) + ') value(' + ','.join(prepare) + ')'
exec_sql_presql(database,sqlstr, tuple(values))
def callback(ch, method, properties, body):
bodystr = body.decode('utf8')
maxwellDic = json.loads(bodystr)
type = maxwellDic['type']
database = maxwellDic['database']
tablename = maxwellDic['table']
print('type:' + type)
if type == 'bootstrap-insert':
execute_preparesql(tablename,maxwellDic['data'],database)
elif type == 'bootstrap-complete':
sqlstr = ''
elif type == 'bootstrap-start':
sqlstr = ''
else:
if maxwellDic.__contains__('query'):
sqlstr = maxwellDic["query"]
else:
sqlstr = ''
print(bodystr)
if database == 'zzdzzb' or database == 'biddingdata':
if sqlstr != '':
exec_sql(database, sqlstr)
#print(sqlstr)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue='zzdzzb.sync'
# no_ack=True
)
本文为Lokie.Wang原创文章,转载无需和我联系,但请注明来自lokie博客http://lokie.wang