a) Table:类似与传统数据库中的Table,每一个Table在Hive中都有一个相应的目录来存储数据。例如:一个表t,它在HDFS中的路径为:/user/hive/warehouse/t。
b) Partition:类似于传统数据库中划分列的索引。在Hive中,表中的一个Partition对应于表下的一个目录,所有的Partition数据都存储在对应的目录中。例如:t表中包含ds和city两个Partition,则对应于ds=2014,city=beijing的HDFS子目录为:/user/hive/warehouse/t/ds=2014/city=Beijing;
需要注意的是,分区列是表的伪列,表数据文件中并不存在这个分区列的数据。
c) Buckets:对指定列计算的hash,根据hash值切分数据,目的是为了便于并行,每一个Buckets对应一个文件。将user列分数至32个Bucket上,首先对user列的值计算hash,比如,对应hash=0的HDFS目录为:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00000;对应hash=20的目录为:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00020。
对于MapReduce操作单元,Hive通过ExecMapper和ExecReducer执行MapReduce任务。
对于Hive语句: INSERT OVERWRITE TABLE read_log_tmp SELECT a.userid,a.bookid,b.author,b.categoryid FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
其执行计划为:
图:reduce端join的任务执行流程
1.4 与一般SQL的区别
Hive 视图与一般数据库视图
Hive视图与一般数据库视图作用角色相同,都是基于数据规模缩减或者基于安全机制下的某些条件查询下的数据子集。Hive视图只支持逻辑视图,不支持物化视图,即每次对视图的查询hive都将执行查询任务,因此视图不会带来性能上的提升。作为Hive查询优化的一部分,对视图的查询条件语句和视图的定义查询条件语句将会尽可能的合并成一个条件查询。 Hive索引与一般数据库索引
相比于传统数据库,Hive只提供有限的索引功能,通过在某些字段上建立索引来加速某些操作。通常当逻辑分区太多太细,partition无法满足时,可以考虑建立索引。Hive1.2.1版本目前支持的索引类型有CompactIndexHandler和Bitmap。 CompactIndexHandler压缩索引通过将列中相同的值得字段进行压缩从而减小存储和加快访问时间。需要注意的是Hive创建压缩索引时会将索引数据也存储在Hive表中。对于表tb_index (id int, name string)而言,建立索引后的索引表中默认的三列一次为索引列(id)、hdfs文件地址(_bucketname)、偏移量(offset)。特别注意,offset列类型为array。 Bitmap位图索引作为一种常见的索引,如果索引列只有固定的几个值,那么就可以采用位图索引来加速查询。利用位图索引可以方便的进行AND/OR/XOR等各类计算,Hive0.8版本开始引入位图索引,位图索引在大数据处理方面的应用广泛,比如可以利用bitmap来计算用户留存率(索引做与运算,效率远好于join的方式)。如果Bitmap索引很稀疏,那么就需要对索引压缩以节省存储空间和加快IO。Hive的Bitmap Handler采用的是EWAH(https://github.com/lemire/javaewah)压缩方式。
图:reduce端join的reducer join
Reducer端join无法避免的reduce截断以及传输的大量数据都会给集群网络带来压力,从上图可以看出所有hash(bookid)% reducer_number等于0的key-value对都会通过shuffle被分发到0号reducer,如果分到0号reducer的记录数目远大于其他reducer的记录数目,显然0号的reducer的数据处理量将会远大于其他reducer,因此处理时间也会远大于其他reducer,甚至会带来内存等其他问题,这就是数据倾斜问题。对于join造成的数据倾斜问题我们可以通过设置参数setHive.optimize.skewjoin=true,让hive自己尝试解决join过程中产生的倾斜问题。 3.2 Group by语句
我们对user_read_log表按userid goup by语句来继续探讨数据倾斜问题,首先我们explain group by语句:
explainselect userid,count(*)from user_read_log groupby userid。
图:goupby的执行计划
Group by的执行计划按照userid的hash值分发记录,同时在map端也做了本地reduce,group by的shuffle过程是按照hash(userid)来分发的,实际应用中日志中很多用户都是未注册用户或者未登录,userid字段为空的记录数远大于userid不为空的记录数,当所有的空userid记录都分发到特定某一个reducer后,也会带来严重的数据倾斜问题。造成数据倾斜的主要原因在于分发到某个或某几个reducer的数据量远大于其他reducer的数据量。
对于groupby造成的数据倾斜问题,我们可以通过设置参数 set hive.map.aggr=true (开启map端combiner); set hive.groupby.skewindata=true;
这个参数的作用是做Reduce操作的时候,拿到的key并不是所有相同值给同一个Reduce,而是随机分发,然后Reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。虽然多了一轮MR任务,但是可以有效的减少数据倾斜问题可能带来的危险。(达观数据文辉陈运文) Hive解决数据倾斜
正确的设置Hive参数可以在某种程度上避免的数据倾斜问题,合适的查询语句也可以避免数据倾斜问题。要尽早的过滤数据和裁剪数据,减少后续处理的数据量,使得join key的数据分布较为均匀,将空字段随机赋予值,这样既可以均匀分发倾斜的数据: select userid,namefrom user_info a join( selectcasewhen userid isnullthencast(rand(47)*100000asint) else userid from user_read_log ) b on a.userid = b.userid
如果用户在定义schema的时候就已经预料到表数据可能会存在严重的数据倾斜问题,Hive自0.10.0引入了skew table的概念,如建表语句 CREATETABLE user_read_log (userid int,bookid,…)
SKEWEDBY(userid)ON(null)[STORED AS DIRECTORIES];
需要注意的是,skewtable只是将倾斜特别严重的列的分开存储为不同的文件,每个制定的倾斜值制定为一个文件或者目录,因此在查询的时候可以通过过滤倾斜值来避免数据倾斜问题: select userid,namefrom user_info a join( select userid from user_read_log where pt=’2015’ and userid isnotnull ) b on a.userid = b.userid
可以看出,如果不加过滤条件,倾斜问题还是会存在,通过对skewtable加过滤条件的好处是避免了mapper的表扫描过滤操作。 3.3 Join的物理优化
Hive内部实现了MapJoinResolver(处理MapJoin)、SkewJoinResolver(处理倾斜join)、CommonJoinResolver
(处理普通Join)等类来实现join的查询物理优化(/org/apache/hadoop/hive/ql/optimizer/physical)。 CommonJoinResolver类负责将普通Join转换成MapJoin,Hive通过这个类来实现mapjoin的自动优化。对于表A和表B的join查询,会产生3个分支: