1、Using HiveServer2, Beeline, and JDBC
bin/hiveserver2
bin/beeline
!connect jdbc:hive2://hadoop-senior:10000 hadoop hadoop org.apache.hive.jdbc.HiveDriver
!connect jdbc:hive2://hadoop-senior:10000 hadoop hadoop
HiveServer2 JDBC
Store the analysis results in a Hive table (result); the front end then queries that data through DAO code.
2、Common data compression in Hive
1> Install snappy
2> Build the Hadoop 2.x source (see /hadoop-2.5.0-src/BUILDING.txt; the official Hadoop release is not built with native compression support)
mvn clean package -Pdist,native -DskipTests -Dtar
mvn package -Pdist,native -DskipTests -Dtar -Drequire.snappy
/opt/modules/hadoop-2.5.0-src/target/hadoop-2.5.0/lib/native (replace the existing lib/native directory with this one)
3> Verify (bin/hadoop checknative)
Native library checking:
hadoop: true /opt/modules/hadoop-2.5.0/lib/native/libhadoop.so.1.0.0
zlib: true /lib/libz.so.1
snappy: true /opt/modules/hadoop-2.5.0/lib/native/libsnappy.so.1
lz4: true revision:99
bzip2: false
4> Data compression test (wordcount)
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/hadoop/mapreduce/wordcount/input /user/hadoop/mapreduce/wordcount/output
======================================================
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /user/hadoop/mapreduce/wordcount/input /user/hadoop/mapreduce/wordcount/output
3、Configuring and demonstrating snappy compression in MapReduce and Hive
input -> map -> shuffle -> reduce -> output
Compressing the data makes it smaller, which
* reduces local disk I/O
* reduces network I/O during the shuffle
Normally one block feeds one map task:
10 GB -> 10 blocks -> 10 map tasks
after compression:
5 GB -> 5 blocks -> 5 map tasks
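For reference, a minimal sketch of turning this on from a Hive session, assuming the snappy native library built above is available on every node (these are standard Hive/Hadoop properties, not taken from the notes above):
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- optionally compress the final job output as well
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;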
4、Hive Storage format
file_format:
: SEQUENCEFILE
| TEXTFILE -- (Default, depending on hive.default.fileformat configuration)
| RCFILE -- (Note: Available in Hive 0.6.0 and later)
| ORC -- (Note: Available in Hive 0.11.0 and later)
| PARQUET -- (Note: Available in Hive 0.13.0 and later)
| AVRO -- (Note: Available in Hive 0.14.0 and later)
| JSONFILE -- (Note: Available in Hive 4.0.0 and later)
| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
Data storage layout:
* row-oriented storage
* column-oriented storage
==========>textfile
create table page_views(
track_time string,
url string,
sessioon_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
load data local inpath '/opt/datas/page_views.data' into table page_views;
dfs -du -h /user/hive/warehouse/page_views/;
18.1 M /user/hive/warehouse/page_views/page_views.data
==========>orc
create table page_views_orc(
track_time string,
url string,
sessioon_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc;
insert into page_views_orc select * from page_views;
dfs -du -h /user/hive/warehouse/page_views_orc/;
2.6 M /user/hive/warehouse/page_views_orc/000000_0
==========>parquet
create table page_views_parquet(
track_time string,
url string,
sessioon_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet;
insert into page_views_parquet select * from page_views;
dfs -du -h /user/hive/warehouse/page_views_parquet/;
13.1M /user/hive/warehouse/page_views_parquet/000000_0
select sessioon_id,count(*) cnt from page_views group by sessioon_id order by cnt desc limit 30;
select sessioon_id,count(*) cnt from page_views_orc group by sessioon_id order by cnt desc limit 30;
----------------------------------
select sessioon_id from page_views limit 30;
select sessioon_id from page_views_orc limit 30;
select sessioon_id from page_views_parquet limit 30;
======================================================================
create table page_views_orc_snappy(
track_time string,
url string,
sessioon_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress"="snappy");
insert into page_views_orc_snappy select * from page_views;
dfs -du -h /user/hive/warehouse/page_views_orc_snappy/;
3.8 M
create table page_views_orc_none(
track_time string,
url string,
sessioon_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress"="NONE");
insert into page_views_orc_none select * from page_views;
dfs -du -h /user/hive/warehouse/page_views_orc_none/;
3.8 M
Summary:
In real project development, Hive table data typically uses
* storage format: orcfile / parquet
* data compression: snappy
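As a companion to the ORC examples above, a sketch of writing a snappy-compressed parquet table. Whether the session property or the table property is honored depends on the Hive/Parquet version, so both are shown; the table name is made up here:
set parquet.compression=SNAPPY;
create table page_views_parquet_snappy
STORED AS parquet tblproperties("parquet.compression"="SNAPPY")
AS select * from page_views;
dfs -du -h /user/hive/warehouse/page_views_parquet_snappy/;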
5、Hive optimization in production
1> Advanced Hive optimization
Hive configuration:
property: hive.fetch.task.conversion
value: minimal / more
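A quick illustration of the effect, as a sketch using the page_views table from above (the exact behavior depends on the Hive version): with "more", simple select/filter/limit queries are served by a direct fetch instead of launching a MapReduce job.
set hive.fetch.task.conversion=more;
select sessioon_id from page_views limit 10;    -- direct fetch, no MapReduce job
set hive.fetch.task.conversion=minimal;
select sessioon_id from page_views limit 10;    -- may now launch a MapReduce job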
Hive optimization in production, part 2
* large tables: split them into sub-tables
* external tables and partitioned tables, used together
  multi-level partitions (see the sketch after this list)
* data
  storage format (textfile, orcfile, parquet)
  compression format (snappy)
* SQL
  optimize the SQL statements: join, filter
* MapReduce
  number of reducers
  JVM reuse
  speculative execution
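A minimal sketch of the external + multi-level-partition combination (the table name, location, data path, and partition values below are made up for illustration):
create external table if not exists default.bf_log_part(
remote_addr string,
time_local string,
request string,
http_referer string
)
partitioned by (month string, day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
location '/user/hadoop/bf_log/part';
load data local inpath '/opt/datas/part.log' into table default.bf_log_part partition(month='201508', day='31');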
2> Data skew
Common/Shuffle/Reduce JOIN
  the join happens in the Reduce Task
  large table joined with large table
  * each table's data is read from files
Map JOIN
  the join happens in the Map Task
  small table joined with large table
  * the large table's data is read from files (cid)
  * the small table's data is held in memory (id), distributed via DistributedCache
SMB JOIN
Sort-Merge-Bucket JOIN
customer: 3 buckets
  1st: 1001 - 1101
  2nd: 1201 - 1401
  3rd: 1501 - 1901
order: 3 buckets
  1st: 1001 - 1101
  2nd: 1201 - 1401
  3rd: 1501 - 1901
hive.auto.convert.join = true
hive.optimize.bucketmapjoin = true
hive.optimize.bucketmapjoin.sortedmerge = true
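A sketch of what the bucketed tables behind an SMB join look like (table and column names are made up; both sides must be bucketed and sorted on the join key with the same number of buckets):
create table customer_bucket(id int, name string)
clustered by (id) sorted by (id) into 3 buckets
row format delimited fields terminated by '\t';
create table order_bucket(oid int, cid int, price double)
clustered by (cid) sorted by (cid) into 3 buckets
row format delimited fields terminated by '\t';
-- populate via INSERT with hive.enforce.bucketing / hive.enforce.sorting enabled
-- so the data really lands bucketed and sorted; with the three properties above
-- enabled, this join can then be executed bucket by bucket
select c.id, o.price from customer_bucket c join order_bucket o on c.id = o.cid;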
3> Execution plans
EXPLAIN select * from emp;
EXPLAIN select deptno, avg(sal) avg_sal from emp group by deptno;
Parallel execution
hive.exec.parallel.thread.number = 8
hive.exec.parallel = false
JVM reuse
mapreduce.job.jvm.numtasks = 1
Number of reducers
mapreduce.job.reduces = 1
Speculative execution
mapreduce.map.speculative = true
hive.mapred.reduce.tasks.speculative.execution = true
mapreduce.reduce.speculative = true
Number of mappers
hive.merge.size.per.task = 256000000
Parallel execution example:
job1: a join b -> aa
job2: c join d -> cc
job3: aa join cc   (job1 and job2 are independent, so they can run at the same time)
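A sketch of the corresponding settings and query shape (a, b, c, d and the join keys are placeholders):
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
-- the two inner joins have no dependency on each other and are submitted in parallel;
-- the outer join runs once both have finished
select aa.id, cc.id
from (select a.id from a join b on a.id = b.id) aa
join (select c.id from c join d on c.id = d.id) cc
on aa.id = cc.id;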
4> Dynamic partition tuning
-- dynamic partition property: set to true to enable dynamic partitioning (default: false)
hive.exec.dynamic.partition = true
-- dynamic partition property: nonstrict allows all partition columns to be dynamic (default: strict)
-- strict requires at least one partition column to be static
hive.exec.dynamic.partition.mode = strict
-- dynamic partition property: maximum number of partitions each mapper or reducer may create
hive.exec.max.dynamic.partitions.pernode = 100
-- dynamic partition property: maximum total number of dynamic partitions one statement may create
hive.exec.max.dynamic.partitions = 1000
-- dynamic partition property: maximum number of files that may be created in total
hive.exec.max.created.files = 100000
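A minimal sketch of a dynamic partition insert (emp is the table used in the EXPLAIN examples above; emp_part and its column list are assumed here):
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- the partition value is taken from the last column of the select list
insert overwrite table emp_part partition(deptno)
select empno, ename, sal, deptno from emp;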
5> Strict mode
-- when querying a partitioned table, the job is rejected if the WHERE clause has no partition filter (default: nonstrict)
set hive.mapred.mode = strict
Note: strict mode blocks three types of queries:
1> queries on a partitioned table that have no filter on the partition column
2> ORDER BY queries without a LIMIT clause
3> cartesian-product queries (joins that use WHERE instead of ON)
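For illustration, queries that do pass under strict mode (reusing the placeholder emp / emp_part names from the dynamic partition sketch above):
set hive.mapred.mode=strict;
select * from emp_part where deptno = 10;         -- partition filter present
select * from emp order by sal desc limit 10;     -- ORDER BY with LIMIT
-- a join written without ON (cartesian product) would still be rejected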
6、Hive hands-on project (part 1): create the table and load the log data
Approach:
* source (raw) table
* create different sub-tables for the different business needs
* storage format: orcfile / parquet
* data compression: snappy
* map output compression: snappy
* external tables
* partitioned tables (for demonstration)
create table if not exists default.bf_log_src(
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile;
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table default.bf_log_src;
select * from bf_log_src limit 10; -- the loaded data turns out to be parsed incorrectly
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after the fix
drop table if exists default.bf_log_src;
create table if not exists default.bf_log_src(
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
with SERDEPROPERTIES(
"input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*\") (\"[^ ]*\")"
);
(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*\") (\"[^ ]*\")
Project case notes:
* design the tables according to the business data
* option 1: raw table bf_log_src, load the data (pre-processed in advance)
* option 2: create the table with RegexSerDe
  https://cwiki.apache.org/confluence/display/Hive/GettingStarted
* data ETL
  split into sub-tables, choose the storage format
  pre-process the data (UDFs, python)
* data analysis with HQL
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
drop table if exists default.bf_log_comm;
create table if not exists default.bf_log_comm(
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC TBLPROPERTIES("orc.compress"="SNAPPY");
INSERT INTO TABLE default.bf_log_comm select remote_addr,time_local,request,http_referer from bf_log_src;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Define UDFs to clean the data in the source table.
First UDF: strip the surrounding quotes
add jar /opt/datas/hiveudf2.jar;
create temporary function my_removequotes as "com.beifeng.senior.hive.udf.RemoveQuotesUDF";
insert overwrite table default.bf_log_comm select my_removequotes(remote_addr),my_removequotes(time_local),my_removequotes(request),my_removequotes(http_referer) from default.bf_log_src;
Second UDF: transform the date/time field
add jar /opt/datas/hiveudf3.jar;
create temporary function my_datetransform as "com.beifeng.senior.hive.udf.DateTransformUDF";
insert overwrite table default.bf_log_comm select my_removequotes(remote_addr),my_datetransform(my_removequotes(time_local)),my_removequotes(request),my_removequotes(http_referer) from default.bf_log_src;
select * from bf_log_comm limit 5;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
desc function extended substring;
substring('Facebook', 5, 1)
'b'
(indexes start at 1)
select substring('20150831000437', 9, 2) hour from bf_log_comm limit 1;
select t.hour, count(*) cnt from
(select substring(time_local, 9, 2) hour from bf_log_comm) t
group by t.hour order by cnt desc;
----------------
select t.prex_ip, count(*) cnt from
(select substring(remote_addr, 1, 7) prex_ip from bf_log_comm) t
group by t.prex_ip order by cnt desc limit 20;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Data processing with a python script: https://cwiki.apache.org/confluence/display/Hive/GettingStarted
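A minimal sketch of the TRANSFORM approach described on that page (the script name is a placeholder; the script is expected to read tab-separated lines from stdin and write tab-separated lines to stdout):
add file /opt/datas/my_mapper.py;
insert overwrite table default.bf_log_comm
select transform(remote_addr, time_local, request, http_referer)
using 'python my_mapper.py'
as (remote_addr, time_local, request, http_referer)
from default.bf_log_src;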