作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)。
中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:
Web服务器
代理服务器
ZooKeeper
Kafka
RabbitMQ
Hadoop HDFS(本章节)
在HDFS中,文件通常被设计为"一次写入,多次读取"(Write Once, Read Many, WORM)。因此,HDFS本身不支持对已有文件进行修改,包括在文件末尾追加数据。但是,从Hadoop 0.20.205版本开始,HDFS支持追加写入(append)功能,不过默认情况下是关闭的。在较新的版本中,追加写入功能是开启的。
我们这里并没讲解过多的文件上传下载,主要是因为这个hdfs很多时候都是业务负责写入和删除,而追加写入又是一种比较特殊的文件,比如对某些特殊的数据进行备份:MySQL 的二进制日志 (Binlog) 。
在我们使用HDFS的命令进行上传的时候,是所有的文件(块)都上传成功成功才会返回成功,但是如果我们是使用sdk来上传,则略有不同,本小节我们使用Python来模拟追加(Append)模式上传。
以下代码使用豆包生成
import os
import time
import pyarrow.fs as fs
from datetime import datetime
# 配置参数
HDFS_HOST = '192.168.31.165'
HDFS_PORT = 9000
LOCAL_FILE = '/var/log/messages'
HDFS_FILE = '/test1/messages1'
CHECK_INTERVAL = 1
IDLE_TIMEOUT = 60 # 60秒未写入则关闭连接
class HDFSLogSyncer:
def __init__(self):
self.hdfs_client = None
self.last_size = 0
self.last_inode = None
self.bytes_written = 0
self.last_write_time = time.time()
self.connected = False
def _init_hdfs_client(self):
"""初始化HDFS连接"""
if not self.connected:
try:
os.environ["HADOOP_HOME"] = "/opt/hadoop"
os.environ["HADOOP_CLASSPATH"] = os.popen("hadoop classpath").read().strip()
os.environ["LD_LIBRARY_PATH"] = "/opt/hadoop/lib/native"
self.hdfs_client = fs.HadoopFileSystem(HDFS_HOST, HDFS_PORT)
self.connected = True
print(f"HDFS连接成功 (hdfs://{HDFS_HOST}:{HDFS_PORT})")
except Exception as e:
print(f"HDFS连接失败: {str(e)}")
raise
def _close_hdfs_connection(self):
"""关闭HDFS连接"""
if self.connected and self.hdfs_client:
try:
self.hdfs_client = None
self.connected = False
print("HDFS连接已关闭")
except Exception as e:
print(f"关闭连接出错: {str(e)}")
def _get_file_stat(self):
"""获取本地文件状态"""
try:
stat_info = os.stat(LOCAL_FILE)
return {
'size': stat_info.st_size,
'inode': stat_info.st_ino
}
except FileNotFoundError:
print(f"本地文件 {LOCAL_FILE} 不存在,等待创建...")
return None
except Exception as e:
print(f"获取文件状态失败: {str(e)}")
return None
def _hdfs_file_exists(self):
"""检查HDFS文件是否存在"""
try:
info = self.hdfs_client.get_file_info(HDFS_FILE)
return info.type != fs.FileType.NotFound
except Exception as e:
print(f"检查HDFS文件失败: {str(e)}")
return False
def _read_hdfs_file(self):
"""读取HDFS文件内容"""
try:
with self.hdfs_client.open_input_stream(HDFS_FILE) as f:
return f.read()
except Exception as e:
print(f"读取HDFS文件失败: {str(e)}")
return b""
def upload_historical_data(self):
"""上传历史文件内容"""
print("\n===== 开始上传历史文件 =====")
self._init_hdfs_client()
file_stat = self._get_file_stat()
if not file_stat:
return False
self.last_size = file_stat['size']
self.last_inode = file_stat['inode']
hdfs_exists = self._hdfs_file_exists()
try:
# 读取HDFS旧内容(兼容旧版本pyarrow)
old_content = self._read_hdfs_file() if hdfs_exists else b""
hdfs_stream = self.hdfs_client.open_output_stream(HDFS_FILE)
# 写入旧内容+新内容
hdfs_stream.write(old_content)
with open(LOCAL_FILE, 'rb') as f:
content = f.read()
if content:
hdfs_stream.write(content)
self.bytes_written = len(old_content) + len(content)
self.last_write_time = time.time()
print(f"历史文件上传完成 | 大小: {len(content)} 字节 | 累计: {self.bytes_written} 字节")
else:
print("历史文件为空,无需上传")
hdfs_stream.close()
return True
except Exception as e:
print(f"历史文件上传失败: {str(e)}")
return False
def _append_new_content(self, current_size):
"""追加新内容到HDFS"""
offset = self.last_size
length = current_size - offset
print(f"\n检测到新内容 | 偏移量: {offset} | 长度: {length} 字节")
try:
# 读取本地新增内容
with open(LOCAL_FILE, 'rb') as f:
f.seek(offset)
new_content = f.read(length)
# 读取HDFS旧内容并合并
old_content = self._read_hdfs_file()
with self.hdfs_client.open_output_stream(HDFS_FILE) as hdfs_stream:
hdfs_stream.write(old_content + new_content)
self.bytes_written += length
self.last_write_time = time.time()
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
print(f"[{timestamp}] 新内容追加完成 | 本次: {length} 字节 | 累计: {self.bytes_written} 字节")
except Exception as e:
print(f"新内容追加失败: {str(e)}")
def listen_and_append(self):
"""监听新内容,超时关闭连接"""
print("\n===== 开始监听新内容 =====")
print(f"监听文件: {LOCAL_FILE} | 超时设置: {IDLE_TIMEOUT}秒")
while True:
# 超时检查:超过60秒无写入则关闭连接
idle_time = time.time() - self.last_write_time
if idle_time > IDLE_TIMEOUT:
print(f"\n已{idle_time:.1f}秒无新内容,触发超时")
self._close_hdfs_connection()
print("程序因超时自动退出")
return
file_stat = self._get_file_stat()
if not file_stat:
time.sleep(CHECK_INTERVAL)
continue
# 处理文件轮转
if file_stat['inode'] != self.last_inode:
print(f"\n检测到文件轮转,重新初始化监听...")
self.last_inode = file_stat['inode']
self.last_size = 0
# 追加新内容
if file_stat['size'] > self.last_size:
self._append_new_content(file_stat['size'])
self.last_size = file_stat['size']
time.sleep(CHECK_INTERVAL)
# 新增run方法(之前缺失的部分)
def run(self):
"""主程序入口"""
if not self.upload_historical_data():
print("===== 历史文件上传失败,退出程序 =====")
return
self.listen_and_append()
if __name__ == "__main__":
syncer = HDFSLogSyncer()
syncer.run()
执行输出过程
[root@localhost ~]# python app.py
===== 开始上传历史文件 =====
25/07/27 00:04:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HDFS连接成功 (hdfs://192.168.31.165:9000)
历史文件上传完成 | 大小: 960 字节 | 累计: 1839919 字节
===== 开始监听新内容 =====
监听文件: /var/log/messages | 超时设置: 60秒
检测到新内容 | 偏移量: 960 | 长度: 31 字节
[00:04:18.048] 新内容追加完成 | 本次: 31 字节 | 累计: 1839950 字节
检测到新内容 | 偏移量: 991 | 长度: 31 字节
[00:04:19.515] 新内容追加完成 | 本次: 31 字节 | 累计: 1839981 字节
已60.0秒无新内容,触发超时
HDFS连接已关闭
程序因超时自动退出
当然这里的只演示了很简单的逻辑,实际的环境需要考虑的因素会更多,当然这个一般而言这个是业务方考虑的,我这里通过这个演示,主要是为了让大家更好的理解这个逻辑。