hive-性能调优

本文知识点

  1. 表连接优化
  2. 用insert into 替换 union all
  3. limit 语句快速出结果
  4. 并行执行
  5. 调整Map 和 Reduce数量
  6. 数据倾斜(对症下药)
  7. 本地模式(数据量小的情况下)
  8. Map 阶段进行join操作
  9. 相关配置信息查看

一. 表连接优化

  1. 小表放前原则
    在编写带有 join 操作的代码语句时,应该将条目少的表/子查询放在 Join 操作符的左边。 因为在 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的value值小的放前,大的放后,这便是“小表放前”原则。换种说法,就是把重复关联键少的表放在join前面,做关联可以提高join的效率。(hive的较新版本对表连接进行了优化,大表放前放后区别不大)

  2. 使用相同的连接键
    当对3个或者更多个表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。
    如果 Join 的条件不相同,Map-Reduce 的任务数目和 Join 操作的数目是对应的。

  3. 尽量尽早地过滤数据
    减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段。

  4. 尽量原子化操作
    尽量避免一个SQL包含复杂逻辑,可以使用中间表来完成复杂的逻辑

二. 用insert into 替换 union all

如果union all的部分个数大于2,或者每个union部分数据量大,应该拆成多个insert into 语句,实际测试过程中,执行时间能提升50%
如:

1
2
3
4
5
insert overwite table tablename partition (dt= ....)  
select ..... from ( select ... from A
union all  select ... from B  
union all select ... from C ) R  
where ...;

可以改写为:

1
2
3
insert into table tablename partition (dt= ....) select .... from A WHERE ...; 
insert into table tablename partition (dt= ....) select .... from B  WHERE ...;
insert into table tablename partition (dt= ....) select .... from C WHERE ...;

三. limit 语句快速出结果

一般情况下,Limit语句还是需要执行整个查询语句,然后再返回部分结果。
有几个配置属性可以开启,避免这种情况—对数据源进行抽样

1
2
3
hive.limit.optimize.enable=true --- 开启对数据源进行采样的功能
hive.limit.row.max.size --- 设置最小的采样容量
hive.limit.optimize.limit.file --- 设置最大的采样样本数

缺点:有可能部分数据永远不会被处理到

四. 并行执行

hive会将一个查询转化为一个或多个阶段,包括:MapReduce阶段、抽样阶段、合并阶段、limit阶段等。默认情况下,一次只执行一个阶段。 不过,如果某些阶段不是互相依赖,是可以并行执行的。

1
2
set hive.exec.parallel=true,可以开启并发执行。
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。

会比较耗系统资源。

五. Map 和 Reduce优化

hive查看参数配置值:set 属性名;

1.Map阶段优化

map个数的主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(默认128M,不可自定义)。

1
2
3
4
dfs.block.size(deprecated)
dfs.blocksize(new)
默认值:134217728
说明: 这个就是hdfs里一个文件块的大小了,CDH5中默认128M。太大的话会有较少map同时计算,太小的话也浪费可用map个数资源,而且文件太小namenode就浪费内存多。根据需要进行设置。

blocksize划分举例:
a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数
b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。

map执行时间:map任务启动和初始化的时间+逻辑处理的时间。

1)减少map数
若有大量小文件(小于128M),会产生多个map,处理方法是:

1
2
3
4
5
6
7
8
9
10
set mapreduce.input.fileinputformat.split.minsize.per.node=104857600;    
-- 创建节点本地分区的最小数据字节数,否则数据将合并到机架级别。
默认值:0

set mapreduce.input.fileinputformat.split.minsize.per.rack=104857600;
-- 创建机架本地分区的最小数据字节数,否则数据将合并到全局级别。
默认值:0

set mapreduce.input.fileinputformat.split.maxsize=268435456;
-- 单位为B,设置最大分片大小默认为dfs.blocksize

前面三个参数确定合并文件块的大小,大于文件块大小大于256m的,按照256m来分隔,小于256m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的)进行合并。

设置执行前进行文件合并,在多个文件上创建虚拟分割,并在可能的情况下按公共节点,机架分组:

1
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

v1 -> v2

1
2
3
4
In Hadoop 2.X and YARN Architecture:
mapred.max.split.size -> mapreduce.input.fileinputformat.split.maxsize
mapred.min.split.size.per.rack -> mapreduce.input.fileinputformat.split.minsize.per.rack
mapred.min.split.size.per.node -> mapreduce.input.fileinputformat.split.minsize.per.node

2)增加map数
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个Map处理的数据量减少,从而提高任务的执行效率。由于mapreduce中没有办法直接控制map数量,所以只能曲线救国,通过设置每个map中处理的数据量进行设置。
所以通过调整 Map输入文件的分片大小即可。

1
set mapreduce.input.fileinputformat.split.maxsize=更大值;(默认为 dfs.blocksize)

根据实际情况,控制map数量需要遵循两个原则:使大数据量利用合适的map数;使单个map任务处理合适的数据量;

2.Reduce阶段优化

调整方式:

1
mapreduce.job.reduces

默认: -1
通过将此属性设置为-1,Hive将自动确定reduce的数量,reduce个数 = InputFileSize / bytes per reducer

设置每一个 reducer 的平均负载字节数:

1
set hive.exec.reducers.bytes.per.reducer = 256000000;

在Hive 0.14.0及更早版本中的默认值为1 GB,即,如果输入大小为10 GB,则将使用10个reducer。
在Hive 0.14.0及更高版本中,默认值为256 MB,即,如果输入大小为1 GB,则将使用4个reducer。
避免设置过多的reduce。

六. 数据倾斜(对症下药)

1.sql脚本原因

count(distinct)是按group by 字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。

解决方法:
日常统计场景中,我们经常会对一段时期内的字段进行消重并统计数量,SQL语句类似于

1
SELECT COUNT( DISTINCT id ) FROM TABLE_NAME WHERE ...;

这条语句是从一个表的符合WHERE条件的记录中统计不重复的id的总数。由于引入了DISTINCT,因此在Map阶段无法利用combine对输出结果消重,必须将id作为Key输出,在Reduce阶段再对来自于不同Map Task、相同Key的结果进行消重,计入最终统计值。
我们看到作业运行时的Reduce Task个数为1,对于统计大数据量时,这会导致最终Map的全部输出由单个的Reduce Task处理。这唯一的Reduce Task需要Shuffle大量的数据,并且进行排序聚合等处理,这使得它成为整个作业的IO和运算瓶颈。
经过上述分析后,我们尝试显式地增大Reduce Task个数来提高Reduce阶段的并发,使每一个Reduce Task的数据处理量控制在适当的值。

1
set mapred.reduce.tasks=20;

调整后我们发现这一参数并没有影响实际Reduce Task个数。原来Hive在处理COUNT这种”全聚合(full aggregates)”计算时,它会忽略用户指定的Reduce Task数,而强制使用1。我们只能采用变通的方法来绕过这一限制。我们利用Hive对嵌套语句的支持,将原来一个MapReduce作业转换为两个作业,在第一阶段选出全部的非重复id,在第二阶段再对这些已消重的id进行计数。这样在第一阶段我们可以通过增大Reduce的并发数,并发处理Map输出。在第二阶段,由于id已经消重,因此COUNT(*)操作在Map阶段不需要输出原id数据,只输出一个合并后的计数即可。这样即使第二阶段Hive强制指定一个Reduce Task,极少量的Map输出数据也不会使单一的Reduce Task成为瓶颈。改进后的SQL语句如下:

1
SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME WHERE … ) t;

实际运行时,我们发现Hive还对这两阶段的作业做了额外的优化。它将第二个MapReduce作业Map中的Count过程移到了第一个作业的Reduce阶段。这样在第一阶Reduce就可以输出计数值,而不是消重的全部id。这一优化大幅地减少了第一个作业的Reduce输出IO以及第二个作业Map的输入数据量。最终在同样的运行环境下优化后的语句执行只需要原语句20%左右的时间。

2.无效ID在关联时的数据倾斜问题

日志中常会出现信息丢失,比如每日约为 20 亿的全网日志,其中的 user_id 为主 键,在日志收集过程中会丢失,出现主键为 null 的情况,如果取其中的 user_id 和 bmw_users 关联,就会碰到数据倾斜的问题。原因是 Hive 中,主键为 null 值的项会被当做相同的 Key 而分配进同一个计算 Map。

解决方法 1:user_id 为空的不参与关联,子查询过滤 null

1
2
3
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION All SELECT * FROM log a WHERE a.user_id IS NULL

解决方法 2 如下所示:函数过滤 null

1
2
3
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id END =b.user_id;

我们在工作中总结出:解决方法2比解决方法1效果更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,job 数为2。解决方法2中 job 数是1。这个优化适合无效 id(比如-99、 ‘’,null 等)产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的Reduce上,从而解决数据倾斜问题。因为空值不参与关联,即使分到不同的Reduce 上,也不会影响最终的结果。

3.不同数据类型关联产生的倾斜问题

比如两张表进行主键关联,表A的主键id是32位字符串类型,表B的主键id是bigint类型,在关联时会将表A的主键转为数值类型,做 hash 来分配 Reduce。这样表1的数据都到一个 Reduce 上了。

解决方法:
将非字符串类型的主键先转为字符串类型。

1
SELECT * FROM A a LEFT OUTER JOIN B b ON a.id=CASE(b.id AS STRING);

4.数据倾斜其他解决方案

  1. set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精确有效的解决数据倾斜问题。
  2. 在使用SUM,COUNT,MAX,MIN等UDAF函数时,不怕数据倾斜问题,Hadoop在Map端的汇总合并优化过,使数据倾斜不成问题。

七. 本地模式(数据量小的情况下)

1)理论分析
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

2) 配置
用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。

1
set hive.exec.mode.local.auto=true;  //开启本地mr

设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M

1
set hive.exec.mode.local.auto.inputbytes.max=134217728;

设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4

1
set hive.exec.mode.local.auto.input.files.max=4;

八. Map 阶段进行join操作

是否根据输入小表的大小,自动将 Reduce 端的 Common Join 转化为 Map Join,从而加快大表关联小表的 Join 速度。(默认 false)

1
set hive.auto.convert.join=true;

九. 相关配置信息查看

  1. hadoop核心配置:core-default.xml

  2. hdfs默认配置:hdfs-default.xml

  3. mapreduce默认配置:mapred-default.xml

  4. yarn默认配置yarn-default.xml

  5. hive属性详解

  6. 弃用的属性deprecated properties

参考文章:

Hive性能优化

Hive数据仓库与企业级优化

参考:Hive SQL优化之 Count Distinct