准备工作
本次验证用两种数据库(mysql,tidb ),工具 flink,seatunel
source 准备
准备两个数据源,一个扮演source ,一个扮演sink,source mysql 开启binlog
- 编辑mysql 配置文件开启binlog
vim /etc/mysql/my.cnf
- 添加配置
[mysqld]
log-bin=mysql-bin
#配置serverid
server-id=1
- 监控是否开启了binlog
show master status
- 创建测试表
CREATE DATABASE test;
CREATE TABLE test.users (
id bigint PRIMARY KEY AUTO_INCREMENT,
name varchar(20) NULL,
birthday timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO test.users (name) VALUES ('hello');
INSERT INTO test.users (name) VALUES ('world');
INSERT INTO test.users (name) VALUES ('flink');
sink准备(本地mysql,tidb)
//正常应该是tidb,这里简化部署使用mysql
mysql -h127.0.0.1 -uroot -p123456
创建数据
CREATE DATABASE test;
CREATE TABLE test.users (
id bigint PRIMARY KEY AUTO_INCREMENT,
name varchar(20) NULL,
birthday timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
安装( 准备linux ubuntu >16)
flink
cd /opt
wget https://repo.huaweicloud.com/apache/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
tar xf flink-1.13.3-bin-scala_2.11.tgz
cd flink-1.13.3
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.3/flink-sql-connector-elasticsearch7_2.11-1.13.3.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.3/flink-connector-jdbc_2.11-1.13.3.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar | \
wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.22/mysql-connector-java-8.0.22.jar
编译使用seatunnel
cd /opt/flink-1.13.3
git clone git@github.com:apache/incubator-seatunnel.git
cd incubator-seatunnel
git reset --hard d2dc8bf
mvn clean install -DskipTests
tar xf seatunnel-dist/target/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8-bin.tar.gz
cd seatunnel-dist-2.0.5-SNAPSHOT-2.11.8
创建cdc过程 flinksql
cd /opt/flink-1.13.3/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8
tee config/application.conf <<-'EOF'
SET 'table.dml-sync' = 'true';
SET 'parallelism.default' = '1';
SET 'execution.checkpointing.interval' = '5sec';
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '101.43.180.11',
'port' = '3306',
'username' = 'admin',
'password' = 'admin123',
'database-name' = 'test',
'table-name' = 'users'
);
CREATE TABLE tidb_users (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3)
) WITH (
'connector'= 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://localhost:3306/tidb',
'username' = 'root',
'password' = '123456',
'table-name' ='users'
);
INSERT INTO tidb_users SELECT * FROM mysql_users;
EOF
启动seatunel
bin/start-seatunnel-sql.sh --target local -Drest.port=8081 -Dtaskmanager.numberOfTaskSlots=1
测试
source mysql 执行增删改查脚本
-- INSERT INTO `test`.`users` (`id`, `name`, `birthday`) VALUES ('4', 'gaoc', '2022-09-21 14:00:27');
-- update test.users set name='gaoc' where id =4;
-- delete from users where id=4;
查看sink tidb(mysql),观察数据是否同步发生变化
select * from users;