当前位置: 首页>大数据>正文

SeaTunnel + Flink CDC 完成 MySQL 到 TiDB 的数据同步

准备工作

本次验证使用两种数据库(MySQL、TiDB)和两个工具(Flink、SeaTunnel)。

source 准备

准备两个数据源:一个作为 source,一个作为 sink。source 端的 MySQL 需要开启 binlog。

  1. 编辑mysql 配置文件开启binlog
# Open the MySQL server configuration file for editing.
vim /etc/mysql/my.cnf
  2. 添加以下配置(修改后需要重启 MySQL 才能生效)
[mysqld]
# Enable the binary log; binlog files are named with the "mysql-bin" prefix.
log-bin=mysql-bin
# CDC readers consume row-level change events, so force row-based logging
# (older MySQL versions default to statement-based logging).
binlog_format=ROW
# Server id of this MySQL instance; must be unique within a replication topology.
server-id=1

  3. 检查 binlog 是否已开启
-- Shows the current binlog file and position; an empty result means binary logging is off.
SHOW MASTER STATUS;
  4. 创建测试库和测试表,并写入初始数据
-- Source-side fixture: the database and table whose changes will be captured.
-- IF NOT EXISTS lets the setup be re-run without aborting on existing objects.
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.users (
  id bigint PRIMARY KEY AUTO_INCREMENT,
  name varchar(20) NULL,
  -- Filled in by the server when an INSERT omits the column.
  birthday timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Seed rows; id comes from AUTO_INCREMENT and birthday from the column default.
-- Note: re-running these INSERTs appends three more rows.
INSERT INTO test.users (name) VALUES ('hello');
INSERT INTO test.users (name) VALUES ('world');
INSERT INTO test.users (name) VALUES ('flink');

sink 准备(本地 MySQL 或 TiDB)

# Normally the sink would be TiDB; a local MySQL instance is used here to simplify deployment.
mysql -h127.0.0.1 -uroot -p123456

创建数据库和表

-- Sink-side table that receives the replicated rows; same layout as the source table.
-- IF NOT EXISTS lets the setup be re-run without aborting on existing objects.
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.users (
  id bigint PRIMARY KEY AUTO_INCREMENT,
  name varchar(20) NULL,
  birthday timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

安装(准备 Linux 环境,Ubuntu 版本 > 16)

flink

# Download and unpack Flink 1.13.3 (Scala 2.11 build) under /opt.
cd /opt
wget https://repo.huaweicloud.com/apache/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
tar xf flink-1.13.3-bin-scala_2.11.tgz
cd flink-1.13.3

# Fetch the connector and driver jars into Flink's lib/ directory.
# The downloads are chained with && (not piped into each other) so they run one
# after another and the chain stops at the first failed download.
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.3/flink-sql-connector-elasticsearch7_2.11-1.13.3.jar && \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.3/flink-connector-jdbc_2.11-1.13.3.jar && \
wget -P ./lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar && \
wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.22/mysql-connector-java-8.0.22.jar

编译并使用 SeaTunnel

# Build SeaTunnel from source at a pinned commit.
cd /opt/flink-1.13.3
# HTTPS clone works without a GitHub SSH key being configured.
git clone https://github.com/apache/incubator-seatunnel.git
cd incubator-seatunnel
git reset --hard d2dc8bf
mvn clean install -DskipTests

# Unpack the distribution next to Flink (not inside the source tree) so that it
# ends up at /opt/flink-1.13.3/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8.
tar xf seatunnel-dist/target/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8-bin.tar.gz -C /opt/flink-1.13.3
cd /opt/flink-1.13.3/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8

创建 CDC 同步任务(Flink SQL)

# Write the Flink SQL job definition to SeaTunnel's config/application.conf.
# The job reads test.users from the source MySQL through the mysql-cdc connector
# and writes every change to the sink through the jdbc connector.
# Replace the hostnames and credentials below with those of your own source and sink.
# The sink JDBC URL targets the `test` database, i.e. the database that holds the
# sink table test.users.
cd /opt/flink-1.13.3/seatunnel-dist-2.0.5-SNAPSHOT-2.11.8
tee config/application.conf <<-'EOF'
SET 'table.dml-sync' = 'true';
SET 'parallelism.default' = '1';
SET 'execution.checkpointing.interval' = '5sec';

CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '101.43.180.11',
  'port' = '3306',
  'username' = 'admin',
  'password' = 'admin123',
  'database-name' = 'test',
  'table-name' = 'users'
);

CREATE TABLE tidb_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3)
) WITH (
  'connector'= 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'username' = 'root',
  'password' = '123456',
  'table-name' ='users'
);

INSERT INTO tidb_users SELECT id, name, birthday FROM mysql_users;
EOF

启动 SeaTunnel

# Start the SeaTunnel SQL job in local mode, with the Flink REST port set to 8081
# and one task slot per TaskManager.
# NOTE(review): no config file is passed here, so the script presumably picks up
# config/application.conf by default — confirm against the script.
bin/start-seatunnel-sql.sh --target local -Drest.port=8081 -Dtaskmanager.numberOfTaskSlots=1

测试

在 source 端 MySQL 上执行增、删、改脚本

-- Sample change statements for the source table. They are kept commented out;
-- uncomment and run them one at a time, checking the sink after each.
-- INSERT INTO test.users (id, name, birthday) VALUES (4, 'gaoc', '2022-09-21 14:00:27');
-- The new name differs from the inserted one so the UPDATE really changes the row.
-- UPDATE test.users SET name = 'gaoc_updated' WHERE id = 4;
-- DELETE FROM test.users WHERE id = 4;

查看 sink 端 TiDB(本例中为 MySQL),观察数据是否随 source 同步发生变化

-- Inspect the sink table. The schema is qualified so the query works without a
-- prior USE, and ORDER BY makes the output order deterministic.
SELECT id, name, birthday FROM test.users ORDER BY id;

https://www.xamrdz.com/bigdata/74y1995686.html

相关文章: