当前位置: 首页>编程语言>正文

seatunnel 简单使用(原名waterdrop)

业务背景

将hive中多个表数据同步到clickhouse中提供实时查询,表均2亿条记录。对同步工具的要求一是能够实现抽数时间不宜过长;二是能够自定义控制将数据抽取到clickhouse集群指定的节点实例上。作为一名java开发,自然不想过多依赖Hadoop那一套,网上搜索一番后决定使用seatunnel,通过简单配置化就可以实现数据的抽取。

简介

Apache SeaTunnel (Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

官方文档:https://interestinglab.github.io/seatunnel-docs/#/

安装

安装比较简单,参考官方文档即可。

配置

config.conf 下述配置是从hive中抽数插入到clickhouse中的配置,数据源是hive的一张表,通过seatunnel插件根据id字段进行分片插入clickhouse集群不同分片。

spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1 
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}

input {
    hive {
        pre_sql = "select id,name,create_time from table"
        table_name = "table_tmp"
    }
}

filter {
    convert {
        source_field = "data_source"
        new_type = "UInt8"
    }

    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 0
        result_table_name = "table_8123"
    }
    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 1
        result_table_name = "table_8124"
    }
}

output {
    clickhouse {
        source_table_name="table_8123"
        host = "ip1:8123"
        database = "db_name"
        username="username"
        password="pwd"
        table = "table1"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
    clickhouse {
        source_table_name="table_8124"
        host = "ip2:8124"
        database = "db_name"
        username="username"
        password="pwd"
        table = "table1"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
}

插件开发

package org.interestinglab.waterdrop.filter

import io.github.interestinglab.waterdrop.apis.BaseFilter
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.{col, hash, lit, udf}
import org.apache.spark.sql.{Dataset, Row, SparkSession}


class Slice extends BaseFilter {

  var conf: Config = ConfigFactory.empty()

  /**
   * Set Config.
   * */
  override def setConfig(config: Config): Unit = {
    this.conf = config
  }

  /**
   * Get Config.
   * */
  override def getConfig(): Config = {
    this.conf
  }

  override def checkConfig(): (Boolean, String) = {
    if (!conf.hasPath("source_field")) {
      (false, "please specify [source_field] as a non-empty string")
    } else if (!conf.hasPath("slice_code")) {
      (false, "please specify [slice_code] as a non-empty string")
    } else if (!conf.hasPath("slice_num")) {
      (false, "please specify [slice_num] as a non-empty string")
    } else {
      (true, "")
    }
  }

  override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    val srcField = conf.getString("source_field")
    val sliceCode = conf.getInt("slice_code")
    val sliceNum = conf.getInt("slice_num")

    df.filter(func(hash(col(srcField)), lit(sliceNum), lit(sliceCode)))
  }

  val func = udf((s: String, num: Int, target: Int) => {
    val moCOde = s.toDouble % num
    val absValue = moCOde.toInt.abs
    absValue == target
  })
}

启动

../bin/start-waterdrop.sh --master local[4] --deploy-mode client --config.conf

https://www.xamrdz.com/lan/5kx1996124.html

相关文章: