我们都知道,Hive SQL 实际上是翻译为 MapReduce 执行的, 那么它具体过程如何呢?今天我们就来探寻 Hive SQL 背后的执行机制和原理。
进一步理解和掌握 Hive SQL 的执行原理对于平时离线任务的开发和优化非常重要,直接关系到 Hive SQL 的执行效率和时间。
作为基于 Hadoop 主要数据仓库解决方案, Hive SQL 是主要的交互接口,实际的数据保存在 HDFS 文件中,真正的计算和执行则由 MapReduce 完成,它们之间的桥梁是 Hive 引擎。
Hive 主要组件包括 UI 组件、 Driver 组件( Complier Optimizer Executor )、 Metastore 组件、 CLI ( Command Line Interface ,命令行接口)、 JDBC/ODBC 、Thrift Server 和 Hive Web Interface (HWI )等。
Hive 就是通过 CLI 、JDBC / ODBC 或者 HWI 接收相关的 Hive SQL 查询,并通过 Driver 组件进行编译,分析优化,最后变成可执行的 MapReduce。
Hive SQL 是类似于 ANSI SQL 标准的 SQL 语言,但两者又不完全相同。 Hive SQL 和 MySQL 的 SQL 语言最为接近,但两者之间也存在显著差异,比如 Hive 不支持行级数据插人、更新和删除,也不支持事务等。
Hive 中的数据库从本质上来说仅仅是一个目录或者命名空间,但是对于具有很多用户和组的集群来说,这个概念非常有用 。
首先,这样可以避免表命名冲突;其次,它等同于关系型数据库中的数据库概念,是一组表或者表的逻辑组,非常容易理解。
Hive 中的表( Table )和关系数据库中的 table 在概念上是类似的,每个 table 在 Hive 中都有一个相应的目录存储数据,如果没有指定表的数据库,那么 Hive 会通过{HIVE_HOME} /conf/hive-site.xml
配置文件中的 hive.metastore.warehouse.dir
属性来使用默认值(一般是 /user/hive/warehouse
,也可以根据实际的情况来修改这个配置),所有的 table 数据(不包括外部表) 都保存在这个目录中。
Hive 表分为两类,即内部表和外部表。所谓内部表(managed table) 即 Hive 管理的表,Hive 内部表的管理既包含逻辑以及语法上的,也包含实际物理意义上的,即创建 Hive 内部表时,数据将真实存在于表所在的目录内,删除内部表时,物理数据和文件也一并删除。
那么到底是选择内部表还是外部表呢?
大多数情况下,这两者的区别不是很明显。如果数据的所有处理都在 Hive 中进行,那么更倾向于选择内部表。但是如果 Hive 和其他工具针对相同的数据集做处理,那么外部表更合适。
Hive 将表划分为分区(partition),partition 根据分区字段进行。 分区可以让数据的部分查询变得更快 。表或者分区可以进一步被划分为桶( bucket)。 桶通常在原始数据中加入一些额外的结构,这些结构可以用于高效查询。
例如 ,基于用户 ID 的分桶可以使基于用户的查询非常快。
假设日志数据中,每条记录都带有时间戳 。如果根据时间来分区,那么同一天的数据将被划分到同一个分区中。
分区可以通过多个维度来进行。 例如,通过日期划分之后,还可以根据国家进一步划分。
分区在创建表的时候使用 PARTITIONED BY
从句定义,该从句接收一个字段列表:
CREATE TABLE logs (ts BIGINT , line STRING)PARTITIONED BY (dt STRING,country STRING);
复制代码
当导入数据到分区表时,分区的值被显式指定:
LOAD DATA INPATH ’/user/root/path’ INTO TABLE logs PARTITION (dt='2001-01-01',country='GB’);
复制代码
实际 SQL 中,灵活指定分区将大大提高其效率,如下代码将仅会扫描 2001-01-01
下的 GB
目录。
SELECT ts , dt , line FROM logs WHERE dt=‘2001-01-01' and country='GB'
复制代码
在表或者分区中使用桶通常有两个原因:
为了让 Hive 对表进行分桶,通过 CLUSTERED BY
从句在创建表的时候指定:
CREATE TABLE bucketed users(id INT, name STRING) CLUSTERED BY (id) INTO 4 BUCKETS;
复制代码
指定表根据 id 字段进行分桶,并且分为 4 个桶 。分桶时, Hive 根据字段哈希后取余数来决定数据应该放在哪个桶,因此每个桶都是整体数据的随机抽样。
在 map-side 的关联中,两个表根据相同的宇段进行分桶,因此处理左边表的 bucket 时,可以直接从外表对应的 bucket 中提取数据进行关联操作。 map-side 关联的两个表不一定需要完全相同 bucket 数量,只要成倍数即可。
需要注意的是, Hive 并不会对数据是否满足表定义中的分桶进行校验,只有在查询时出现异常才会报错 。因此,一种更好的方式是将分桶的工作交给 Hive 来完成(设 hive.enforce.bucketing
属性为 true 即可)。
hive> CREATE TABLE empty key value store LIKE key value store;
复制代码
还可以通过 CREATE TABLE AS SELECT 的方式来创建表,示例如下:
Hive> CREATE TABLE new key value store ROW FORMAT SERDE "org.apache.Hadoop.hive.serde2.columnar.ColumnarSerDe" STORED AS RCFile AS SELECT (key % 1024) new_key, concat(key, value) key_value_pair FROM key_value_store SORT BY new_key, key_value_pair;
复制代码
修改表名的语法如下:
hive> ALTER TABLE old_table_name RENAME TO new_table_name;
复制代码
修改列名的语法如下:
ALTER TABLE table_name CHANGE (COLUMN) old_col_name new_col_name column_type [COMMENT col_comment) (FIRST|AFTER column_name)
复制代码
上述语法允许改变列名 数据类型 注释 列位 它们的任意组合 建表后如果要新增一列,则使用如下语法:
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT COMMENT 'new col comment');
复制代码
DROP TABLE 语句用于删除表的数据和元数据 。对于外部表,只删除 Metastore 中的元数据,而外部数据保存不动,示例如下:
drop table my_table;
复制代码
如果只想删除表数据,保留表结构,跟 MySQL 类似,使用 TRUNCATE 语句:
TRUNCATE TABLE my_table;
复制代码
相对路径的示例如下:
hive> LOAD DATA LOCAL INPATH ’./exarnples/files/kvl.txt ’ OVERWRITE INTO TABLE pokes;
复制代码
将查询结果写入 HDFS 文件系统。
INSERT OVERWRITE TABLE tablenamel [PARTITION (partcoll=val1, partcol2=val2 ... )] select_statement1 FROM from_statement
复制代码
这是基础模式,还有多插入模式和自动分区模式,这里就不再叙述。
SELECT [ALL | DISTINCT] select_expr, select_expr, ...FROM table_reference [WHERE where_condition] [GROUP BY col_list [HAVING condition]] [ CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY | ORDER BY col_list] ][LIMIT number]
复制代码
SET mapred.reduce.tasks = 1 SELECT * FROM test SORT BY amount DESC LIMIT 5
复制代码
SELECT `(ds|hr)?+.+` FROM test
复制代码
join_table:table_reference (INNER] JOIN table_factor (join_condition]| table_reference {LEFTIRIGHTjFULL} (OUTER] JOIN table_reference join_ condition| table_reference LEFT SEM JOIN table_reference join_condition| table_reference CROSS JOIN table_reference (join_condition] (as of Hive 0.10)table reference:table_factor| join_tabletable_factor:tbl_name [alias]| table_subquery alias| (table_references)join_condition:on expression
复制代码
select a.val, b.val,c.val from a join b on (a.key=b.key1) join c on(c.key = b.key2);
复制代码
select a.val,b.val,c.val from a join b on (a.key=b.key1) join c on(c.key=b.key1);
复制代码
--第一个 SQL 语句SELECT c.val, d.val FROM c LEFT OUTER JOIN d ON (c.key=d.key) WHERE a.ds='2010-07-07' AND b.ds='2010-07-07'-- 第二个 SQL 语句SELECT c.val, d.val FROM c LEFT OUTER JOIN d ON (c.key=d.key AND d.ds=’ 2009-07-07 ’ AND c.ds='2009-07-07')
复制代码
SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B); --可以被重写为: SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)
复制代码
我们都知道,一个好的的 Hive SQL 和写得不好的 Hive SQL ,对底层计算和资源的使用可能相差百倍甚至千倍、万倍。
除了资源的浪费,不恰当地使用 Hive SQL 可能会运行几个小时甚至十几个小时都得不到运算结果。因此,我们深入的理解 Hive SQL 的执行过程和原理是非常有必要的。
以 group by 语句执行图解为例:
我们假定一个业务背景:分析购买 iPhone7 客户在各城市中的分布情况,即哪个城市购买得最多、哪个最少。
select city,count(order_id) as iphone7_count from orders_table where day='201901010' and cat_name='iphone7' group by city;
复制代码
底层 MapReduce 执行过程:
Hive SQL 的 group by 语句涉及数据的重新分发和分布,因此其执行过程完整地包含了 MapReduce 任务的执行过程。
( 1 )输入分片
group by 语句的输入文件依然为 day=20170101 的分区文件,其输入分片过程和个数同 select 语句,也是被分为大小分别为: 128MB 、128MB、44MB 三个分片文件。
( 2 ) Map 阶段
Hadoop 集群同样启动三个 Map 任务,处理对应的三个分片文件;每个 map 任务处理其对应分片文件中的每行,检查其商品类目是否为 iPhone7 ,如果是,则输出形如<city,1> 的键值对,因为需要按照 city 对订单数目进行统计(注意和 select 语句的不同)。
( 3 ) Combiner 阶段
( 4 ) Shuffle 阶段
完整的 shuffle 包括分区(partition),排序(sort)和分隔(spill)、复制(copy)、合并(merge)等过程。
( 5 )Reduce 阶段
调用 reduce 函数,每个 reduce 任务的输出存到本地文件中
( 6 )输出文件
hadoop 合并 Reduce Task 任务的输出文件到输出目录
我们介绍了 Hive SQL 的执行原理。当然了,要知其然,并要知其所以然,理解 Hive 的执行原理是写高效 SQL 的前提和基础,也是掌握 Hive SQL 优化技巧的根本,接下来我们就要进入 Hive 优化实践的环节啦。
领取专属 10元无门槛券
私享最新 技术干货