1 预先配置
在hive配置文件:%HIVE_HOME%/conf/hive-site.xml添加
<!-- 禁用 impersonation -->
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
在 Hadoop 的配置文件目录 %HADOOP_HOME%/etc/hadoop/ 下的 core-site.xml 和 hdfs-site.xml 中添加
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
确保没有设置限制 root 用户的权限
修改访问数据库表person的权限
#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person
由于Hive是数据仓库,而不是事务型数据库,默认情况下(非ACID表)不支持行级的更新和删除,所以这里仅介绍如何通过Java来向Hive插入、查询数据。
2 用Java来开发Hive应用
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>How to connect to Hive from Java</description>
  <properties>
    <!-- The Java source contains non-ASCII string literals; pin the source
         encoding so the build does not depend on the platform default. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- hive-jdbc 3.1.2 targets Java 8, so compile at that level explicitly
         instead of relying on the compiler plugin's default. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <!-- Hive JDBC Driver -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>3.1.2</version>
    </dependency>
    <!-- Hadoop Common -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.2</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.2.2</version>
    </dependency>
  </dependencies>
</project>
Java文件
package com.jerry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
/**
 * Small JDBC demo client for a HiveServer2 instance.
 *
 * <p>The instance keeps the most recent {@link ResultSet} (and the statement that
 * produced it) so that the {@code queryAndPrint} helpers can print it; everything
 * that is still open is released by {@link #disconnect(Connection)}.
 *
 * <p>Table names, column names and the {@code like}/{@code map} SQL fragments are
 * concatenated into the statement text verbatim, so they must come from trusted
 * code, never from user input. Only values bound through {@code ?} placeholders
 * are passed as parameters. The class is not thread-safe.
 */
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";

    /** Statement created by {@link #getConnection()}; used for plain queries and LOAD DATA. */
    private Statement statement;
    /** Prepared statement that owns {@link #resultSet} after a conditional query, else null. */
    private PreparedStatement conditionStatement;
    /** Result of the most recent query, or null when nothing is open. */
    private ResultSet resultSet;

    /**
     * Loads the Hive JDBC driver, opens a connection and creates the shared statement.
     *
     * @return the open connection; the caller hands it back to {@link #disconnect(Connection)}
     * @throws SQLException if the driver class is missing (the original
     *         {@link ClassNotFoundException} is kept as the cause) or the connection fails
     */
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            throw new SQLException("Hive JDBC driver not found: " + DRIVER_CLASS, e);
        }
        Connection con = DriverManager.getConnection(CONNECTION_URL);
        try {
            statement = con.createStatement();
        } catch (SQLException e) {
            // Do not leak the connection when the statement cannot be created.
            SQLException closeFailure = closeAndCollect(con, null);
            if (closeFailure != null) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return con;
    }

    /**
     * Closes the current result set, any statements held by this client and the connection.
     *
     * <p>Resources that were never opened are skipped, and every resource is attempted
     * even if an earlier close fails.
     *
     * @param con the connection to close; may be null
     * @throws SQLException the first close failure, with later failures attached as suppressed
     */
    public void disconnect(Connection con) throws SQLException {
        SQLException failure = closeAndCollect(resultSet, null);
        resultSet = null;
        failure = closeAndCollect(conditionStatement, failure);
        conditionStatement = null;
        failure = closeAndCollect(statement, failure);
        statement = null;
        failure = closeAndCollect(con, failure);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Runs a query on the shared statement and keeps its result as the current result set.
     *
     * @param query the complete SQL text to execute
     * @throws SQLException if no statement has been created yet or the query fails
     */
    public void query(String query) throws SQLException {
        Statement active = requireStatement();
        // Release the previous result before executing, never after: the old result
        // may belong to this same statement.
        releaseCurrentResult();
        resultSet = active.executeQuery(query);
    }

    /**
     * Runs {@code query} with an equality filter per entry of {@code condition}.
     *
     * <p>Each key becomes {@code key = ?} joined with {@code AND}, terminated by
     * {@code 1=1}; the values are bound as string parameters. The result becomes the
     * current result set.
     *
     * @param con       the connection used to prepare the statement
     * @param query     the SQL text without a WHERE clause
     * @param condition column name to expected value; null is treated as empty
     * @throws SQLException if preparing or executing the statement fails
     */
    public void query(Connection con, String query, Map<String, String> condition) throws SQLException {
        Map<String, String> filters = (condition == null) ? new HashMap<String, String>() : condition;
        StringBuilder sql = new StringBuilder(query).append(" where ");
        String[] values = new String[filters.size()];
        int count = 0;
        for (Map.Entry<String, String> filter : filters.entrySet()) {
            sql.append(filter.getKey()).append(" = ? AND ");
            values[count++] = filter.getValue();
        }
        // The trailing "1=1" absorbs the dangling AND and keeps an empty filter valid.
        sql.append("1=1");

        releaseCurrentResult();
        PreparedStatement prepared = con.prepareStatement(sql.toString());
        try {
            for (int j = 0; j < count; j++) {
                prepared.setString(j + 1, values[j]);
            }
            resultSet = prepared.executeQuery();
        } catch (SQLException e) {
            SQLException closeFailure = closeAndCollect(prepared, null);
            if (closeFailure != null) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // The statement must outlive its result set; it is closed with the result later.
        conditionStatement = prepared;
    }

    /**
     * Prints every remaining row of {@code resultSet} to standard output, one row per
     * line, with each column value followed by a comma.
     *
     * @param resultSet the result set to drain
     * @throws SQLException if reading the metadata or a row fails
     */
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            StringBuilder row = new StringBuilder();
            for (int i = 1; i <= columnCount; i++) {
                row.append(resultSet.getString(i)).append(',');
            }
            System.out.println(row);
        }
    }

    /**
     * Runs {@link #query(String)} and prints the result.
     *
     * @param query the complete SQL text to execute
     * @throws SQLException if the query or reading its result fails
     */
    public void queryAndPrint(String query) throws SQLException {
        query(query);
        printQueryResult(resultSet);
    }

    /**
     * Runs {@link #query(Connection, String, Map)} and prints the result.
     *
     * @param con       the connection used to prepare the statement
     * @param query     the SQL text without a WHERE clause
     * @param condition column name to expected value
     * @throws SQLException if the query or reading its result fails
     */
    public void queryAndPrint(Connection con, String query, Map<String, String> condition) throws SQLException {
        query(con, query, condition);
        printQueryResult(resultSet);
    }

    /**
     * Inserts one row into {@code tableName} using {@code INSERT INTO ... SELECT}.
     *
     * <p>The first three columns are bound as parameters (int, string, int); the
     * {@code like} and {@code map} arguments are ready-made SQL expressions appended
     * as-is. An {@link SQLException} is reported on standard error and not rethrown,
     * so the method stays best-effort.
     *
     * @param con       the connection used to prepare the statement
     * @param tableName the target table
     * @param newValue  three values: integer id, name, integer age
     * @param like      SQL expression for the fourth column, e.g. {@code array('a','b')}
     * @param map       SQL expression for the fifth column, e.g. {@code map('k','v')}
     * @throws NumberFormatException if {@code newValue[0]} or {@code newValue[2]} is not an integer
     */
    public void addDataToHiveTable(Connection con, String tableName, String[] newValue, String like, String map) {
        String insertSql = "INSERT INTO " + tableName + " SELECT ?,?,?," + like + "," + map;
        System.out.println(like);
        try (PreparedStatement insert = con.prepareStatement(insertSql)) {
            insert.setInt(1, Integer.parseInt(newValue[0]));
            insert.setString(2, newValue[1]);
            insert.setInt(3, Integer.parseInt(newValue[2]));
            insert.executeUpdate();
        } catch (SQLException e) {
            // Deliberately best-effort: report the failure and let the caller continue.
            e.printStackTrace();
        }
    }

    /**
     * Loads a file local to the HiveServer2 host into a table with {@code LOAD DATA LOCAL INPATH}.
     *
     * @param tableName the target table
     * @param path      the file path, embedded in the statement between single quotes
     * @throws SQLException if no statement has been created yet or the load fails
     */
    public void loadDataForLocal(String tableName, String path) throws SQLException {
        String query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName;
        requireStatement().execute(query);
    }

    /**
     * Removes all rows from a table with {@code truncate table}.
     *
     * @param con       the connection to run the statement on
     * @param tableName the table to empty
     * @throws SQLException if the statement fails
     */
    public void truncateTable(Connection con, String tableName) throws SQLException {
        String query = "truncate table " + tableName;
        con.setAutoCommit(true); // make sure the statement is committed immediately
        try (Statement truncate = con.createStatement()) {
            truncate.execute(query);
        }
    }

    /** Returns the shared statement, or fails with a clear message if none was created. */
    private Statement requireStatement() throws SQLException {
        if (statement == null) {
            throw new SQLException("No open Hive statement; a connection must be established first");
        }
        return statement;
    }

    /** Closes the current result set and the prepared statement backing it, if any. */
    private void releaseCurrentResult() throws SQLException {
        SQLException failure = closeAndCollect(resultSet, null);
        resultSet = null;
        failure = closeAndCollect(conditionStatement, failure);
        conditionStatement = null;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes {@code resource} if it is non-null and folds any failure into {@code pending}.
     *
     * @return {@code pending} (possibly with a new suppressed exception), or the new
     *         failure when {@code pending} was null
     */
    private static SQLException closeAndCollect(AutoCloseable resource, SQLException pending) {
        if (resource == null) {
            return pending;
        }
        try {
            resource.close();
        } catch (Exception e) {
            SQLException wrapped = (e instanceof SQLException) ? (SQLException) e : new SQLException(e);
            if (pending == null) {
                return wrapped;
            }
            pending.addSuppressed(wrapped);
        }
        return pending;
    }

    /**
     * Demo: full scan, insert, conditional query, truncate and reload of the
     * {@code person} table, printing the table contents after each step.
     */
    public static void main(String[] args) throws SQLException {
        HiveClient hive = new HiveClient();
        String tableName = "person";
        String like = "array('basketball', 'music', 'dance')";
        String map = "map('address','xxxx')";
        String[] newAddValue = {"10", "elite0", "50"};
        Connection con = hive.getConnection();
        String query = "SELECT * FROM " + tableName;
        Map<String, String> condition = new HashMap<String, String>();
        condition.put("name", "elite0");
        condition.put("age", "50");
        String inpath = "/home/jerry/hive/person";
        try {
            System.out.println("全表查询:");
            hive.queryAndPrint(query);
            hive.addDataToHiveTable(con, tableName, newAddValue, like, map);
            System.out.println("插入数据后全表查询:");
            hive.queryAndPrint(query);
            System.out.println("条件查询:");
            hive.queryAndPrint(con, query, condition);
            hive.truncateTable(con, tableName);
            System.out.println("清空表:");
            hive.queryAndPrint(query);
            hive.loadDataForLocal(tableName, inpath);
            System.out.println("从文件中加载:");
            hive.queryAndPrint(query);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, even when one of the steps above failed.
            try {
                hive.disconnect(con);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果
全表查询:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
array('basketball', 'music', 'dance')
插入数据后全表查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
条件查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
清空表:
从文件中加载:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
3 用Python来开发Hive应用
pip3
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
Python
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
class Hive:
    """Small demo client for a HiveServer2 instance using PyHive and pandas.

    Table and column names are concatenated into the SQL text verbatim, so they
    must come from trusted code. Values are either passed as query parameters
    or escaped before being embedded as string literals.
    """

    def __init__(self):
        self.database = 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'

    def getconnect(self):
        """Open and return a PyHive DB-API connection to the configured database."""
        return hive.Connection(host=self.host, port=self.port, database=self.database)

    def getEngine(self):
        """Create and return a SQLAlchemy engine for the configured database."""
        hive_uri = f"hive://{self.host}:{self.port}/{self.database}"
        return create_engine(hive_uri)

    def disconnect(self, engine, conn):
        """Dispose of the engine and close the connection.

        The connection is closed even if disposing of the engine raises.
        """
        try:
            engine.dispose()
        finally:
            conn.close()

    @staticmethod
    def _quote(value):
        """Return ``value`` as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so the value cannot
        terminate the literal early.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return "'" + text + "'"

    # Run a query and print the resulting DataFrame
    def query(self, sql, engine, condition=None):
        """Run ``sql`` through pandas and print the result.

        When ``condition`` is given, each key becomes ``key = %s`` joined with
        ``and`` and terminated by ``1=1``; the values are passed as parameters.
        Any exception is reported on standard output and not re-raised.
        """
        try:
            params = None
            if condition is not None:
                clauses = "".join(key + " = %s and " for key in condition)
                # The trailing "1=1" absorbs the dangling "and".
                sql = sql + " where " + clauses + "1=1"
                params = tuple(condition.values())
            df = pd.read_sql(sql, engine, params=params)
            print(df)
        except Exception as e:
            print("Error occurred:", e)

    # Insert one row
    def addDataToHiveTable(self, conn, tableName, data):
        """Insert one row into ``tableName`` with ``INSERT INTO ... SELECT``.

        ``data`` must provide ``id``, ``name`` and ``age`` (passed as query
        parameters), ``like`` (an iterable rendered as a Hive ``array(...)``)
        and ``address`` (a dict rendered as a Hive ``map(...)`` containing every
        key/value pair). A failing insert is reported on standard output and not
        re-raised.
        """
        like_array = "array(" + ", ".join(self._quote(item) for item in data['like']) + ")"
        pairs = []
        for key, value in data['address'].items():
            pairs.append(self._quote(key))
            pairs.append(self._quote(value))
        address_map = "map(" + ", ".join(pairs) + ")"

        insertSql = "INSERT INTO " + tableName + " SELECT %s,%s,%s," + like_array + "," + address_map
        cursor = conn.cursor()
        try:
            try:
                cursor.execute(insertSql, (data['id'], data['name'], data['age']))
            except Exception as e:
                print(f"Error inserting data: {e}")
            conn.commit()
        finally:
            cursor.close()

    # Load the contents of a file into a table
    def loadDataForLocal(self, conn, tableName, path):
        """Run ``LOAD DATA LOCAL INPATH`` for ``path`` into ``tableName``."""
        query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        finally:
            cursor.close()

    # Remove all rows from a table
    def truncateTable(self, conn, tableName):
        """Run ``truncate table`` on ``tableName``."""
        query = "truncate table " + tableName
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        finally:
            cursor.close()
# Demo: full scan, conditional query, insert, truncate and reload of the
# "person" table, printing the table contents after each step.
if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition = {"name": "elite1", "age": "20"}
    # Row to insert
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # make sure this is an integer
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    try:
        print("全表查询")
        myhive.query(sql, engine)
        print("条件查询")
        myhive.query(sql, engine, condition)
        print("加数据进入表")
        myhive.addDataToHiveTable(conn, tableName, data)
        myhive.query(sql, engine)
        print("清空表中所有数据")
        myhive.truncateTable(conn, tableName)
        print("从文件中导入数据")
        myhive.loadDataForLocal(conn, tableName, path)
        myhive.query(sql, engine)
    finally:
        # Release the connection and engine even when one of the steps fails.
        print("断开连接")
        myhive.disconnect(engine, conn)
运行结果
建立连接
全表查询
id name age likes address
0 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"}
1 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"}
2 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"}
3 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"}
4 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"}
5 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"}
条件查询
id name age likes address
0 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"}
加数据进入表
id name age likes address
0 50 Jerry 50 ["basketball","music","dance"] {"address":"xx"}
1 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"}
2 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"}
3 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"}
4 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"}
5 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"}
6 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"}
清空表中所有数据
从文件中导入数据
id name age likes address
0 1 elite0 10 ["basketball","music","dance"] {"adderss":"xx"}
1 2 elite1 20 ["basketball","music","dance"] {"adderss":"xx"}
2 3 elite2 10 ["basketball","music","dance"] {"adderss":"xx"}
3 4 elite3 20 ["basketball","music","dance"] {"adderss":"xx"}
4 5 elite4 10 ["basketball","music","dance"] {"adderss":"xx"}
5 6 elite5 20 ["basketball","music","dance"] {"adderss":"xx"}
断开连接