Apache Hive 作为一个强大的数据仓库工具,其核心价值在于对存储在分布式系统(如 HDFS)中的大规模数据进行查询和分析。但在进行分析之前,首先需要有效地将数据加载到 Hive 表中或在表之间进行数据流转。Hive 提供了多种数据导入和操纵的方式,主要通过其数据操纵语言 (DML) 来实现。本文将详细介绍这些常用的数据加载与写入方法,重点围绕 LOAD DATA
命令和功能丰富的 INSERT
语句展开。
LOAD DATA
命令:从文件系统直接加载LOAD DATA
是 Hive 中最直接的数据导入命令,用于将文件系统(本地文件系统或 HDFS)中的数据文件加载到 Hive 表中。
语法核心:
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)];
LOCAL
: 若指定,从客户端本地复制文件;否则,从HDFS 移动文件。INPATH 'filepath'
: 源数据文件或目录的路径。OVERWRITE
: 若指定,覆盖目标表或分区数据;否则,追加(行为依赖文件格式)。INTO TABLE tablename
: 目标 Hive 表。PARTITION (partcol1=val1, ...)
: 目标分区(若表为分区表)。代码案例:
CREATE TABLE employees_load (id INT, name STRING, department STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH '/path/to/local/employees.csv' OVERWRITE INTO TABLE employees_load;
CREATE TABLE web_logs_load (ip STRING, url STRING) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
ALTER TABLE web_logs_load ADD IF NOT EXISTS PARTITION (dt='2023-11-20');
LOAD DATA INPATH '/hdfs/path/to/logs/2023-11-20/' INTO TABLE web_logs_load PARTITION (dt='2023-11-20');
INSERT
语句:多样化的数据写入与转换INSERT
语句是 Hive 中更为灵活和强大的数据写入机制,它不仅能加载数据,还能在写入过程中进行数据转换、聚合和筛选。
INSERT ... VALUES
:直接插入少量行虽然不常用于大规模数据,但 Hive 支持通过 VALUES
子句直接插入少量记录。
语法:
INSERT INTO TABLE tablename [PARTITION (partcol1=val1, ...)] VALUES (value1_row1, value2_row1, ...), (value1_row2, value2_row2, ...), ...;
案例:向配置表中插入几条记录
CREATE TABLE app_config (config_key STRING, config_value STRING, updated_by STRING);
INSERT INTO TABLE app_config VALUES ('timeout', '300', 'admin'), ('retry_count', '3', 'admin');
INSERT ... SELECT
:从查询结果加载数据 (核心用法)这是最常用的 INSERT
形式,将一个 SELECT
查询的结果写入到目标表中。可以配合 OVERWRITE
或 INTO
(追加)。
语法:
INSERT OVERWRITE TABLE target_table [PARTITION (part_col1=val1, ...)] [IF NOT EXISTS]
SELECT col1, col2, ... FROM source_table WHERE condition;
INSERT INTO TABLE target_table [PARTITION (part_col1=val1, ...)]
SELECT col1, col2, ... FROM source_table WHERE condition;
SELECT
语句的最后几列,并在 PARTITION
子句中只列出分区列名,实现动态分区加载。SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE target_partitioned_table PARTITION (dt, country)
SELECT ..., date_col AS dt, country_col AS country FROM source_table;
案例:
employees_load
筛选数据覆盖写入 active_employees
CREATE TABLE active_employees (id INT, name STRING, department STRING);
INSERT OVERWRITE TABLE active_employees
SELECT id, name, department FROM employees_load WHERE department != 'HR';
CREATE TABLE daily_logs_partitioned (message STRING, severity STRING) PARTITIONED BY (log_date DATE, log_hour INT);
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 假设 raw_logs 表有 message, severity, event_time (TIMESTAMP) 列
INSERT OVERWRITE TABLE daily_logs_partitioned PARTITION (log_date, log_hour)
SELECT message, severity, to_date(event_time) AS log_date, hour(event_time) AS log_hour
FROM raw_logs;
Hive 允许从一个源查询 (FROM ... SELECT ...
) 的结果同时插入到多个表或分区中,这能有效减少对源表的扫描次数。
语法:
FROM source_table
INSERT OVERWRITE TABLE target_table1 [PARTITION (part_spec1)]
SELECT_clause1 [WHERE where_clause1]
INSERT OVERWRITE TABLE target_table2 [PARTITION (part_spec2)]
SELECT_clause2 [WHERE where_clause2]
...;
INSERT
子句可以有自己独立的 SELECT
列列表和 WHERE
条件,但它们都作用于同一个 FROM source_table
。案例:根据用户类型将用户数据分发到不同表
CREATE TABLE premium_users (user_id STRING, name STRING, join_date DATE);
CREATE TABLE regular_users (user_id STRING, name STRING, email STRING);
-- 假设 all_users 表有 user_id, name, user_type, join_date, email 列
FROM all_users
INSERT OVERWRITE TABLE premium_users
SELECT user_id, name, join_date WHERE user_type = 'PREMIUM'
INSERT OVERWRITE TABLE regular_users
SELECT user_id, name, email WHERE user_type = 'REGULAR';
INSERT OVERWRITE DIRECTORY
:将查询结果导出到 HDFS 目录此功能允许将查询结果直接写入 HDFS 文件系统中的指定目录,而不是 Hive 表。这对于数据导出、与其他系统集成或生成报告文件非常有用。
语法:
INSERT OVERWRITE [LOCAL] DIRECTORY 'directory_path'
[ROW FORMAT row_format_for_output_file]
[STORED AS file_format_for_output_file] -- STORED AS TEXTFILE is default if not specified
SELECT_statement;
LOCAL
: 如果指定,目录路径是本地文件系统;否则是 HDFS 路径。ROW FORMAT
: 可以指定输出文件的行格式,如字段分隔符。STORED AS
: 指定输出文件的存储格式 (如 TEXTFILE, ORC, PARQUET)。案例:将高薪员工信息导出到 HDFS 目录,以 CSV 格式存储
INSERT OVERWRITE DIRECTORY '/output/high_salary_employees_csv'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE -- 可以省略,TEXTFILE 是默认的
SELECT id, name, salary
FROM employees_load
WHERE salary >= 100000;
导出到本地文件系统:
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_employee_report'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
SELECT name, department
FROM employees_load;
CREATE TABLE AS SELECT (CTAS)
:创建并加载CTAS 一步完成表的创建和数据填充,非常便捷。
语法核心:
CREATE TABLE [IF NOT EXISTS] new_table_name
[COMMENT table_comment]
[ROW FORMAT row_format] -- 通常不需要,除非想覆盖默认的,但CTAS主要用于数据和结构推断
[STORED AS file_format]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
AS
select_statement;
代码案例:创建汇总表
CREATE TABLE department_salary_summary
STORED AS PARQUET
AS
SELECT department, AVG(salary) as avg_salary, COUNT(*) as num_employees
FROM employees_load
GROUP BY department;
INSERT
,驾驭 Hive 数据流Hive 的 LOAD DATA
和各种形式的 INSERT
语句共同构成了其强大的数据加载和操纵能力。从简单的文件加载到复杂的查询结果转换、多目标分发乃至数据导出,INSERT
语句几乎无所不能。理解并熟练运用这些命令,结合对静态与动态分区的认知,是每一位 Hive 用户进行高效数据处理的基石。
log_levels
,包含 level_id INT
和 level_name STRING
两列。
要求:使用 INSERT ... VALUES
语句向 log_levels
表中插入以下三条记录:
raw_sales_data
包含 product_id STRING, sales_amount DECIMAL(10,2), sale_date STRING
(格式 ‘YYYY-MM-DD’)。你希望将2023年10月份的销售数据加载到一个按 month_partition STRING
(格式 ‘YYYY-MM’) 分区的表 october_sales
中,并覆盖该分区现有数据。
要求:编写创建 october_sales
表(包含 product_id
, sales_amount
列和分区列)的 HQL,以及加载数据的 HQL。
all_orders
包含 order_id STRING, customer_id STRING, order_status STRING, order_value DECIMAL(12,2)
。你希望根据 order_status
将数据分别写入到两个新表:completed_orders
(状态为 ‘COMPLETED’) 和 pending_orders
(状态为 ‘PENDING’ 或 ‘PROCESSING’)。
要求:假设 completed_orders
和 pending_orders
表已创建且只包含 order_id
和 order_value
列。使用一次 FROM
子句和多表插入完成数据分发,覆盖目标表数据。
customer_feedback
包含 customer_id STRING, feedback_text STRING, rating INT
。你希望将所有评分低于3星(即 rating < 3)的反馈导出到 HDFS 目录 /user/reports/low_rating_feedback
中,以制表符分隔的文本文件形式存储。
要求:编写相应的 HQL 导出语句。
event_logs_source
包含 event_id STRING, user_id STRING, event_type STRING, event_timestamp TIMESTAMP
。你希望将这些数据加载到一个新的分区表 categorized_events
中,该表按 event_day DATE
(从 event_timestamp
提取) 和 event_category STRING
(假设根据 event_type
的某种规则映射,例如 ‘TYPE_A’ -> ‘Category1’, ‘TYPE_B’ -> ‘Category2’, 其他 -> ‘OtherCategory’) 进行双重动态分区。
要求:编写创建 categorized_events
表(包含 event_id
, user_id
列和两个分区列)的 HQL,以及使用动态分区加载数据的 HQL。
CREATE TABLE log_levels (
level_id INT,
level_name STRING
);
INSERT INTO TABLE log_levels VALUES (1, 'INFO'), (2, 'WARN'), (3, 'ERROR');
CREATE TABLE october_sales (
product_id STRING,
sales_amount DECIMAL(10,2)
)
PARTITIONED BY (month_partition STRING)
STORED AS ORC;
-- 假设要加载到 '2023-10' 分区
ALTER TABLE october_sales ADD IF NOT EXISTS PARTITION (month_partition='2023-10');
INSERT OVERWRITE TABLE october_sales PARTITION (month_partition='2023-10')
SELECT
product_id,
sales_amount
FROM
raw_sales_data
WHERE
SUBSTR(sale_date, 1, 7) = '2023-10';
-- 假设表已创建:
-- CREATE TABLE completed_orders (order_id STRING, order_value DECIMAL(12,2));
-- CREATE TABLE pending_orders (order_id STRING, order_value DECIMAL(12,2));
FROM all_orders
INSERT OVERWRITE TABLE completed_orders
SELECT order_id, order_value WHERE order_status = 'COMPLETED'
INSERT OVERWRITE TABLE pending_orders
SELECT order_id, order_value WHERE order_status IN ('PENDING', 'PROCESSING');
INSERT OVERWRITE DIRECTORY '/user/reports/low_rating_feedback'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
SELECT
customer_id,
feedback_text,
rating
FROM
customer_feedback
WHERE
rating < 3;
CREATE TABLE categorized_events (
event_id STRING,
user_id STRING
)
PARTITIONED BY (event_day DATE, event_category STRING)
STORED AS PARQUET;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 假设最大动态分区数也已适当设置,例如 SET hive.exec.max.dynamic.partitions.pernode=1000;
INSERT OVERWRITE TABLE categorized_events PARTITION (event_day, event_category)
SELECT
event_id,
user_id,
to_date(event_timestamp) AS event_day_val, -- 对应第一个动态分区列 event_day
CASE
WHEN event_type = 'TYPE_A' THEN 'Category1'
WHEN event_type = 'TYPE_B' THEN 'Category2'
ELSE 'OtherCategory'
END AS event_category_val -- 对应第二个动态分区列 event_category
FROM
event_logs_source;