1. Background
Our company has recently been building a big-data ETL platform, and the work involves a fair amount of testing. Along the way I am recording the steps that proved most reusable, so they can be copied directly when developing jobs later.
The data comes from Hive (some pre-computed metrics) and needs to be stored in ES (for API queries). For this kind of offline data ETL, Alibaba's DataX is the natural choice.
2. Environment
- Job flow: read data from Hive and write it directly into an ES index
- Environment: Linux (7.9), DataX (3.0), Hadoop (3.1.3), Hive (3.1.2), ES (7.8.0)
- Cluster nodes: 3 (node01, node02, node03); ES is installed on node03 (single node)
3. Task Preparation
3.1 Check the data sources supported by the official project
The official project supports quite a few data sources, but ES appears to support only write operations.
3.2 Hive reader configuration
Official docs: https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/mytable01/*",
"defaultFS": "hdfs://xxx:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"type": "string",
"value": "hello"
},
{
"index": 2,
"type": "double"
}
],
"fileType": "orc",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
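The sample above pairs hdfsreader with streamwriter, which only prints the rows it reads to the console. That makes it a convenient dry run for checking path, fileType and fieldDelimiter before wiring in the ES writer. A minimal sketch, assuming the sample is saved as hdfs2stream_check.json (a placeholder file name of mine) in the DataX home directory:
# Sketch: run the reader-only sample; rows read from HDFS are printed to stdout instead of being written anywhere
python bin/datax.py hdfs2stream_check.json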
3.3 ES writer configuration
Official docs: https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
...
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type":"integer", "array":true},
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
]
}
}
}
]
}
}
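Before using this writer it is worth confirming that the ES endpoint is reachable. A minimal check with curl (node03:9200 is the endpoint used later in this post; adjust to your own cluster):
# A healthy node answers with its name, cluster name and version
curl -X GET "http://node03:9200/"
# Cluster health; on a single node, indices created with replicas > 0 stay yellow, which is why the job below sets number_of_replicas to 0
curl -X GET "http://node03:9200/_cluster/health?pretty"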
3.4 Hive data preparation
-- Create the table
create table dwd_mhs_opt_register_hive (
global_id string
,patient_id string
,med_org_id string
,med_org_name string
,register_dept_id string
,register_dept_name string
,register_date date
,visit_id string
,register_id string
,register_time timestamp
,fee_register decimal(18,2)
,patient_name string
,id_card string
,id_card_type string
,gender string
,birthday string
,patient_phone string
,visit_card_no string
,visit_card_type string
,ds_date date
);
-- Insert data
insert into dwd_mhs_opt_register_hive values ('65a2344e241996aeb2b0f636d1cf2615','o_420000000002_611117','420000000002','绣林卫生院','102','门诊全科','2022-09-22','611117','611117','2022-09-22 09:28:32','4.5','张三丰','428001195208085237','01','1','1952-08-08','13522193151','A132193151','01','2022-09-22');
insert into dwd_mhs_opt_register_hive values ('aa0664fd7a8db6e3281666340223914d','o_420000003016_221375','420000003016','朱坡卫生院','102','门诊全科','2022-09-22','221375','221375','2022-09-22 09:38:32','4.5','宋远桥','428001198208105257','01','1','1982-08-10','13522293152','A135220152','01','2022-09-22');
insert into dwd_mhs_opt_register_hive values ('71590ce8bedd4a1bbb56fe1fcea0597f','o_420000000282_347902','420000000282','庞公卫生院','102','门诊全科','2022-09-22','347902','347902','2022-09-22 09:48:32','4.5','殷素素','428001198508185745','01','2','1985-08-18','13522393153','A135783153','01','2022-09-22');
-- Query the data
select i.* from dwd_mhs_opt_register_hive i ;
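The DataX job in section 4 reads the table's files straight from the Hive warehouse directory, so it helps to confirm where the data landed and that the rows are plain text separated by the \u0001 control character (Hive's default field delimiter). A small sketch, assuming the table sits in the test database as the path in 4.1 suggests:
# List the files backing the table (path taken from the job configuration in 4.1)
hdfs dfs -ls /user/hive/warehouse/test.db/dwd_mhs_opt_register_hive
# Print the rows; piping through `cat -v` renders the \u0001 delimiter as ^A so it is visible
hdfs dfs -cat '/user/hive/warehouse/test.db/dwd_mhs_opt_register_hive/*' | cat -v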
4. Task Testing
4.1 Prepare the job configuration file
Create a job configuration file under DataX's conf directory (named datax_01_hive2es.json here, matching the command in 4.2).
Configuration content:
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/test.db/dwd_mhs_opt_register_hive",
"defaultFS": "hdfs://node01:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "date"
},
{
"index": 7,
"type": "string"
},
{
"index": 8,
"type": "string"
},
{
"index": 9,
"type": "date"
},
{
"index": 10,
"type": "double"
},
{
"index": 11,
"type": "string"
},
{
"index": 12,
"type": "string"
},
{
"index": 13,
"type": "string"
},
{
"index": 14,
"type": "string"
},
{
"index": 15,
"type": "string"
},
{
"index": 16,
"type": "string"
},
{
"index": 17,
"type": "string"
},
{
"index": 18,
"type": "string"
},
{
"index": 19,
"type": "date"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\u0001"
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://node03:9200",
"accessId": "root",
"accessKey": "root",
"index": "register2",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "global_id", "type": "string" },
{"name": "patient_id", "type": "string" },
{"name": "med_org_id", "type": "string" },
{"name": "med_org_name", "type": "string" },
{"name": "register_dept_id", "type": "string" },
{"name": "register_dept_name", "type": "string" },
{"name": "register_date", "type": "date", "format": "yyyy-MM-dd" },
{"name": "visit_id", "type": "string" },
{"name": "register_id", "type": "string" },
{"name": "register_time", "type": "date" ,"format": "yyyy-MM-dd HH:mm:ss"},
{"name": "fee_register", "type": "double" },
{"name": "patient_name", "type": "string" },
{"name": "id_card", "type": "string" },
{"name": "id_card_type", "type": "string" },
{"name": "gender", "type": "string" },
{"name": "birthday", "type": "string" },
{"name": "patient_phone", "type": "string" },
{"name": "visit_card_no", "type": "string" },
{"name": "visit_card_type", "type": "string" },
{"name": "ds_date", "type": "date" ,"format": "yyyy-MM-dd"}
]
}
}
}
]
}
}
// Note: I set up a temporary single-node ES with no security configured. Leaving these as empty strings ("") causes an error, so just fill in any value.
// "accessId": "root",
// "accessKey": "root",
4.2 Run the job
# I run this from the conf directory; a single command is enough
../bin/datax.py datax_01_hive2es.json
# If output like the following appears, the job succeeded; this run wrote 3 records
2022-09-26 14:47:13.030 [job-0] INFO JobContainer -
Job start time            : 2022-09-26 14:47:00
Job end time              : 2022-09-26 14:47:13
Total elapsed time        : 12s
Average traffic           : 52B/s
Record write speed        : 0rec/s
Total records read        : 3
Total read/write failures : 0
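To confirm the three records actually landed in ES, a quick query against the index is enough (a sketch; register2 is the index name from the job configuration):
# Count and inspect the documents written by the job
curl -X GET "http://node03:9200/register2/_count?pretty"
curl -X GET "http://node03:9200/register2/_search?pretty"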
The timestamps look slightly off; the index probably needs to be created with an explicit mapping. I'll test that later.
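One way to investigate is to inspect the mapping the writer generated and, if the date fields came out wrong, create the index with an explicit mapping before loading. A rough, untested sketch of that idea; note that with "cleanup": true the writer drops and recreates the index, so a pre-created mapping only survives if the job sets "cleanup": false:
# Inspect the mapping DataX generated for the index
curl -X GET "http://node03:9200/register2/_mapping?pretty"
# Sketch: pre-create the index with an explicit date mapping for one field (requires "cleanup": false in the writer)
curl -X PUT "http://node03:9200/register2" -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "register_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
    }
  }
}'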
5. Summary
- DataX can indeed read data from Hive and write it into ES, but the time format is slightly off; the cause needs investigating
- The official docs say the Hive columns are mapped to ES fields by index; whether mapping by field name works as well is something to follow up on
- When there are many columns, be careful: it is very easy to make a mistake
- The official docs do not mention reading from ES as a source, but my later tests show it is possible; I will share that case in the next post