Canal

官网地址:https://github.com/alibaba/canal

什么是 Canel

官方介绍:

canal ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

  • Canal 是一个同步增量数据的一个工具,可以很方便的同步数据库的增量数据到其他存储应用;
  • Canal 基于binary log 增量订阅和消费;
  • Canal 的数据同步不是全量的,而是增量

Canal 能做什么

  • 数据库镜像;
  • 数据库实时备份;
  • 索引构建和实时维护;
  • 业务 cache(缓存)刷新;
  • 带业务逻辑的增量数据处理。

Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 请求;
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave ( 即 canal );
  • canal 解析 binary log 对象,将其发送到存储目的地。

实践

在虚拟机中安装 Docker(参照官网安装),Docker 中安装 MYSQL 作为 Mysql master,本机安装的 MYSQL 作为 Mysql slave(伪),实现虚拟机 Mysql master 数据增加,通过 Canal 本地数据同步增加。

Docker安装 MYSQL

  1. 拉取镜像;

    1
    docker pull mysql
  2. 创建容器并挂载数据和配置文件;

    1
    2
    3
    4
    5
    docker run -d -p 3306:3306 --privileged=true \
    -v /liuduix/mysql/log:/var/log/mysql \
    -v /liuduix/mysql/data:/var/lib/mysql \
    -v /liuduix/mysql/conf:/etc/mysql/conf.d \
    -e MYSQL_ROOT_PASSWORD=12345678 --name="mysql" mysql
  3. 进入挂载的配置目录liuduix/mysql/conf,新建my.cnf,写入以下内容;

    • 解决乱码问题;

      1
      2
      3
      4
      5
      [client]
      default_character_set=utf8mb4
      [mysqld]
      collation_server=utf8mb4_general_ci
      character_set_server=utf8mb4
    • 设置密码验证方式(非必须,如果连接请求被拒绝可以尝试配置此项);

      MYSQL 8.0 将密码验证方式改为caching_sha2_password,而以前的板本是mysql_native_password

      以下使用 + 代表在该节点下新增一行

      1
      2
      3
      [mysqld]
      #......
      + default_authentication_plugin=mysql_native_password
  4. 重启 msyql 实例;

    1
    docker restart mysql
  5. 查看是否重启成功;

    1
    docker ps

    如果容器启动失败,使用 docker logs [容器名/容器ID] 来查看错误日志

Master 开启 binlog 日志

  1. 查看 Docker 中的 Mysql 是否开启 binlog;

    1
    show variables like 'log_bin'
  2. 如果显示未开启则需要修改配置文件开启(Docker 中最新版 mysql 默认是开启的);

    liuduix/mysql/conf/my.cnf

    1
    2
    3
    4
    5
    [mysqld]
    #...
    + log-bin=mysql-bin # 打开 binlog mysql-bin 为文件名
    + binlog-format=ROW # 选择ROW(行)模式
    + server_id=1 # 不要和 canal 的 slaveId 重复即可

    binlog-format 三种模式

    • ROW:记录每次操作后每行的变化,占用空间大
    • STATEMENT:记录每一次执行写操作的语句,可能数据不一致
    • MIXED:混合日志,默认是 STATMENT 模式,特殊情况时会使用 ROW 模式
  3. 重启 mysql,查看是否开启 binlog 和配置是否成功;

    1
    docker restart mysql
    1
    show variables like `log_bin`
    1
    2
    # 查看 binlog 模式是否为行级模式
    show variables like 'binlog_format'
  4. Mysql master 中新建用于 canal 访问的账户;

    1
    2
    3
    4
    5
    6
    # 新建用户 用户名:canal  密码:canal 
    CREATE USER canal IDENTIFIED by 'canal';
    # 授权
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    # 刷新MySQL的系统权限表
    FLUSH PRIVILEGES;

    查看用户表(主要查看 Host 是否为 %,如果是 localhost 代表仅支持本地访问);

    1
    mysql->user

Docker 安装 Canal

  1. 拉取镜像;

    1
    docker pull canal/canal-server
  2. 先启动 canal ,获取配置文件用于后续挂载;

    • 启动 canal

      1
      docker run --name canal -d canal/canal-server
    • 复制出配置文件

      1
      docker cp canal:/home/admin/canal-server/conf/example/instance.properties /liuduix/canal/conf/
  3. 修改该配置文件;

    /liuduix/canal/conf/instance.properties

    1
    2
    3
    4
    5
    6
    7
    8
    # 改为自己的数据库地址
    canal.instance.master.address=192.168.225.132:3306
    # 改为用于 canal 登录的账户名和密码
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    # 同步的数据库表规则,默认是匹配所有库所有表,不需要修改
    # 此过滤规则仅针对 ROW 模式有效
    canal.instance.filter.regex=.*\\..*
  4. 移除容器;

    1
    2
    docker stop canal
    docker rm canal
  5. 创建新的 canal 容器并挂载数据和配置;

    1
    2
    3
    docker run --name canal -d -p 11111:11111 \
    -v /liuduix/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
    canal/canal-server

测试

  1. 本地和远程 Mysql 的相同库中创建结构相同的表;

    1
    2
    3
    4
    5
    6
    7
    8
    CREATE TABLE `members`  (
    `id` varchar(20) NOT NULL PRIMARY KEY COMMENT '用户id',
    `user_name` varchar(25) COMMENT '用户名',
    `age` int(0) COMMENT '年龄',
    `gmt_create` date COMMENT '创建时间',
    `gmt_modified` date COMMENT '修改时间',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4;
  2. 引入依赖;

    1
    2
    3
    4
    5
    6
    <!--canal-->
    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
    </dependency>
  3. 创建 CanalClient 类;

    client/CanalClient

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    package com.liu.client;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.commons.dbutils.DbUtils;
    import org.apache.commons.dbutils.QueryRunner;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.net.InetSocketAddress;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    /**
    * @author liuduix
    */
    @Component
    public class CanalClient {
    //sql队列
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    @Resource
    private DataSource dataSource;

    /**
    * canal入库方法
    */
    public void run() {
    CanalConnector connector = CanalConnectors.newSingleConnector(new
    InetSocketAddress("192.168.225.130",
    11111), "example", "", "");
    int batchSize = 1000;
    try {
    connector.connect();
    connector.subscribe(".*\\..*");
    connector.rollback();
    try {
    while (true) {
    //尝试从master那边拉去数据batchSize条记录,有多少取多少
    Message message = connector.getWithoutAck(batchSize);
    long batchId = message.getId();
    int size = message.getEntries().size();
    if (batchId == -1 || size == 0) {
    Thread.sleep(1000);
    } else {
    dataHandle(message.getEntries());
    }
    connector.ack(batchId);
    //当队列里面堆积的sql大于一定数值的时候就模拟执行
    if (SQL_QUEUE.size() >= 1) {
    executeQueueSql();
    }
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (InvalidProtocolBufferException e) {
    e.printStackTrace();
    }
    } finally {
    connector.disconnect();
    }
    }

    /**
    * 模拟执行队列里面的sql语句
    */
    public void executeQueueSql() {
    int size = SQL_QUEUE.size();
    for (int i = 0; i < size; i++) {
    String sql = SQL_QUEUE.poll();
    System.out.println("[sql]----> " + sql);
    this.execute(sql.toString());
    }
    }

    /**
    * 数据处理
    *
    * @param entrys
    */
    private void dataHandle(List<Entry> entrys) throws
    InvalidProtocolBufferException {
    for (Entry entry : entrys) {
    if (EntryType.ROWDATA == entry.getEntryType()) {
    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    EventType eventType = rowChange.getEventType();
    if (eventType == EventType.DELETE) {
    saveDeleteSql(entry);
    } else if (eventType == EventType.UPDATE) {
    saveUpdateSql(entry);
    } else if (eventType == EventType.INSERT) {
    saveInsertSql(entry);
    }
    }
    }
    }

    /**
    * 保存更新语句
    *
    * @param entry
    */
    private void saveUpdateSql(Entry entry) {
    try {
    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    List<RowData> rowDatasList = rowChange.getRowDatasList();
    for (RowData rowData : rowDatasList) {
    List<Column> newColumnList = rowData.getAfterColumnsList();
    StringBuffer sql = new StringBuffer("update " +
    entry.getHeader().getTableName() + " set ");
    for (int i = 0; i < newColumnList.size(); i++) {
    sql.append(" " + newColumnList.get(i).getName()
    + " = '" + newColumnList.get(i).getValue() + "'");
    if (i != newColumnList.size() - 1) {
    sql.append(",");
    }
    }
    sql.append(" where ");
    List<Column> oldColumnList = rowData.getBeforeColumnsList();
    for (Column column : oldColumnList) {
    if (column.getIsKey()) {
    //暂时只支持单一主键
    sql.append(column.getName() + "=" + column.getValue());
    break;
    }
    }
    SQL_QUEUE.add(sql.toString());
    }
    } catch (InvalidProtocolBufferException e) {
    e.printStackTrace();
    }
    }

    /**
    * 保存删除语句
    *
    * @param entry
    */
    private void saveDeleteSql(Entry entry) {
    try {
    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    List<RowData> rowDatasList = rowChange.getRowDatasList();
    for (RowData rowData : rowDatasList) {
    List<Column> columnList = rowData.getBeforeColumnsList();
    StringBuffer sql = new StringBuffer("delete from " +
    entry.getHeader().getTableName() + " where ");
    for (Column column : columnList) {
    if (column.getIsKey()) {
    //暂时只支持单一主键
    sql.append(column.getName() + "=" + column.getValue());
    break;
    }
    }
    SQL_QUEUE.add(sql.toString());
    }
    } catch (InvalidProtocolBufferException e) {
    e.printStackTrace();
    }
    }

    /**
    * 保存插入语句
    *
    * @param entry
    */
    private void saveInsertSql(Entry entry) {
    try {
    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    List<RowData> rowDatasList = rowChange.getRowDatasList();
    for (RowData rowData : rowDatasList) {
    List<Column> columnList = rowData.getAfterColumnsList();
    StringBuffer sql = new StringBuffer("insert into " +
    entry.getHeader().getTableName() + " (");
    for (int i = 0; i < columnList.size(); i++) {
    sql.append(columnList.get(i).getName());
    if (i != columnList.size() - 1) {
    sql.append(",");
    }
    }
    sql.append(") VALUES (");
    for (int i = 0; i < columnList.size(); i++) {
    sql.append("'" + columnList.get(i).getValue() + "'");
    if (i != columnList.size() - 1) {
    sql.append(",");
    }
    }
    sql.append(")");
    SQL_QUEUE.add(sql.toString());
    }
    } catch (InvalidProtocolBufferException e) {
    e.printStackTrace();
    }
    }

    /**
    * 入库
    *
    * @param sql
    */
    public void execute(String sql) {
    Connection con = null;
    try {
    if (null == sql) return;
    con = dataSource.getConnection();
    QueryRunner qr = new QueryRunner();
    int row = qr.execute(con, sql);
    System.out.println("update: " + row);
    } catch (SQLException e) {
    e.printStackTrace();
    } finally {
    DbUtils.closeQuietly(con);
    }
    }
    }
  4. 创建启动类;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @SpringBootApplication
    public class CanalDemoApplication implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
    SpringApplication.run(CanalDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
    //项目启动,执行canal客户端监听
    canalClient.run();
    }
    }
  5. 测试;

    向 Docker 中的 Mysql 添加一条数据,看见控制台打印「Update 1」,同时本机的 Mysql(Canal 伪装的 slave)添加了一条数据。

参考文章