
Extracting Data from Hive into Elasticsearch with DataX

1. Background

The company is building a big-data ETL platform, which involves a good deal of testing. Along the way I'm recording the steps that proved reusable, so they can be copied straight into real jobs later.
The data comes from Hive (already-processed metrics) and needs to be stored in ES (to back API queries), so for this kind of offline ETL the natural choice is Alibaba's DataX.

2. Environment

  • Job flow: read data from Hive and write it straight into an ES index
  • Environment: Linux (7.9), DataX (3.0), Hadoop (3.1.3), Hive (3.1.2), ES (7.8.0)
  • Cluster nodes: 3 (node01, node02, node03); ES is installed on node03 (single node)
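
Before going further, a quick sanity check that the stack above is actually up (hostnames taken from the cluster layout; all three are standard CLIs):

# confirm versions and that ES answers on node03
hadoop version
hive --version
curl -s 'http://node03:9200'   # should return the ES 7.8.0 node-info JSON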

3. Task Preparation

3.1 Supported Data Sources

As the figure below shows, the officially supported list of data sources is long, but ES apparently supports write operations only.

[Figure 1: DataX supported data sources, from the official README]
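
The same thing can be read off a local install, where every supported source or sink is a plugin directory (assuming DATAX_HOME points at the unpacked DataX 3.0 directory; whether elasticsearchwriter ships in your tarball or was built from source may vary):

# list the bundled reader and writer plugins
ls $DATAX_HOME/plugin/reader
ls $DATAX_HOME/plugin/writer   # ES appears on the writer side only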

3.2 Hive Reader Configuration

Official docs: https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/mytable01/*",
                        "defaultFS": "hdfs://xxx:port",
                        "column": [
                               {
                                "index": 0,
                                "type": "long"
                               },
                               {
                                "index": 1,
                                "type": "boolean"
                               },
                               {
                                "type": "string",
                                "value": "hello"
                               },
                               {
                                "index": 2,
                                "type": "double"
                               }
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
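Since the reader scans every file under path, a quick listing confirms what a run will pick up (the path and defaultFS here are the placeholders from the template above):

# list the files hdfsreader will scan
hdfs dfs -ls /user/hive/warehouse/mytable01/
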
3.3 ES Writer Configuration

Official docs: https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md

{
  "job": {
    "setting": {
        "speed": {
            "channel": 1
        }
    },
    "content": [
      {
        "reader": {
          ...
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://xxx:9999",
            "accessId": "xxxx",
            "accessKey": "xxxx",
            "index": "test-1",
            "type": "default",
            "cleanup": true,
            "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
            "discovery": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {"name": "pk", "type": "id"},
              { "name": "col_ip","type": "ip" },
              { "name": "col_double","type": "double" },
              { "name": "col_long","type": "long" },
              { "name": "col_integer","type": "integer" },
              { "name": "col_keyword", "type": "keyword" },
              { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
              { "name": "col_geo_point", "type": "geo_point" },
              { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
              { "name": "col_nested1", "type": "nested" },
              { "name": "col_nested2", "type": "nested" },
              { "name": "col_object1", "type": "object" },
              { "name": "col_object2", "type": "object" },
              { "name": "col_integer_array", "type":"integer", "array":true},
              { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
            ]
          }
        }
      }
    ]
  }
}
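One template detail worth calling out: a column declared as "type": "id" is not written as a document field at all; DataX uses its value as the Elasticsearch _id. So a row whose pk happened to be 42 (a made-up value, just for illustration) could be fetched back directly:

# fetch a document by the _id taken from the pk column (endpoint/index from the template)
curl -s 'http://xxx:9999/test-1/_doc/42?pretty'
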
3.4 Hive Data Preparation
-- create the table
create table dwd_mhs_opt_register_hive (
	global_id            string
	,patient_id           string
	,med_org_id           string
	,med_org_name         string
	,register_dept_id     string
	,register_dept_name   string
	,register_date        date
	,visit_id             string
	,register_id          string
	,register_time        timestamp
	,fee_register         decimal(18,2)
	,patient_name         string
	,id_card              string
	,id_card_type         string
	,gender               string
	,birthday             string
	,patient_phone        string
	,visit_card_no        string
	,visit_card_type      string
	,ds_date              date
);
-- insert sample rows
insert into dwd_mhs_opt_register_hive values ('65a2344e241996aeb2b0f636d1cf2615','o_420000000002_611117','420000000002','绣林卫生院','102','门诊全科','2022-09-22','611117','611117','2022-09-22 09:28:32','4.5','张三丰','428001195208085237','01','1','1952-08-08','13522193151','A132193151','01','2022-09-22');
insert into dwd_mhs_opt_register_hive values ('aa0664fd7a8db6e3281666340223914d','o_420000003016_221375','420000003016','朱坡卫生院','102','门诊全科','2022-09-22','221375','221375','2022-09-22 09:38:32','4.5','宋远桥','428001198208105257','01','1','1982-08-10','13522293152','A135220152','01','2022-09-22');
insert into dwd_mhs_opt_register_hive values ('71590ce8bedd4a1bbb56fe1fcea0597f','o_420000000282_347902','420000000282','庞公卫生院','102','门诊全科','2022-09-22','347902','347902','2022-09-22 09:48:32','4.5','殷素素','428001198508185745','01','2','1985-08-18','13522393153','A135783153','01','2022-09-22');
-- query the data
select i.* from dwd_mhs_opt_register_hive i ;

[Figure 2: query result, the 3 inserted rows]
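
One thing to note before writing the job file: the table was created without a STORED AS clause, so it is a plain TEXTFILE whose fields are separated by Hive's default delimiter \u0001, exactly what the reader config below sets as fieldDelimiter. cat -A makes the delimiter visible as ^A:

# inspect the raw warehouse file; fields should be separated by ^A
hdfs dfs -cat /user/hive/warehouse/test.db/dwd_mhs_opt_register_hive/* | head -3 | cat -A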

4. Running the Task

4.1 Preparing the Job File

Create the job file under DataX's conf directory:

[Figure 3: datax_01_hive2es.json under the conf directory]

Job configuration:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/test.db/dwd_mhs_opt_register_hive",
                        "defaultFS": "hdfs://node01:8020",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "type": "string"
                            },
                            {
                                "index": 6,
                                "type": "date"
                            },
                            {
                                "index": 7,
                                "type": "string"
                            },
                            {
                                "index": 8,
                                "type": "string"
                            },
                            {
                                "index": 9,
                                "type": "date"
                            },
                            {
                                "index": 10,
                                "type": "double"
                            },
                            {
                                "index": 11,
                                "type": "string"
                            },
                            {
                                "index": 12,
                                "type": "string"
                            },
                            {
                                "index": 13,
                                "type": "string"
                            },
                            {
                                "index": 14,
                                "type": "string"
                            },
                            {
                                "index": 15,
                                "type": "string"
                            },
                            {
                                "index": 16,
                                "type": "string"
                            },
                            {
                                "index": 17,
                                "type": "string"
                            },
                            {
                                "index": 18,
                                "type": "string"
                            },
                            {
                                "index": 19,
                                "type": "date"
                            }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\u0001"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://node03:9200",
                        "accessId": "root",
                        "accessKey": "root",
                        "index": "register2",
                        "type": "default",
                        "cleanup": true,
                        "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
                        "discovery": false,
                        "batchSize": 1000,
                        "splitter": ",",
                        "column": [
                            {"name": "global_id", "type": "string" },
                            {"name": "patient_id", "type": "string" },
                            {"name": "med_org_id", "type": "string" },
                            {"name": "med_org_name", "type": "string" },
                            {"name": "register_dept_id", "type": "string" },
                            {"name": "register_dept_name", "type": "string" },
                            {"name": "register_date", "type": "date", "format": "yyyy-MM-dd" },
                            {"name": "visit_id", "type": "string" },
                            {"name": "register_id", "type": "string" },
                            {"name": "register_time", "type": "date" ,"format": "yyyy-MM-dd HH:mm:ss"},
                            {"name": "fee_register", "type": "double" },
                            {"name": "patient_name", "type": "string" },
                            {"name": "id_card", "type": "string" },
                            {"name": "id_card_type", "type": "string" },
                            {"name": "gender", "type": "string" },
                            {"name": "birthday", "type": "string" },
                            {"name": "patient_phone", "type": "string" },
                            {"name": "visit_card_no", "type": "string" },
                            {"name": "visit_card_type", "type": "string" },
                            {"name": "ds_date", "type": "date" ,"format": "yyyy-MM-dd"}
                      ]
                    }
                }
            }
        ]
    }
}
// Note: this is a throwaway single-node ES with no authentication configured. Leaving
// these as the default "" makes the writer error out, so any non-empty value will do:
// "accessId": "root",
// "accessKey": "root",
4.2 Launching the Job
# run from the conf directory
../bin/datax.py datax_01_hive2es.json
# a summary like the one below means success; this run wrote all 3 records
2022-09-26 14:47:13.030 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2022-09-26 14:47:00
任务结束时刻                    : 2022-09-26 14:47:13
任务总计耗时                    :                 12s
任务平均流量                    :               52B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0
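
The write can be double-checked from the ES side:

# register2 should now report 3 documents
curl -s 'http://node03:9200/register2/_count?pretty'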

The timestamps look a bit off, though; most likely the index needs an explicit mapping at creation time rather than the one DataX generates. Something to test later.

[Figure 4: documents in the register2 index; note the oddly formatted date fields]
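
To see what actually happened to the dates, inspect the mapping DataX generated along with a stored document:

# show the generated mapping and one sample document
curl -s 'http://node03:9200/register2/_mapping?pretty'
curl -s 'http://node03:9200/register2/_search?pretty&size=1'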

5. Summary

  • DataX can indeed read data from Hive and write it into ES, but the date format comes out wrong; the cause still needs a look (one possible fix is sketched below)
  • The official docs map Hive columns to ES fields purely by position (reader index order against writer column order); whether mapping by field name works is worth a follow-up
  • With this many columns, be careful: positional mapping makes mistakes very easy
  • The official site doesn't list ES as a reader, but my later tests show reading from ES does work; a case study will follow in the next post
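
One direction for the date fix (untested here, just a sketch): pre-create the index with explicit date mappings, then run the job with "cleanup": false so DataX keeps that mapping instead of dropping and rebuilding the index from its column list:

# hypothetical fix (untested): create register2 with explicit date mappings first
curl -s -X PUT 'http://node03:9200/register2' -H 'Content-Type: application/json' -d '{
  "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
  "mappings": {
    "properties": {
      "register_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "register_date": { "type": "date", "format": "yyyy-MM-dd" },
      "ds_date":       { "type": "date", "format": "yyyy-MM-dd" }
    }
  }
}'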


