当前位置: 首页>数据库>正文

hive 的 mapreduce原理 hive.mapred.local.mem

1、 hive.fetch.task.conversion=more 该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走mapreduce

2、 当输入数据量很小的时候, 查询触发执行任务时消耗可能会比实际 job 的执行时间要多的多, Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短

//开启本地 mr
hive.exec.mode.local.auto=true 

// 设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认为 134217728,即 128M
hive.exec.mode.local.auto.inputbytes.max=50000000;

// 设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默认为 4
hive.exec.mode.local.auto.input.files.max=10;

3、 有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。
所以应该首先处理掉这些异常值, 比如过滤key为空值,或者为空值的key加上特殊标识+随机数的方式,然后到reduce阶段再去掉标识进行聚合一次操作。

4、 MapJoin操作,当在Reducer端聚合的时候可能会由于Key倾斜太多导致某几个Key处理很慢。MapJoin的时候会把小表广播到各个Map端内存中出缓存直接在Map端进行join操作。

// 开启Mapper端join,默认为 true
hive.auto.convert.join = true; 

// 大表小表的阀值设置(默认 25M 一下认为是小表):
hive.mapjoin.smalltable.filesize=25000000;

5、 Group BY 操作。默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。

// 开启Map端join操作
hive.map.aggr = true

// 在 Map 端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000

// 当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结
// 果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以
// 保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
hive.groupby.skewindata = true

6、 Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,
一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换

7、 尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1个 reducer 来完成笛卡尔积

8、 行列过滤

列处理:在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *。

行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面,那么就会先全表关联,之后再过滤。

9、 动态分区调整 

// 开启动态分区功能(默认 true,开启)
hive.exec.dynamic.partition=true

// 设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为, 静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict

// 在所有执行 MR 的节点上,最大一共可以创建多少个动态分区
hive.exec.max.dynamic.partitions=1000

// 在每个执行 MR 的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。
hive.exec.max.dynamic.partitions.pernode=100

// 整个 MR Job 中,最大可以创建多少个 HDFS 文件。
hive.exec.max.created.files=100000

// 当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false

10、调整Map任务数,由于Hive的一个Map对应于一个BlockSize大小的文件,所以我们可以通过修改这个值增大或者减小Map数据。

// 小文件合并
hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

// 修改每个切片的最大的大小
mapreduce.input.fileinputformat.split.maxsize=100;

11、 调整Reduce任务数目

// 每个 Reduce 处理的数据量默认是 256MB
hive.exec.reducers.bytes.per.reducer=256000000

// 每个任务最大的 reduce 数,默认为 1009
hive.exec.reducers.max=1009

11、 并行执行

//打开任务并行执行
hive.exec.parallel=true; 

//同一个 sql 允许最大并行度,默认为 8。
hive.exec.parallel.thread.number=16;

12、 JVM 重用,JVM重用可以使得 JVM 实例在同一个 job 中重新使用 N 次。

<property>
 <name>mapreduce.job.jvm.numtasks</name>
 <value>10</value>
 <description>How many tasks to run per jvm. If set to -1, there is no limit.
 </description>
</property>

这个功能的缺点是,开启 JVM 重用将一直占用使用到的 task 插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job 中有某几个 reduce task 执行的时间要比其
他 Reduce task 消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的 job使用,直到所有的 task 都结束了才会释放。

13、推测执行

Hadoop 采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,
让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。

<property>
 <name>mapreduce.map.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>

<property>
 <name>mapreduce.reduce.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>

14、小文件合并

当数据表格式为ORC的分区表的时候可以通过以下命令进行小文件合并:

# 该命令会把小的orc文件合并成单个文件最大的大小,减少小文件的数目
alter table dwd.dwd_pass_log_1min_analysis_data_di partition (dt=20180101) concatenate;

15、sqoop导入数据到hive的orc分区表中

sqoop import \
--connect "$mysql_db_con" \
--username "$mysql_db_user" \
--password "$mysql_db_pwd" \
--query "$query_sql" \  // 自定义sql查询,注意: 必须加 and $CONDITIONS 类似于 1=1这种写法
--fields-terminated-by "1" \  // 导出的字段分隔符为1
--lines-terminated-by "\n" \ // 导出数据行分隔符
--escaped-by '$' \  
--hive-import \
--hive-overwrite \  // 覆盖写的方式
--hive-database "dwd" \ // 写入的库名
--hive-table "dwd_paas_log_1min_analysis_data_di_2020" \ // 将要写入的表名
--hive-partition-key "dt" \  // 分区键
--hive-partition-value "$dt" \  // 分区的值
--split-by "$split_id" \  // 数据并行导出的键
--target-dir "$table_dir" \  // 数据导出时候的临时目录
--m 12 \  // 数据导出并行度
--delete-target-dir \  // 导出完成后删除临时目录
--hive-drop-import-delims \
--null-string '\N' \
--null-non-string '\N'

 


https://www.xamrdz.com/database/6cc1921948.html

相关文章: