官方文档
canal-php
简介
- canal server伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给canal server
- canal server 解析 binary log 对象(原始为 byte 流)
- canal client连接上canal server之后可以实时收到MySQL master推送过来的binlog
- canal server也可以把binlog直接推送到MySQL,Hbase,Es,Mongodb,Redis,RabbitMq,RocketMq
前期准备
我本次测试使用的是phpstudy自带的mysql8.0.12,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下,phpstudy默认配置就是这样,不用修改
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
安装Canal Server
我这里选择用docker安装
//下载脚本
$ wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
//赋予权限
$ chmod +x run.sh
//构建一个destination name为test的队列
./run.sh
-e canal.auto.scan=false
-e canal.destinations=test
-e canal.instance.master.address=127.0.0.1:3306
-e canal.instance.dbUsername=canal
-e canal.instance.dbPassword=canal
-e canal.instance.connectionCharset=UTF-8
-e canal.instance.tsdb.enable=true
-e canal.instance.gtidon=false
脚本会docker pull最新的canal/canal-server镜像
让我们稍微等几年看看什么情况...
容器跑起来了,默认端口是11111,destinations是test
启动Canal-client
- 获取canal-client
composer require xingwenge/canal_php
- 新建canal.php
<?php
namespace xingwenge\canal_php\sample;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
require_once './vendor/autoload.php';
ini_set('display_errors', 'On');
error_reporting(E_ALL);
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "test", ".*\..*");
# $client->subscribe("1001", "test", "db_name.tb_name"); # 指定某个库某个表
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
-
执行canal.php
去数据库改几条数据看看
收到binlog消息
- 实战成功
什么时候可以使用Canal
mysql和es,redis,mongdb信息同步
可以不用在应用层加代码就可以实现数据同步
mysql 不停机旧表迁移新表
1.canal-server开始监听旧表table的binlog,保存到日志文件canal.log(也可以存到mq)
2.mysqldump备份旧表生成table.sql,记录执行mysqldump的大概时间
3.写代码按新的规则导入table.sql到新表table_new
4.写代码按新的规则消费canal.log到新表table_new(取mysqldump之后的log)
5.比较两个表数据是否一致
6.停止消费canal.log
7.原子操作换名
RENAME TABLE `table` TO `table_old`, `table_new` TO `table`
8.写代码按新的规则继续消费canal.log到table(如果canal.log还有没有同步完的操作)
9.删除table_old