首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Hadoop HDFS-追加(Append)写入模式

Hadoop HDFS-追加(Append)写入模式

作者头像
运维小路
发布2025-07-31 14:39:08
发布2025-07-31 14:39:08
6600
代码可运行
举报
文章被收录于专栏:运维小路运维小路
运行总次数:0
代码可运行

作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)。

中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:

Web服务器

代理服务器

ZooKeeper

Kafka

RabbitMQ

Hadoop HDFS(本章节)

在HDFS中,文件通常被设计为"一次写入,多次读取"(Write Once, Read Many, WORM)。因此,HDFS本身不支持对已有文件进行修改,包括在文件末尾追加数据。但是,从Hadoop 0.20.205版本开始,HDFS支持追加写入(append)功能,不过默认情况下是关闭的。在较新的版本中,追加写入功能是开启的。

我们这里并没讲解过多的文件上传下载,主要是因为这个hdfs很多时候都是业务负责写入和删除,而追加写入又是一种比较特殊的文件,比如对某些特殊的数据进行备份:MySQL 的二进制日志 (Binlog) 。

在我们使用HDFS的命令进行上传的时候,是所有的文件(块)都上传成功成功才会返回成功,但是如果我们是使用sdk来上传,则略有不同,本小节我们使用Python来模拟追加(Append)模式上传。

以下代码使用豆包生成

代码语言:javascript
代码运行次数:0
运行
复制
import os
import time
import pyarrow.fs as fs
from datetime import datetime
# 配置参数
HDFS_HOST = '192.168.31.165'
HDFS_PORT = 9000
LOCAL_FILE = '/var/log/messages'
HDFS_FILE = '/test1/messages1'
CHECK_INTERVAL = 1
IDLE_TIMEOUT = 60  # 60秒未写入则关闭连接
class HDFSLogSyncer:
    def __init__(self):
        self.hdfs_client = None
        self.last_size = 0
        self.last_inode = None
        self.bytes_written = 0
        self.last_write_time = time.time()
        self.connected = False
    def _init_hdfs_client(self):
        """初始化HDFS连接"""
        if not self.connected:
            try:
                os.environ["HADOOP_HOME"] = "/opt/hadoop"
                os.environ["HADOOP_CLASSPATH"] = os.popen("hadoop classpath").read().strip()
                os.environ["LD_LIBRARY_PATH"] = "/opt/hadoop/lib/native"

                self.hdfs_client = fs.HadoopFileSystem(HDFS_HOST, HDFS_PORT)
                self.connected = True
                print(f"HDFS连接成功 (hdfs://{HDFS_HOST}:{HDFS_PORT})")
            except Exception as e:
                print(f"HDFS连接失败: {str(e)}")
                raise
    def _close_hdfs_connection(self):
        """关闭HDFS连接"""
        if self.connected and self.hdfs_client:
            try:
                self.hdfs_client = None
                self.connected = False
                print("HDFS连接已关闭")
            except Exception as e:
                print(f"关闭连接出错: {str(e)}")
    def _get_file_stat(self):
        """获取本地文件状态"""
        try:
            stat_info = os.stat(LOCAL_FILE)
            return {
                'size': stat_info.st_size,
                'inode': stat_info.st_ino
            }
        except FileNotFoundError:
            print(f"本地文件 {LOCAL_FILE} 不存在,等待创建...")
            return None
        except Exception as e:
            print(f"获取文件状态失败: {str(e)}")
            return None
    def _hdfs_file_exists(self):
        """检查HDFS文件是否存在"""
        try:
            info = self.hdfs_client.get_file_info(HDFS_FILE)
            return info.type != fs.FileType.NotFound
        except Exception as e:
            print(f"检查HDFS文件失败: {str(e)}")
            return False
    def _read_hdfs_file(self):
        """读取HDFS文件内容"""
        try:
            with self.hdfs_client.open_input_stream(HDFS_FILE) as f:
                return f.read()
        except Exception as e:
            print(f"读取HDFS文件失败: {str(e)}")
            return b""
    def upload_historical_data(self):
        """上传历史文件内容"""
        print("\n===== 开始上传历史文件 =====")
        self._init_hdfs_client()
        file_stat = self._get_file_stat()
        if not file_stat:
            return False
        self.last_size = file_stat['size']
        self.last_inode = file_stat['inode']
        hdfs_exists = self._hdfs_file_exists()
        try:
            # 读取HDFS旧内容(兼容旧版本pyarrow)
            old_content = self._read_hdfs_file() if hdfs_exists else b""
            hdfs_stream = self.hdfs_client.open_output_stream(HDFS_FILE)
            # 写入旧内容+新内容
            hdfs_stream.write(old_content)
            with open(LOCAL_FILE, 'rb') as f:
                content = f.read()
                if content:
                    hdfs_stream.write(content)
                    self.bytes_written = len(old_content) + len(content)
                    self.last_write_time = time.time()
                    print(f"历史文件上传完成 | 大小: {len(content)} 字节 | 累计: {self.bytes_written} 字节")
                else:
                    print("历史文件为空,无需上传")
            hdfs_stream.close()
            return True
        except Exception as e:
            print(f"历史文件上传失败: {str(e)}")
            return False
    def _append_new_content(self, current_size):
        """追加新内容到HDFS"""
        offset = self.last_size
        length = current_size - offset
        print(f"\n检测到新内容 | 偏移量: {offset} | 长度: {length} 字节")
        try:
            # 读取本地新增内容
            with open(LOCAL_FILE, 'rb') as f:
                f.seek(offset)
                new_content = f.read(length)
            # 读取HDFS旧内容并合并
            old_content = self._read_hdfs_file()
            with self.hdfs_client.open_output_stream(HDFS_FILE) as hdfs_stream:
                hdfs_stream.write(old_content + new_content)
            self.bytes_written += length
            self.last_write_time = time.time()
            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
            print(f"[{timestamp}] 新内容追加完成 | 本次: {length} 字节 | 累计: {self.bytes_written} 字节")
        except Exception as e:
            print(f"新内容追加失败: {str(e)}")
    def listen_and_append(self):
        """监听新内容,超时关闭连接"""
        print("\n===== 开始监听新内容 =====")
        print(f"监听文件: {LOCAL_FILE} | 超时设置: {IDLE_TIMEOUT}秒")
        while True:
            # 超时检查:超过60秒无写入则关闭连接
            idle_time = time.time() - self.last_write_time
            if idle_time > IDLE_TIMEOUT:
                print(f"\n已{idle_time:.1f}秒无新内容,触发超时")
                self._close_hdfs_connection()
                print("程序因超时自动退出")
                return
            file_stat = self._get_file_stat()
            if not file_stat:
                time.sleep(CHECK_INTERVAL)
                continue
            # 处理文件轮转
            if file_stat['inode'] != self.last_inode:
                print(f"\n检测到文件轮转,重新初始化监听...")
                self.last_inode = file_stat['inode']
                self.last_size = 0
            # 追加新内容
            if file_stat['size'] > self.last_size:
                self._append_new_content(file_stat['size'])
                self.last_size = file_stat['size']
            time.sleep(CHECK_INTERVAL)
    # 新增run方法(之前缺失的部分)
    def run(self):
        """主程序入口"""
        if not self.upload_historical_data():
            print("===== 历史文件上传失败,退出程序 =====")
            return
        self.listen_and_append()
if __name__ == "__main__":
    syncer = HDFSLogSyncer()
    syncer.run()

执行输出过程

代码语言:javascript
代码运行次数:0
运行
复制
[root@localhost ~]# python app.py 

===== 开始上传历史文件 =====
25/07/27 00:04:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HDFS连接成功 (hdfs://192.168.31.165:9000)
历史文件上传完成 | 大小: 960 字节 | 累计: 1839919 字节

===== 开始监听新内容 =====
监听文件: /var/log/messages | 超时设置: 60秒

检测到新内容 | 偏移量: 960 | 长度: 31 字节
[00:04:18.048] 新内容追加完成 | 本次: 31 字节 | 累计: 1839950 字节

检测到新内容 | 偏移量: 991 | 长度: 31 字节
[00:04:19.515] 新内容追加完成 | 本次: 31 字节 | 累计: 1839981 字节

已60.0秒无新内容,触发超时
HDFS连接已关闭
程序因超时自动退出

当然这里的只演示了很简单的逻辑,实际的环境需要考虑的因素会更多,当然这个一般而言这个是业务方考虑的,我这里通过这个演示,主要是为了让大家更好的理解这个逻辑。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维小路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档