Business Background
We need to sync data from several Hive tables into ClickHouse to serve real-time queries; each table holds roughly 200 million rows. The sync tool has two requirements: first, the extraction must not take too long; second, we must be able to control which ClickHouse cluster node instances the data is written to. As a Java developer I did not want to depend too heavily on the Hadoop stack, so after some searching online I settled on SeaTunnel, which lets us implement the extraction with simple configuration.
Introduction
Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive amounts of data, both offline and in real time.
Official documentation: https://interestinglab.github.io/seatunnel-docs/#/
Installation
Installation is straightforward; just follow the official documentation.
Configuration
config.conf — the configuration below extracts data from Hive and inserts it into ClickHouse. The source is a single Hive table; a SeaTunnel filter plugin slices the rows by the id field and writes each slice to a different shard of the ClickHouse cluster.
spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}
input {
  hive {
    pre_sql = "select id,name,create_time from table"
    table_name = "table_tmp"
  }
}
filter {
  convert {
    source_field = "data_source"
    new_type = "UInt8"
  }
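  # The two Slice blocks below are a custom filter (see "Plugin Development" below):
  # a row is kept by a slice when abs(hash(id) % slice_num) == slice_code,
  # so every row ends up in exactly one of the two result tables.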
  org.interestinglab.waterdrop.filter.Slice {
    source_table_name = "table_tmp"
    source_field = "id"
    slice_num = 2
    slice_code = 0
    result_table_name = "table_8123"
  }
  org.interestinglab.waterdrop.filter.Slice {
    source_table_name = "table_tmp"
    source_field = "id"
    slice_num = 2
    slice_code = 1
    result_table_name = "table_8124"
  }
}
output {
  clickhouse {
    source_table_name = "table_8123"
    host = "ip1:8123"
    database = "db_name"
    username = "username"
    password = "pwd"
    table = "table1"
    fields = ["id","name","create_time"]
    clickhouse.socket_timeout = 50000
    retry_codes = [209, 210]
    retry = 3
    bulk_size = 500000
  }
  clickhouse {
    source_table_name = "table_8124"
    host = "ip2:8124"
    database = "db_name"
    username = "username"
    password = "pwd"
    table = "table1"
    fields = ["id","name","create_time"]
    clickhouse.socket_timeout = 50000
    retry_codes = [209, 210]
    retry = 3
    bulk_size = 500000
  }
}
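To make the routing concrete: with slice_num = 2, a row whose abs(hash(id) % 2) equals 0 is written to ip1:8123 and every other row to ip2:8124, so each row lands on exactly one ClickHouse node. The minimal sketch below is a standalone Spark program, not part of the job; SliceRoutingDemo and the sample ids are made up for illustration, and it uses the same functions.hash (Murmur3) that the Slice plugin calls.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{abs, col, hash}

object SliceRoutingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("slice-routing-demo").getOrCreate()
    import spark.implicits._

    val sliceNum = 2
    // Hypothetical sample ids; the real values come from the Hive table.
    val ids = Seq(1L, 2L, 3L, 42L, 1000000007L).toDF("id")

    // functions.hash is Spark's Murmur3 hash, the same call the Slice plugin uses.
    // slice_code 0 -> "table_8123" (ip1:8123), slice_code 1 -> "table_8124" (ip2:8124).
    ids.withColumn("slice_code", abs(hash(col("id")) % sliceNum)).show()

    spark.stop()
  }
}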
Plugin Development
The Slice filter referenced in the configuration above is not built in; it is a custom Waterdrop filter plugin, implemented as follows.
package org.interestinglab.waterdrop.filter

import io.github.interestinglab.waterdrop.apis.BaseFilter
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.{col, hash, lit, udf}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

class Slice extends BaseFilter {

  var conf: Config = ConfigFactory.empty()

  /**
   * Set Config.
   */
  override def setConfig(config: Config): Unit = {
    this.conf = config
  }

  /**
   * Get Config.
   */
  override def getConfig(): Config = {
    this.conf
  }

  override def checkConfig(): (Boolean, String) = {
    if (!conf.hasPath("source_field")) {
      (false, "please specify [source_field] as a non-empty string")
    } else if (!conf.hasPath("slice_code")) {
      (false, "please specify [slice_code] as an integer in [0, slice_num)")
    } else if (!conf.hasPath("slice_num")) {
      (false, "please specify [slice_num] as a positive integer")
    } else {
      (true, "")
    }
  }

  override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    val srcField = conf.getString("source_field")
    val sliceCode = conf.getInt("slice_code")
    val sliceNum = conf.getInt("slice_num")
    // Keep only the rows whose hashed source field falls into this slice.
    df.filter(func(hash(col(srcField)), lit(sliceNum), lit(sliceCode)))
  }

  // hash() returns a signed Int, so take the absolute value of the remainder:
  // a row belongs to slice `target` when abs(hash % num) == target.
  val func = udf((hashValue: Int, num: Int, target: Int) => {
    (hashValue % num).abs == target
  })
}
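Before wiring the plugin into the job, it is worth sanity-checking the slicing rule itself, in particular that negative hash values (Spark's hash returns a signed Int) still map into [0, slice_num). The snippet below is a plain Scala replay of the expression the UDF evaluates; SliceRuleCheck and the sample hash values are made up for illustration.
// Standalone sanity check (hypothetical helper, not part of the plugin):
// asserts that every hash value maps to a slice_code in [0, slice_num).
object SliceRuleCheck {
  def sliceOf(hashValue: Int, sliceNum: Int): Int = (hashValue % sliceNum).abs

  def main(args: Array[String]): Unit = {
    val sliceNum = 2
    val samples = Seq(Int.MinValue, -7, -1, 0, 1, 7, Int.MaxValue)
    samples.foreach { h =>
      val code = sliceOf(h, sliceNum)
      require(code >= 0 && code < sliceNum, s"hash $h mapped outside [0, $sliceNum)")
      println(s"hash=$h -> slice_code=$code")
    }
  }
}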
Launch
../bin/start-waterdrop.sh --master local[4] --deploy-mode client --config config.conf