Canal+Mysql+RabbitMQ数据同步

Canal+Mysql+RabbitMQ数据同步

学习目标

  • canal是什么
  • canal能做什么
  • canal的工作原理
  • canal搭建和使用
  • canal同步数据测试

Canal是什么

Canal 是阿里巴巴开源的同步数据的工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

Canal能做什么

上面我们说了 Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,那它具体能做些什么呢,Canal 可以做:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

Canal的工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal搭建和使用

准备工作

mysql配置(安装省略)

msyql需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建canal用户并授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

1
2
3
CREATE USER canal IDENTIFIED BY 'your_password';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

rabbitmq安装配置(docker)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 拉取镜像
docker pull rabbitmq:3.10.0-rc.4-management-alpine

# 启动rabbitmq服务
docker run -d --name rabbit-mg -p 5672:5672 -p 15672:15672 rabbitmq:3.10.0-rc.4-management-alpine

# 进入容器
docker exec -it rabbit-mg /bin/bash

# 新增用户并配置密码
rabbitmqctl add_user root your_password

# 赋予用户所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

# 赋予用户administrator角色
rabbitmqctl set_user_tags root administrator

# 查看所有用户
rabbitmqctl list_users

# 修改用户密码
rabbitmqctl change_password root your_password

打开浏览器输入 http://ip:15672 ,输入账号密码登录

rabbitmq登录界面

新增交换机、队列、设置路由key绑定

新增交换机并绑定队列

下载解压

1
2
3
4
5
6
7
8
# 下载canal
wget -c https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# 创建目录
mkdir canal-deployer-1.1.5

# 解压至目录
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-deployer-1.1.5

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.
├── bin 脚本目录
│   ├── restart.sh 重启脚本
│   ├── startup.bat
│   ├── startup.sh 启动脚本
│   └── stop.sh 停止脚本
├── conf 配置文件目录
│   ├── canal_local.properties
│   ├── canal.properties
│   ├── example
│   | └── instance.properties
│   ├── logback.xml
│   ├── metrics
│   └── spring
├── lib 包
├── logs 日志
└── plugin 插件
├── connector.kafka-1.1.5-jar-with-dependencies.jar
├── connector.rabbitmq-1.1.5-jar-with-dependencies.jar
└── connector.rocketmq-1.1.5-jar-with-dependencies.jar

修改配置文件

配置mysql数据库信息

vim canal-deployer-1.1.5/conf/example/instance.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
## mysql serverId
canal.instance.mysql.slaveId = 128
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 #mysql地址
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal #mysql账号
canal.instance.dbPassword = your_password #mysql密码
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

# mq config
canal.mq.topic=canal_demo_key #配置mq的路由key
canal.mq.partition=0

配置rabbitmq信息

vim canal-deployer-1.1.5/conf/canal.properties

1
2
3
4
5
6
7
canal.serverMode = rabbitMQ             #连接的服务模式
rabbitmq.host = 127.0.0.1 #mq主机地址
rabbitmq.virtual.host = / #mq虚拟主机
rabbitmq.exchange = canal_demo #mq交换机
rabbitmq.username = root #mq账号
rabbitmq.password = your_password #mq密码
rabbitmq.deliveryMode = direct #mq交换机模式

启动canal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
启动
sh canal-deployer-1.1.5/bin/startup.sh

查看 server 日志
tail -f canal-deployer-1.1.5/logs/canal/canal.log

查看 instance 的日志
tail -f canal-deployer-1.1.5/logs/example/example.log

关闭
sh canal-deployer-1.1.5/bin/stop.sh

重启
sh canal-deployer-1.1.5/bin/restart.sh

canal同步数据测试

先查询MySQL用户表数据,然后插入一条数据、更新一条数据、删除一条数据。

mysql-curd

查看队列条数,此时有三条数据

rabbitmq-total

查看队列数据内容,分别对应了Mysql操作的增、改、删对应的json数据

rabbitmq-data

至此已经完成将操作数据同步至rabbitmq,后续对队列进行消费作对应的业务处理即可。