Docker部署Canal实践
Canal
官网地址:https://github.com/alibaba/canal
什么是 Canel
官方介绍:
canal ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- Canal 是一个同步增量数据的一个工具,可以很方便的同步数据库的增量数据到其他存储应用;
- Canal 基于binary log 增量订阅和消费;
- Canal 的数据同步不是全量的,而是增量。
Canal 能做什么
- 数据库镜像;
- 数据库实时备份;
- 索引构建和实时维护;
- 业务 cache(缓存)刷新;
- 带业务逻辑的增量数据处理。
Canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 请求;
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave ( 即 canal );
- canal 解析 binary log 对象,将其发送到存储目的地。
实践
在虚拟机中安装 Docker(参照官网安装),Docker 中安装 MYSQL 作为 Mysql master,本机安装的 MYSQL 作为 Mysql slave(伪),实现虚拟机 Mysql master 数据增加,通过 Canal 本地数据同步增加。
Docker安装 MYSQL
拉取镜像;
1
docker pull mysql
创建容器并挂载数据和配置文件;
1
2
3
4
5docker run -d -p 3306:3306 --privileged=true \
-v /liuduix/mysql/log:/var/log/mysql \
-v /liuduix/mysql/data:/var/lib/mysql \
-v /liuduix/mysql/conf:/etc/mysql/conf.d \
-e MYSQL_ROOT_PASSWORD=12345678 --name="mysql" mysql进入挂载的配置目录
liuduix/mysql/conf
,新建my.cnf
,写入以下内容;解决乱码问题;
1
2
3
4
5[client]
default_character_set=utf8mb4
[mysqld]
collation_server=utf8mb4_general_ci
character_set_server=utf8mb4设置密码验证方式(非必须,如果连接请求被拒绝可以尝试配置此项);
MYSQL 8.0 将密码验证方式改为
caching_sha2_password
,而以前的板本是mysql_native_password
。以下使用 + 代表在该节点下新增一行
1
2
3[mysqld]
#......
+ default_authentication_plugin=mysql_native_password
重启 msyql 实例;
1
docker restart mysql
查看是否重启成功;
1
docker ps
如果容器启动失败,使用 docker logs [容器名/容器ID] 来查看错误日志
Master 开启 binlog 日志
查看 Docker 中的 Mysql 是否开启 binlog;
1
show variables like 'log_bin'
如果显示未开启则需要修改配置文件开启(Docker 中最新版 mysql 默认是开启的);
liuduix/mysql/conf/my.cnf
1
2
3
4
5[mysqld]
#...
+ log-bin=mysql-bin # 打开 binlog mysql-bin 为文件名
+ binlog-format=ROW # 选择ROW(行)模式
+ server_id=1 # 不要和 canal 的 slaveId 重复即可binlog-format 三种模式
- ROW:记录每次操作后每行的变化,占用空间大
- STATEMENT:记录每一次执行写操作的语句,可能数据不一致
- MIXED:混合日志,默认是 STATMENT 模式,特殊情况时会使用 ROW 模式
重启 mysql,查看是否开启 binlog 和配置是否成功;
1
docker restart mysql
1
show variables like `log_bin`
1
2# 查看 binlog 模式是否为行级模式
show variables like 'binlog_format'Mysql master 中新建用于 canal 访问的账户;
1
2
3
4
5
6# 新建用户 用户名:canal 密码:canal
CREATE USER canal IDENTIFIED by 'canal';
# 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新MySQL的系统权限表
FLUSH PRIVILEGES;查看用户表(主要查看 Host 是否为 %,如果是 localhost 代表仅支持本地访问);
1
mysql->user
Docker 安装 Canal
拉取镜像;
1
docker pull canal/canal-server
先启动 canal ,获取配置文件用于后续挂载;
启动 canal
1
docker run --name canal -d canal/canal-server
复制出配置文件
1
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /liuduix/canal/conf/
修改该配置文件;
/liuduix/canal/conf/instance.properties
1
2
3
4
5
6
7
8# 改为自己的数据库地址
canal.instance.master.address=192.168.225.132:3306
# 改为用于 canal 登录的账户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 同步的数据库表规则,默认是匹配所有库所有表,不需要修改
# 此过滤规则仅针对 ROW 模式有效
canal.instance.filter.regex=.*\\..*移除容器;
1
2docker stop canal
docker rm canal创建新的 canal 容器并挂载数据和配置;
1
2
3docker run --name canal -d -p 11111:11111 \
-v /liuduix/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
canal/canal-server
测试
本地和远程 Mysql 的相同库中创建结构相同的表;
1
2
3
4
5
6
7
8CREATE TABLE `members` (
`id` varchar(20) NOT NULL PRIMARY KEY COMMENT '用户id',
`user_name` varchar(25) COMMENT '用户名',
`age` int(0) COMMENT '年龄',
`gmt_create` date COMMENT '创建时间',
`gmt_modified` date COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4;引入依赖;
1
2
3
4
5
6<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>创建 CanalClient 类;
client/CanalClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220package com.liu.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author liuduix
*/
public class CanalClient {
//sql队列
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
private DataSource dataSource;
/**
* canal入库方法
*/
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new
InetSocketAddress("192.168.225.130",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
try {
while (true) {
//尝试从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
//当队列里面堆积的sql大于一定数值的时候就模拟执行
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/**
* 模拟执行队列里面的sql语句
*/
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql.toString());
}
}
/**
* 数据处理
*
* @param entrys
*/
private void dataHandle(List<Entry> entrys) throws
InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/**
* 保存更新语句
*
* @param entry
*/
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " +
entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存删除语句
*
* @param entry
*/
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " +
entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 保存插入语句
*
* @param entry
*/
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " +
entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/**
* 入库
*
* @param sql
*/
public void execute(String sql) {
Connection con = null;
try {
if (null == sql) return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: " + row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}创建启动类;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CanalDemoApplication implements CommandLineRunner {
private CanalClient canalClient;
public static void main(String[] args) {
SpringApplication.run(CanalDemoApplication.class, args);
}
public void run(String... args) throws Exception {
//项目启动,执行canal客户端监听
canalClient.run();
}
}测试;
向 Docker 中的 Mysql 添加一条数据,看见控制台打印「Update 1」,同时本机的 Mysql(Canal 伪装的 slave)添加了一条数据。
参考文章
- 超详细canal入门,看这篇就够了:https://zhuanlan.zhihu.com/p/177001630
- 使用 Docker 部署 canal 服务:https://blog.csdn.net/qq2276031/article/details/120234122