
Chapter 9: Persistent Storage and Serialization

javpower
Published 2025-08-06 17:33:22

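9.1 Storage System Architecture

9.1.1 Storage Architecture Design

The storage system of a vector search engine has to persist three kinds of data: the index itself, its metadata, and the operation log. The goal is to keep the data safe and to allow the index to be recovered quickly.

9.1.2 Storage Interface Design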

package com.jvector.storage;

import com.jvector.index.HnswIndex;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;

/**
 * Storage interface for indexes.
 */
public interface IndexStorage {

    /**
     * Saves the index to the given path.
     */
    void save(HnswIndex index, Path path) throws IOException;

    /**
     * Loads an index from the given path.
     */
    HnswIndex load(Path path) throws IOException;

    /**
     * Saves the index metadata.
     */
    void saveMetadata(IndexMetadata metadata, Path path) throws IOException;

    /**
     * Loads the index metadata.
     */
    IndexMetadata loadMetadata(Path path) throws IOException;

    /**
     * Incremental save: writes only the parts that changed.
     */
    void saveIncremental(HnswIndex index, Path basePath, long version) throws IOException;

    /**
     * Incremental load: applies the changes between two versions to an index.
     */
    void loadIncremental(HnswIndex index, Path basePath, long fromVersion, long toVersion) throws IOException;

    /**
     * Checks whether the index file exists and is valid.
     */
    boolean exists(Path path);

    /**
     * Returns storage statistics.
     */
    StorageStats getStats(Path path);

    /**
     * Compacts the storage files.
     */
    void compact(Path path) throws IOException;
}
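
The `loadIncremental(index, basePath, fromVersion, toVersion)` contract implies that an implementation must work out which delta files to replay and in which order. The sketch below shows one way to do that. It assumes delta files are named `incremental_<version>.<ext>` (the Kryo strategy later in this chapter writes `incremental_<version>.kryo`), that `fromVersion` is exclusive and `toVersion` inclusive, and the helper name `IncrementalFiles` is hypothetical.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: resolves which delta files an incremental load would replay. */
final class IncrementalFiles {
    private IncrementalFiles() {}

    /**
     * Returns the delta files for versions in (fromVersion, toVersion], oldest first.
     * The naming scheme "incremental_<version>.<ext>" is an assumption of this sketch.
     */
    static List<Path> between(Path baseDir, long fromVersion, long toVersion, String ext) {
        if (toVersion < fromVersion) {
            throw new IllegalArgumentException("toVersion must be >= fromVersion");
        }
        List<Path> files = new ArrayList<>();
        for (long version = fromVersion + 1; version <= toVersion; version++) {
            files.add(baseDir.resolve("incremental_" + version + "." + ext));
        }
        return files;
    }
}
```

Replaying the files oldest first matters: a later delta may delete or overwrite a node that an earlier delta added.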

9.2 Index Serialization Strategies

9.2.1 Java Native Serialization

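package com.jvector.storage.serialization;

import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
// Assumes SLF4J, which matches the LoggerFactory.getLogger(...) and "{}" placeholder usage below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// HnswConfig and IndexMetadata must also be imported from wherever the project defines them.

/**
 * Strategy based on Java's built-in object serialization.
 */
public class JavaSerializationStrategy implements SerializationStrategy {
    private static final Logger logger = LoggerFactory.getLogger(JavaSerializationStrategy.class);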

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using Java native serialization to: {}", path);

        try (ObjectOutputStream oos = new ObjectOutputStream(
                new BufferedOutputStream(Files.newOutputStream(path)))) {

            // 写入魔法数字和版本信息
            oos.writeInt(JAVA_MAGIC_NUMBER);
            oos.writeInt(SERIALIZATION_VERSION);

            // 写入索引基本信息
            writeIndexMetadata(oos, index);

            // 写入所有节点
            writeNodes(oos, index);

            // 写入连接信息
            writeConnections(oos, index);

            oos.flush();
            logger.info("Index serialization completed");
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index from: {}", path);

        try (ObjectInputStream ois = new ObjectInputStream(
                new BufferedInputStream(Files.newInputStream(path)))) {

            // 验证魔法数字和版本
            validateHeader(ois);

            // 读取索引元数据
            IndexMetadata metadata = readIndexMetadata(ois);

            // 创建新的索引实例
            HnswIndex index = new HnswIndex(metadata.getConfig());

            // 读取节点数据
            readNodes(ois, index, metadata.getNodeCount());

            // 读取连接信息
            readConnections(ois, index);

            logger.info("Index deserialization completed, loaded {} nodes", metadata.getNodeCount());
            return index;
        }
    }

    /**
     * 写入索引元数据
     */
    private void writeIndexMetadata(ObjectOutputStream oos, HnswIndex index) throws IOException {
        oos.writeObject(index.getConfig());
        oos.writeLong(index.size());
        oos.writeObject(index.getEntryPointId());
        oos.writeLong(System.currentTimeMillis()); // 保存时间戳
    }

    /**
     * 写入节点信息
     */
    private void writeNodes(ObjectOutputStream oos, HnswIndex index) throws IOException {
        Collection<HnswNode> nodes = index.getAllNodes();
        oos.writeInt(nodes.size());

        for (HnswNode node : nodes) {
            oos.writeObject(node);
        }
    }

    /**
     * 写入连接信息
     */
    private void writeConnections(ObjectOutputStream oos, HnswIndex index) throws IOException {
        Map<Long, Map<Integer, Set<Long>>> allConnections = index.getAllConnections();
        oos.writeObject(allConnections);
    }

    /**
     * 验证文件头
     */
    private void validateHeader(ObjectInputStream ois) throws IOException {
        int magic = ois.readInt();
        int version = ois.readInt();

        if (magic != JAVA_MAGIC_NUMBER) {
            throw new IOException("Invalid file format, magic number mismatch");
        }

        if (version != SERIALIZATION_VERSION) {
            throw new IOException("Unsupported serialization version: " + version);
        }
    }

    /**
     * 读取索引元数据
     */
    private IndexMetadata readIndexMetadata(ObjectInputStream ois) throws IOException {
        try {
            HnswConfig config = (HnswConfig) ois.readObject();
            long nodeCount = ois.readLong();
            Long entryPointId = (Long) ois.readObject();
            long timestamp = ois.readLong();

            return new IndexMetadata(config, nodeCount, entryPointId, timestamp);

        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to read index metadata", e);
        }
    }

    @Override
    public String getName() {
        return "java-native";
    }

    @Override
    public boolean supportsIncremental() {
        return false; // Java原生序列化不支持增量保存
    }

    private static final int JAVA_MAGIC_NUMBER = 0x4A564543; // "JVEC"
    private static final int SERIALIZATION_VERSION = 1;
}
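
The magic number and version check at the start of the file is what lets a loader reject foreign or outdated files before it tries to deserialize anything. A minimal, in-memory sketch of that header round trip follows. It reuses the `0x4A564543` ("JVEC") constant from the listing above; the class name `FileHeaderSketch` and the choice of `IllegalStateException` are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a magic-number + version header. */
final class FileHeaderSketch {
    static final int MAGIC = 0x4A564543; // ASCII "JVEC", same value as in the listing above
    static final int VERSION = 1;

    private FileHeaderSketch() {}

    /** Writes the 8-byte header: magic (4 bytes, big-endian) followed by version (4 bytes). */
    static byte[] writeHeader() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(VERSION);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the stored version, or throws if the header is not one this code understands. */
    static int readVersion(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int magic = in.readInt();
            if (magic != MAGIC) {
                throw new IllegalStateException("Invalid file format, magic number mismatch");
            }
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("Unsupported serialization version: " + version);
            }
            return version;
        } catch (IOException e) {
            // Includes EOFException for a truncated header
            throw new UncheckedIOException(e);
        }
    }
}
```

Checking the header first keeps a corrupt or unrelated file from reaching `readObject`, where the failure would be much harder to diagnose.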

9.2.2 High-Performance Serialization with Kryo

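package com.jvector.storage.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import com.jvector.core.Vector;
import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
// Assumes SLF4J, which matches the LoggerFactory.getLogger(...) usage below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Serialization strategy based on Kryo.
 */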
public class KryoSerializationStrategy implements SerializationStrategy {
    private static final Logger logger = LoggerFactory.getLogger(KryoSerializationStrategy.class);

    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(this::createKryo);

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using Kryo to: {}", path);

        Kryo kryo = kryoThreadLocal.get();

        try (Output output = new Output(Files.newOutputStream(path))) {
            // 写入文件头
            writeHeader(output);

            // 序列化索引
            kryo.writeObject(output, index);

            output.flush();
            logger.info("Kryo serialization completed");

        } catch (Exception e) {
            throw new IOException("Kryo serialization failed", e);
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index using Kryo from: {}", path);

        Kryo kryo = kryoThreadLocal.get();

        try (Input input = new Input(Files.newInputStream(path))) {
            // 验证文件头
            validateHeader(input);

            // 反序列化索引
            HnswIndex index = kryo.readObject(input, HnswIndex.class);

            logger.info("Kryo deserialization completed");
            return index;

        } catch (Exception e) {
            throw new IOException("Kryo deserialization failed", e);
        }
    }

    /**
     * Creates and configures a Kryo instance (one per thread, since Kryo is not thread-safe).
     */
    private Kryo createKryo() {
        Kryo kryo = new Kryo();

        // Register classes up front so Kryo writes compact ids instead of class names
        kryo.register(HnswIndex.class);
        kryo.register(HnswNode.class);
        kryo.register(Vector.class);
        kryo.register(float[].class);
        kryo.register(HashMap.class);
        kryo.register(HashSet.class);
        kryo.register(ConcurrentHashMap.class);

        // HashMap keeps Kryo's default MapSerializer. Registering one global serializer
        // pinned to Long keys and HnswNode values would also be applied to every other
        // HashMap in the graph (for example the per-level connection maps) and fail there.

        // Track references so shared or cyclic objects are written only once
        kryo.setReferences(true);

        // Allow unregistered classes; they are written with their class name
        kryo.setRegistrationRequired(false);

        return kryo;
    }

    /**
     * 写入文件头
     */
    private void writeHeader(Output output) {
        output.writeInt(KRYO_MAGIC_NUMBER);
        output.writeInt(SERIALIZATION_VERSION);
        output.writeLong(System.currentTimeMillis());
    }

    /**
     * 验证文件头
     */
    private void validateHeader(Input input) throws IOException {
        int magic = input.readInt();
        int version = input.readInt();
        long timestamp = input.readLong();

        if (magic != KRYO_MAGIC_NUMBER) {
            throw new IOException("Invalid Kryo file format");
        }

        if (version != SERIALIZATION_VERSION) {
            throw new IOException("Unsupported Kryo serialization version: " + version);
        }
    }

    @Override
    public String getName() {
        return "kryo";
    }

    @Override
    public boolean supportsIncremental() {
        return true;
    }

    /**
     * 增量序列化实现
     */
    public void serializeIncremental(HnswIndex index, Path basePath, 
                                   Set<Long> changedNodeIds, long version) throws IOException {
        Path incrementalPath = basePath.resolve("incremental_" + version + ".kryo");

        Kryo kryo = kryoThreadLocal.get();

        try (Output output = new Output(Files.newOutputStream(incrementalPath))) {
            writeHeader(output);

            // Write the version number
            output.writeLong(version);

            // Write the number of changed nodes
            output.writeInt(changedNodeIds.size());

            // Write each changed node as: id, deleted flag, then the node if it still exists.
            // The flag is written in both branches so a reader can tell the two record kinds apart.
            for (Long nodeId : changedNodeIds) {
                HnswNode node = index.getNode(nodeId);
                output.writeLong(nodeId);
                output.writeBoolean(node == null); // deleted flag
                if (node != null) {
                    kryo.writeObject(output, node);
                }
            }

            output.flush();
        }
    }

    private static final int KRYO_MAGIC_NUMBER = 0x4B52594F; // "KRYO"
    private static final int SERIALIZATION_VERSION = 1;
}
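
An incremental file is only useful if a reader can replay it, which requires every record to have the same self-describing shape. The sketch below illustrates the idea with plain `java.io` streams instead of Kryo so that it runs without extra dependencies. Nodes are simplified to `float[]` vectors, and the class name `DeltaLogSketch` and the record layout (id, deleted flag, optional payload) are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a replayable delta: id, deleted flag, then the payload if the node is live. */
final class DeltaLogSketch {
    private DeltaLogSketch() {}

    /** Encodes the changed ids; an id missing from 'nodes' is written as a tombstone. */
    static byte[] encode(Map<Long, float[]> nodes, Set<Long> changedIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(changedIds.size());
            for (long id : changedIds) {
                float[] vector = nodes.get(id);
                out.writeLong(id);
                out.writeBoolean(vector == null); // deleted flag, present in every record
                if (vector != null) {
                    out.writeInt(vector.length);
                    for (float value : vector) {
                        out.writeFloat(value);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Replays a delta onto 'target': tombstones remove entries, live records insert or overwrite. */
    static void apply(byte[] delta, Map<Long, float[]> target) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(delta))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                long id = in.readLong();
                if (in.readBoolean()) {
                    target.remove(id);
                    continue;
                }
                float[] vector = new float[in.readInt()];
                for (int j = 0; j < vector.length; j++) {
                    vector[j] = in.readFloat();
                }
                target.put(id, vector);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the flag is always present, the reader never has to guess whether the bytes after an id are a payload or the next record.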

9.2.3 Custom Binary Format

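package com.jvector.storage.serialization;

import com.jvector.core.Vector;
import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;
// Assumes SLF4J, which matches the LoggerFactory.getLogger(...) usage below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom binary serialization strategy.
 * A fixed, hand-written layout avoids per-object class metadata, so files are
 * typically smaller and faster to read than with a general-purpose serializer.
 */
public class CustomBinarySerializationStrategy implements SerializationStrategy {
    private static final Logger logger = LoggerFactory.getLogger(CustomBinarySerializationStrategy.class);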

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using custom binary format to: {}", path);

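        // READ is required as well: writeChecksum re-reads the file through this channel
        try (FileChannel channel = FileChannel.open(path, 
                StandardOpenOption.CREATE, 
                StandardOpenOption.READ, 
                StandardOpenOption.WRITE, 
                StandardOpenOption.TRUNCATE_EXISTING)) {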

            // 写入文件头
            writeFileHeader(channel, index);

            // 写入节点数据
            writeNodesData(channel, index);

            // 写入连接数据
            writeConnectionsData(channel, index);

            // 写入校验和
            writeChecksum(channel);

            channel.force(true);
            logger.info("Custom binary serialization completed");
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index using custom binary format from: {}", path);

        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {

            // 读取并验证文件头
            FileHeader header = readAndValidateHeader(channel);

            // 创建索引实例
            HnswIndex index = new HnswIndex(header.getConfig());

            // 读取节点数据
            readNodesData(channel, index, header);

            // 读取连接数据
            readConnectionsData(channel, index, header);

            // 验证校验和
            validateChecksum(channel);

            logger.info("Custom binary deserialization completed");
            return index;
        }
    }

    /**
     * 写入文件头
     */
    private void writeFileHeader(FileChannel channel, HnswIndex index) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024); // 固定大小的头部

        // 魔法数字
        buffer.putInt(CUSTOM_MAGIC_NUMBER);

        // 版本号
        buffer.putInt(SERIALIZATION_VERSION);

        // 时间戳
        buffer.putLong(System.currentTimeMillis());

        // 索引配置
        HnswConfig config = index.getConfig();
        buffer.putInt(config.getM());
        buffer.putInt(config.getEfConstruction());
        buffer.putInt(config.getMaxLevel());

        // 索引统计
        buffer.putLong(index.size());

        // 入口点ID
        Long entryPointId = index.getEntryPointId();
        buffer.put(entryPointId != null ? (byte) 1 : (byte) 0);
        if (entryPointId != null) {
            buffer.putLong(entryPointId);
        }

        // 距离度量名称
        String metricName = index.getDistanceMetric().getName();
        byte[] metricBytes = metricName.getBytes("UTF-8");
        buffer.putInt(metricBytes.length);
        buffer.put(metricBytes);

        // 填充剩余空间
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }

        buffer.flip();
        channel.write(buffer);
    }

    /**
     * 写入节点数据
     */
    private void writeNodesData(FileChannel channel, HnswIndex index) throws IOException {
        Collection<HnswNode> nodes = index.getAllNodes();

        // 写入节点数量
        ByteBuffer countBuffer = ByteBuffer.allocate(8);
        countBuffer.putLong(nodes.size());
        countBuffer.flip();
        channel.write(countBuffer);

        // 写入每个节点
        for (HnswNode node : nodes) {
            writeNode(channel, node);
        }
    }

    /**
     * 写入单个节点
     */
    private void writeNode(FileChannel channel, HnswNode node) throws IOException {
        Vector vector = node.getVector();
        float[] data = vector.getData();

        // 计算节点大小
        int nodeSize = 8 + // ID
                      4 + // level
                      4 + // dimension
                      data.length * 4; // vector data

        ByteBuffer buffer = ByteBuffer.allocate(nodeSize);

        // 写入节点ID
        buffer.putLong(node.getId());

        // 写入层级
        buffer.putInt(node.getLevel());

        // 写入向量维度
        buffer.putInt(data.length);

        // 写入向量数据
        for (float value : data) {
            buffer.putFloat(value);
        }

        buffer.flip();
        channel.write(buffer);
    }

    /**
     * 写入连接数据
     */
    private void writeConnectionsData(FileChannel channel, HnswIndex index) throws IOException {
        Map<Long, Map<Integer, Set<Long>>> allConnections = index.getAllConnections();

        // 写入连接数据大小
        ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
        sizeBuffer.putLong(allConnections.size());
        sizeBuffer.flip();
        channel.write(sizeBuffer);

        // 写入每个节点的连接
        for (Map.Entry<Long, Map<Integer, Set<Long>>> nodeEntry : allConnections.entrySet()) {
            writeNodeConnections(channel, nodeEntry.getKey(), nodeEntry.getValue());
        }
    }

    /**
     * 写入单个节点的连接
     */
    private void writeNodeConnections(FileChannel channel, Long nodeId, 
                                    Map<Integer, Set<Long>> levelConnections) throws IOException {

        // 计算缓冲区大小
        int totalConnections = levelConnections.values().stream()
            .mapToInt(Set::size)
            .sum();

        int bufferSize = 8 + // node ID
                        4 + // level count
                        levelConnections.size() * 8 + // level headers
                        totalConnections * 8; // connection IDs

        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);

        // 写入节点ID
        buffer.putLong(nodeId);

        // 写入层级数量
        buffer.putInt(levelConnections.size());

        // 写入每层的连接
        for (Map.Entry<Integer, Set<Long>> levelEntry : levelConnections.entrySet()) {
            int level = levelEntry.getKey();
            Set<Long> connections = levelEntry.getValue();

            buffer.putInt(level);
            buffer.putInt(connections.size());

            for (Long connectionId : connections) {
                buffer.putLong(connectionId);
            }
        }

        buffer.flip();
        channel.write(buffer);
    }

    /**
     * 写入校验和
     */
    private void writeChecksum(FileChannel channel) throws IOException {
        // 计算整个文件的CRC32校验和
        channel.position(0);
        CRC32 crc32 = new CRC32();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        while (channel.read(buffer) > 0) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            crc32.update(bytes);
            buffer.clear();
        }

        // 写入校验和
        ByteBuffer checksumBuffer = ByteBuffer.allocate(8);
        checksumBuffer.putLong(crc32.getValue());
        checksumBuffer.flip();
        channel.write(checksumBuffer);
    }

    @Override
    public String getName() {
        return "custom-binary";
    }

    @Override
    public boolean supportsIncremental() {
        return true;
    }

    @Override
    public boolean supportsCompression() {
        return true;
    }

    private static final int CUSTOM_MAGIC_NUMBER = 0x43555342; // "CUSB"
    private static final int SERIALIZATION_VERSION = 1;
}
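
The listing above writes a checksum but does not show the matching read side (`validateChecksum`). The sketch below shows how a checksummed record can be verified before it is decoded. It operates on a single node record rather than the whole file; the layout and the class name `NodeRecordSketch` are assumptions for illustration, not the chapter's actual file format.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical sketch of a checksummed node record. */
final class NodeRecordSketch {
    private static final int HEADER_BYTES = Long.BYTES + Integer.BYTES + Integer.BYTES;

    private NodeRecordSketch() {}

    /** Layout: id (8) | level (4) | dimension (4) | floats (4 each) | CRC32 of all previous bytes (8). */
    static byte[] encode(long id, int level, float[] vector) {
        int payloadSize = HEADER_BYTES + vector.length * Float.BYTES;
        ByteBuffer buffer = ByteBuffer.allocate(payloadSize + Long.BYTES);
        buffer.putLong(id).putInt(level).putInt(vector.length);
        for (float value : vector) {
            buffer.putFloat(value);
        }
        buffer.putLong(checksum(buffer.array(), payloadSize));
        return buffer.array();
    }

    /** Verifies the trailing checksum first, then returns the stored vector. */
    static float[] decodeVector(byte[] record) {
        int payloadSize = record.length - Long.BYTES;
        if (payloadSize < HEADER_BYTES) {
            throw new IllegalArgumentException("record too short");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record);
        if (buffer.getLong(payloadSize) != checksum(record, payloadSize)) {
            throw new IllegalStateException("checksum mismatch");
        }
        buffer.getLong(); // id, not needed here
        buffer.getInt();  // level, not needed here
        int dimension = buffer.getInt();
        if (dimension < 0 || HEADER_BYTES + (long) dimension * Float.BYTES != payloadSize) {
            throw new IllegalStateException("dimension does not match record size");
        }
        float[] vector = new float[dimension];
        for (int i = 0; i < dimension; i++) {
            vector[i] = buffer.getFloat();
        }
        return vector;
    }

    /** CRC32 over the first 'length' bytes of 'data'. */
    private static long checksum(byte[] data, int length) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, length);
        return crc.getValue();
    }
}
```

Validating the checksum before interpreting any field means a corrupted dimension value can never trigger an oversized allocation.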

9.3 File Storage Implementations

9.3.1 Standard File Storage

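package com.jvector.storage.file;

import com.jvector.index.HnswIndex;
import com.jvector.storage.IndexStorage;
import com.jvector.storage.serialization.SerializationStrategy;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// Assumes SLF4J, which matches the LoggerFactory.getLogger(...) usage below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Standard file-based storage implementation.
 */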
public class FileIndexStorage implements IndexStorage {
    private static final Logger logger = LoggerFactory.getLogger(FileIndexStorage.class);

    private final SerializationStrategy serializationStrategy;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final StorageConfig config;

    public FileIndexStorage(SerializationStrategy serializationStrategy, StorageConfig config) {
        this.serializationStrategy = serializationStrategy;
        this.config = config;
    }

    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        rwLock.writeLock().lock();
        try {
            logger.info("Saving index to: {}", path);

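            // Make sure the target directory exists (getParent() is null for a bare file name)
            Path parent = path.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }

            // Write to a temporary sibling file first so readers never see a partial index
            Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");

            try {
                // Serialize into the temporary file
                serializationStrategy.serialize(index, tempPath);

                // Validate the file before publishing it
                if (config.isValidateAfterSave()) {
                    validateSavedFile(tempPath);
                }

                // Publish with an atomic rename. Without ATOMIC_MOVE the replace is not
                // guaranteed to be atomic; whether an existing target is replaced is
                // file-system specific.
                Files.move(tempPath, path,
                        StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);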

                // 保存元数据
                saveMetadata(createMetadata(index), getMetadataPath(path));

                logger.info("Index saved successfully to: {}", path);

            } catch (Exception e) {
                // 清理临时文件
                Files.deleteIfExists(tempPath);
                throw e;
            }

        } finally {
            rwLock.writeLock().unlock();
        }
    }

    @Override
    public HnswIndex load(Path path) throws IOException {
        rwLock.readLock().lock();
        try {
            logger.info("Loading index from: {}", path);

            if (!Files.exists(path)) {
                throw new IOException("Index file not found: " + path);
            }

            // 验证文件完整性
            if (config.isValidateBeforeLoad()) {
                validateIndexFile(path);
            }

            // 反序列化索引
            HnswIndex index = serializationStrategy.deserialize(path);

            logger.info("Index loaded successfully from: {}", path);
            return index;

        } finally {
            rwLock.readLock().unlock();
        }
    }

    @Override
    public void saveMetadata(IndexMetadata metadata, Path path) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(
                Files.newOutputStream(path))) {
            oos.writeObject(metadata);
        }
    }

    @Override
    public IndexMetadata loadMetadata(Path path) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(
                Files.newInputStream(path))) {
            return (IndexMetadata) ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to load metadata", e);
        }
    }

    @Override
    public void saveIncremental(HnswIndex index, Path basePath, long version) throws IOException {
        if (!serializationStrategy.supportsIncremental()) {
            throw new UnsupportedOperationException("Serialization strategy does not support incremental save");
        }

        rwLock.writeLock().lock();
        try {
            // 获取变化的节点
            Set<Long> changedNodes = index.getChangedNodesSince(version - 1);

            if (changedNodes.isEmpty()) {
                logger.info("No changes detected, skipping incremental save");
                return;
            }

            // Write the delta; the strategy derives the file name from basePath and version
            serializationStrategy.serializeIncremental(index, basePath, changedNodes, version);

            // 更新版本信息
            updateVersionInfo(basePath, version);

            logger.info("Incremental save completed, version: {}, changed nodes: {}", 
                       version, changedNodes.size());

        } finally {
            rwLock.writeLock().unlock();
        }
    }

    @Override
    public boolean exists(Path path) {
        return Files.exists(path) && Files.isRegularFile(path);
    }

    @Override
    public StorageStats getStats(Path path) {
        try {
            if (!exists(path)) {
                return StorageStats.empty();
            }

            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);

            return StorageStats.builder()
                .fileSize(attrs.size())
                .creationTime(attrs.creationTime().toInstant())
                .lastModifiedTime(attrs.lastModifiedTime().toInstant())
                .build();

        } catch (IOException e) {
            logger.warn("Failed to get storage stats for: {}", path, e);
            return StorageStats.empty();
        }
    }

    @Override
    public void compact(Path path) throws IOException {
        logger.info("Compacting storage at: {}", path);

        // 加载索引
        HnswIndex index = load(path);

        // 优化索引结构
        index.optimize();

        // 重新保存
        save(index, path);

        // 清理增量文件
        cleanupIncrementalFiles(path.getParent());

        logger.info("Storage compaction completed");
    }

    /**
     * 验证保存的文件
     */
    private void validateSavedFile(Path path) throws IOException {
        try {
            // 尝试加载以验证完整性
            serializationStrategy.deserialize(path);
        } catch (Exception e) {
            throw new IOException("Saved file validation failed", e);
        }
    }

    /**
     * 验证索引文件
     */
    private void validateIndexFile(Path path) throws IOException {
        // 检查文件大小
        long fileSize = Files.size(path);
        if (fileSize == 0) {
            throw new IOException("Index file is empty: " + path);
        }

        // 可以添加更多验证逻辑,如校验和验证等
    }

    /**
     * 创建索引元数据
     */
    private IndexMetadata createMetadata(HnswIndex index) {
        return IndexMetadata.builder()
            .config(index.getConfig())
            .nodeCount(index.size())
            .entryPointId(index.getEntryPointId())
            .timestamp(System.currentTimeMillis())
            .serializationStrategy(serializationStrategy.getName())
            .build();
    }

    /**
     * 获取元数据文件路径
     */
    private Path getMetadataPath(Path indexPath) {
        return indexPath.resolveSibling(indexPath.getFileName() + ".meta");
    }

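    /**
     * Deletes incremental files once they have been folded into a full save.
     * The glob matches any extension, because the strategies name their files
     * differently (the Kryo strategy writes incremental_<version>.kryo).
     */
    private void cleanupIncrementalFiles(Path directory) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "incremental_*")) {
            for (Path file : stream) {
                Files.deleteIfExists(file);
            }
        }
    }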
}
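
The core of the `save` method is the write-to-temp-then-rename pattern. Isolated from the index types, it can be sketched as follows. The class name `AtomicWriteSketch` is hypothetical, and the fallback to a plain replace when the file system does not support atomic moves is a design choice of this sketch rather than something the chapter prescribes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of the write-to-temp-then-rename pattern. */
final class AtomicWriteSketch {
    private AtomicWriteSketch() {}

    /** Writes 'content' to a sibling temp file, then renames it over 'target'. */
    static void writeAtomically(Path target, byte[] content) {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.write(temp, content);
            try {
                // Readers see either the old file or the new one, never a partial write
                Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fallback for file systems without atomic rename; no longer atomic
                Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            try {
                Files.deleteIfExists(temp); // do not leave a stale temp file behind
            } catch (IOException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the rename only protects readers from partial files. Surviving a power loss additionally requires forcing the data to disk before the rename, which this sketch leaves out.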

9.3.2 Memory-Mapped File Storage

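package com.jvector.storage.file;

import com.jvector.index.HnswIndex;
import com.jvector.storage.serialization.SerializationStrategy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// Assumes SLF4J, which matches the LoggerFactory.getLogger(...) usage below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Storage implementation based on memory-mapped files.
 * Intended for fast access to large indexes.
 */
public class MemoryMappedFileStorage extends FileIndexStorage {
    private static final Logger logger = LoggerFactory.getLogger(MemoryMappedFileStorage.class);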

    private final long mappingSize;
    private final Map<Path, MappedByteBuffer> mappedBuffers = new ConcurrentHashMap<>();

    public MemoryMappedFileStorage(SerializationStrategy serializationStrategy, 
                                  StorageConfig config, long mappingSize) {
        super(serializationStrategy, config);
        this.mappingSize = mappingSize;
    }

    /**
     * 创建内存映射
     */
    public MappedByteBuffer createMapping(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, 
                StandardOpenOption.READ, 
                StandardOpenOption.WRITE, 
                StandardOpenOption.CREATE)) {

            long fileSize = channel.size();
            long mapSize = Math.max(fileSize, mappingSize);

            // 扩展文件大小(如果需要)
            if (fileSize < mapSize) {
                channel.position(mapSize - 1);
                channel.write(ByteBuffer.allocate(1));
            }

            MappedByteBuffer buffer = channel.map(
                FileChannel.MapMode.READ_WRITE, 0, mapSize);

            mappedBuffers.put(path, buffer);

            logger.info("Created memory mapping for: {}, size: {} bytes", path, mapSize);
            return buffer;
        }
    }

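    /**
     * Returns the mapping for a path, creating it on first use.
     * createMapping registers the buffer in mappedBuffers itself, so it must not be
     * called from inside computeIfAbsent: ConcurrentHashMap forbids updating the map
     * from within its own mapping function.
     */
    public MappedByteBuffer getMapping(Path path) throws IOException {
        MappedByteBuffer existing = mappedBuffers.get(path);
        return existing != null ? existing : createMapping(path);
    }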

    /**
     * 刷新内存映射到磁盘
     */
    public void flushMapping(Path path) {
        MappedByteBuffer buffer = mappedBuffers.get(path);
        if (buffer != null) {
            buffer.force();
            logger.debug("Flushed memory mapping for: {}", path);
        }
    }

    /**
     * 关闭内存映射
     */
    public void closeMapping(Path path) {
        MappedByteBuffer buffer = mappedBuffers.remove(path);
        if (buffer != null) {
            // 刷新到磁盘
            buffer.force();

            // 清理内存映射(JVM会在GC时自动清理)
            logger.debug("Closed memory mapping for: {}", path);
        }
    }

    /**
     * 关闭所有内存映射
     */
    public void closeAllMappings() {
        for (Path path : new HashSet<>(mappedBuffers.keySet())) {
            closeMapping(path);
        }
    }

    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        // 使用内存映射进行高性能保存
        MappedByteBuffer buffer = getMapping(path);

        // 将索引数据直接写入内存映射缓冲区
        serializeDirectToBuffer(index, buffer);

        // 强制刷新到磁盘
        flushMapping(path);
    }

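    /**
     * Serializes directly into the mapped buffer.
     */
    private void serializeDirectToBuffer(HnswIndex index, MappedByteBuffer buffer) {
        // Reset position and limit so the whole mapping is writable again
        buffer.clear();

        // Write the index data into the mapped buffer here,
        // for example using an efficient binary layout

        // No flip() here: it would set the limit to the current position and
        // shrink the writable region for every later save
    }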
}
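
Since `serializeDirectToBuffer` is left as a placeholder, a small concrete example may help show what writing through a mapping looks like. The sketch below stores one float vector in a mapped file and reads it back. The class name `MappedVectorSketch` and the layout (dimension followed by the raw floats) are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: one float vector stored in a memory-mapped file. */
final class MappedVectorSketch {
    private MappedVectorSketch() {}

    /** Layout: dimension (4 bytes) followed by the floats (4 bytes each). */
    static void write(Path file, float[] vector) {
        long size = Integer.BYTES + (long) vector.length * Float.BYTES;
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            // Mapping in READ_WRITE mode grows the file to 'size' if it is shorter
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            buffer.putInt(vector.length);
            buffer.asFloatBuffer().put(vector); // view starts at the current position
            buffer.force();                     // flush dirty pages to the storage device
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Maps the file read-only and copies the vector out of the mapping. */
    static float[] read(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            int dimension = buffer.getInt();
            if (dimension < 0 || dimension > buffer.remaining() / Float.BYTES) {
                throw new IllegalStateException("corrupt dimension: " + dimension);
            }
            float[] vector = new float[dimension];
            buffer.asFloatBuffer().get(vector);
            return vector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A single `MappedByteBuffer` cannot exceed `Integer.MAX_VALUE` bytes, so an index larger than about 2 GB has to be split across several mappings.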

9.3.3 Compressed Storage

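package com.jvector.storage.compression;

import com.jvector.index.HnswIndex;
import com.jvector.storage.IndexStorage;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Decorator that adds compression to another IndexStorage.
 */
public class CompressedStorageWrapper implements IndexStorage {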
    private final IndexStorage delegate;
    private final CompressionStrategy compressionStrategy;

    public CompressedStorageWrapper(IndexStorage delegate, CompressionStrategy compressionStrategy) {
        this.delegate = delegate;
        this.compressionStrategy = compressionStrategy;
    }

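    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        // Stage the uncompressed index and the compressed output next to the target
        Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");
        Path compressedPath = path.resolveSibling(path.getFileName() + ".gz");
        try {
            delegate.save(index, tempPath);
            compressionStrategy.compress(tempPath, compressedPath);

            // Publish the compressed file under the requested name
            Files.move(compressedPath, path, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
        } finally {
            // Remove the staging files even when one of the steps above fails
            Files.deleteIfExists(tempPath);
            Files.deleteIfExists(compressedPath);
        }
    }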

    @Override
    public HnswIndex load(Path path) throws IOException {
        // 解压到临时文件
        Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");
        compressionStrategy.decompress(path, tempPath);

        try {
            // 从临时文件加载
            return delegate.load(tempPath);
        } finally {
            // 清理临时文件
            Files.deleteIfExists(tempPath);
        }
    }

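    // Delegate the remaining methods to the wrapped storage. Only the metadata methods
    // are shown; the other IndexStorage methods need the same treatment to compile.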
    @Override
    public void saveMetadata(IndexMetadata metadata, Path path) throws IOException {
        delegate.saveMetadata(metadata, path);
    }

    @Override
    public IndexMetadata loadMetadata(Path path) throws IOException {
        return delegate.loadMetadata(path);
    }
}

/**
 * 压缩策略接口
 */
public interface CompressionStrategy {
    void compress(Path source, Path target) throws IOException;
    void decompress(Path source, Path target) throws IOException;
    String getName();
    double getCompressionRatio();
}

/**
 * GZIP compression strategy.
 */
public class GzipCompressionStrategy implements CompressionStrategy {

    @Override
    public void compress(Path source, Path target) throws IOException {
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = new GZIPOutputStream(Files.newOutputStream(target))) {

            byte[] buffer = new byte[8192];
            int len;
            // read() returns -1 at end of stream
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
        }
    }

    @Override
    public void decompress(Path source, Path target) throws IOException {
        try (InputStream in = new GZIPInputStream(Files.newInputStream(source));
             OutputStream out = Files.newOutputStream(target)) {

            byte[] buffer = new byte[8192];
            int len;
            // read() returns -1 at end of stream
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
        }
    }

    @Override
    public String getName() {
        return "gzip";
    }

    @Override
    public double getCompressionRatio() {
        return 0.3; // Hard-coded estimate, not a measured value
    }
}
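
The `getCompressionRatio` value above is a fixed estimate. The ratio actually achieved depends heavily on the data, and float vectors often compress far less well than text. The sketch below measures it on an in-memory buffer instead. The class name `GzipSketch` is hypothetical, and the ratio is defined here as compressed size divided by original size, which is an assumption about what the chapter's value means.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch: in-memory GZIP round trip with a measured compression ratio. */
final class GzipSketch {
    private GzipSketch() {}

    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the bytes only afterwards
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[8192];
            int len;
            while ((len = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Compressed size divided by original size; smaller means better compression. */
    static double ratio(byte[] raw) {
        return raw.length == 0 ? 1.0 : (double) compress(raw).length / raw.length;
    }
}
```

Measuring the ratio on a sample of real index data is a more reliable basis for deciding whether compression is worth its CPU cost.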

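Summary

This chapter covered the persistent storage and serialization mechanisms of the vector search engine:

  1. Storage architecture
    • Layered storage system design
    • A unified storage interface
    • Support for multiple storage engines
  2. Serialization strategies
    • Java native serialization (works out of the box with the JDK)
    • Kryo serialization (high performance)
    • Custom binary format (most compact)
  3. File storage
    • Standard file storage
    • Memory-mapped files
    • Compressed storage
  4. Advanced features
    • Incremental backup
    • Atomic operations
    • Data integrity validation

Together, these mechanisms persist vector data safely and support efficient storage and fast recovery of large data sets.


Exercises:

  1. In which situations should you choose memory-mapped files over standard file I/O?
  2. How would you design an incremental backup mechanism that supports rolling back to an earlier version?
  3. How would you replicate index data consistently in a distributed environment?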
Originally published on 2025-07-29 on the WeChat official account Coder建设; shared through the Tencent Cloud self-media sync program.
