The storage subsystem of a vector search engine must persist index data, metadata, and operation logs, guaranteeing both data safety and fast recovery.
package com.jvector.storage;

import com.jvector.index.HnswIndex;
import java.io.IOException;
import java.nio.file.Path;

/**
 * Storage interface for persisting indexes.
 */
public interface IndexStorage {

    /**
     * Saves the index to the given path.
     */
    void save(HnswIndex index, Path path) throws IOException;

    /**
     * Loads an index from the given path.
     */
    HnswIndex load(Path path) throws IOException;

    /**
     * Saves index metadata.
     */
    void saveMetadata(IndexMetadata metadata, Path path) throws IOException;

    /**
     * Loads index metadata.
     */
    IndexMetadata loadMetadata(Path path) throws IOException;

    /**
     * Incremental save (persists only the parts that changed).
     */
    void saveIncremental(HnswIndex index, Path basePath, long version) throws IOException;

    /**
     * Incremental load.
     */
    void loadIncremental(HnswIndex index, Path basePath, long fromVersion, long toVersion) throws IOException;

    /**
     * Checks whether the index file exists and is valid.
     */
    boolean exists(Path path);

    /**
     * Returns storage statistics.
     */
    StorageStats getStats(Path path);

    /**
     * Compacts the storage files.
     */
    void compact(Path path) throws IOException;
}
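All three strategies in this chapter implement a common SerializationStrategy interface, which the listings below reference but never define. The following is a minimal sketch of that interface, reconstructed from the methods the implementations override; the exact signatures, and the two default methods, are assumptions rather than the project's actual definition:

package com.jvector.storage.serialization;

import com.jvector.index.HnswIndex;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Set;

/**
 * Hypothetical sketch of the strategy interface implemented below;
 * reconstructed from the overridden methods, not part of the original listing.
 */
public interface SerializationStrategy {

    void serialize(HnswIndex index, Path path) throws IOException;

    HnswIndex deserialize(Path path) throws IOException;

    String getName();

    boolean supportsIncremental();

    /** Assumed default so strategies without compression need not override. */
    default boolean supportsCompression() {
        return false;
    }

    /** Assumed hook so FileIndexStorage can call incremental saves through the interface. */
    default void serializeIncremental(HnswIndex index, Path basePath,
                                      Set<Long> changedNodeIds, long version) throws IOException {
        throw new UnsupportedOperationException(getName() + " does not support incremental serialization");
    }
}

The default serializeIncremental lets the storage layer invoke incremental saves polymorphically, while strategies that return false from supportsIncremental simply throw.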
package com.jvector.storage.serialization;

import com.jvector.index.HnswConfig;
import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
 * Serialization strategy based on Java native serialization.
 */
public class JavaSerializationStrategy implements SerializationStrategy {

    private static final Logger logger = LoggerFactory.getLogger(JavaSerializationStrategy.class);
    private static final int JAVA_MAGIC_NUMBER = 0x4A564543; // "JVEC"
    private static final int SERIALIZATION_VERSION = 1;

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using Java native serialization to: {}", path);
        try (ObjectOutputStream oos = new ObjectOutputStream(
                new BufferedOutputStream(Files.newOutputStream(path)))) {
            // Write the magic number and version
            oos.writeInt(JAVA_MAGIC_NUMBER);
            oos.writeInt(SERIALIZATION_VERSION);
            // Write the basic index information
            writeIndexMetadata(oos, index);
            // Write all nodes
            writeNodes(oos, index);
            // Write the connection information
            writeConnections(oos, index);
            oos.flush();
            logger.info("Index serialization completed");
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index from: {}", path);
        try (ObjectInputStream ois = new ObjectInputStream(
                new BufferedInputStream(Files.newInputStream(path)))) {
            // Validate the magic number and version
            validateHeader(ois);
            // Read the index metadata
            IndexMetadata metadata = readIndexMetadata(ois);
            // Create a fresh index instance
            HnswIndex index = new HnswIndex(metadata.getConfig());
            // Read the node data (readNodes/readConnections mirror the
            // write methods below and are omitted for brevity)
            readNodes(ois, index, metadata.getNodeCount());
            // Read the connection information
            readConnections(ois, index);
            logger.info("Index deserialization completed, loaded {} nodes", metadata.getNodeCount());
            return index;
        }
    }

    /**
     * Writes the index metadata.
     */
    private void writeIndexMetadata(ObjectOutputStream oos, HnswIndex index) throws IOException {
        oos.writeObject(index.getConfig());
        oos.writeLong(index.size());
        oos.writeObject(index.getEntryPointId());
        oos.writeLong(System.currentTimeMillis()); // save timestamp
    }

    /**
     * Writes the node data.
     */
    private void writeNodes(ObjectOutputStream oos, HnswIndex index) throws IOException {
        Collection<HnswNode> nodes = index.getAllNodes();
        oos.writeInt(nodes.size());
        for (HnswNode node : nodes) {
            oos.writeObject(node);
        }
    }

    /**
     * Writes the connection information.
     */
    private void writeConnections(ObjectOutputStream oos, HnswIndex index) throws IOException {
        Map<Long, Map<Integer, Set<Long>>> allConnections = index.getAllConnections();
        oos.writeObject(allConnections);
    }

    /**
     * Validates the file header.
     */
    private void validateHeader(ObjectInputStream ois) throws IOException {
        int magic = ois.readInt();
        int version = ois.readInt();
        if (magic != JAVA_MAGIC_NUMBER) {
            throw new IOException("Invalid file format, magic number mismatch");
        }
        if (version != SERIALIZATION_VERSION) {
            throw new IOException("Unsupported serialization version: " + version);
        }
    }

    /**
     * Reads the index metadata.
     */
    private IndexMetadata readIndexMetadata(ObjectInputStream ois) throws IOException {
        try {
            HnswConfig config = (HnswConfig) ois.readObject();
            long nodeCount = ois.readLong();
            Long entryPointId = (Long) ois.readObject();
            long timestamp = ois.readLong();
            return new IndexMetadata(config, nodeCount, entryPointId, timestamp);
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to read index metadata", e);
        }
    }

    @Override
    public String getName() {
        return "java-native";
    }

    @Override
    public boolean supportsIncremental() {
        return false; // Java native serialization does not support incremental saves
    }
}
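Java native serialization is the simplest option, though it couples the on-disk format to the class layout. A minimal round-trip sketch (the index variable is assumed to be a populated HnswIndex):

// Hypothetical usage sketch: round-trip an index through the strategy.
SerializationStrategy strategy = new JavaSerializationStrategy();
Path file = Path.of("/tmp/index.jvec");
strategy.serialize(index, file);                 // persist the in-memory index
HnswIndex restored = strategy.deserialize(file); // rebuild it from disk
assert restored.size() == index.size();          // sanity check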
package com.jvector.storage.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import com.jvector.core.Vector;
import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Kryo-based serialization strategy.
 */
public class KryoSerializationStrategy implements SerializationStrategy {

    private static final Logger logger = LoggerFactory.getLogger(KryoSerializationStrategy.class);
    private static final int KRYO_MAGIC_NUMBER = 0x4B52594F; // "KRYO"
    private static final int SERIALIZATION_VERSION = 1;

    // Kryo instances are not thread-safe, so each thread gets its own
    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(this::createKryo);

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using Kryo to: {}", path);
        Kryo kryo = kryoThreadLocal.get();
        try (Output output = new Output(Files.newOutputStream(path))) {
            // Write the file header
            writeHeader(output);
            // Serialize the index
            kryo.writeObject(output, index);
            output.flush();
            logger.info("Kryo serialization completed");
        } catch (Exception e) {
            throw new IOException("Kryo serialization failed", e);
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index using Kryo from: {}", path);
        Kryo kryo = kryoThreadLocal.get();
        try (Input input = new Input(Files.newInputStream(path))) {
            // Validate the file header
            validateHeader(input);
            // Deserialize the index
            HnswIndex index = kryo.readObject(input, HnswIndex.class);
            logger.info("Kryo deserialization completed");
            return index;
        } catch (Exception e) {
            throw new IOException("Kryo deserialization failed", e);
        }
    }

    /**
     * Creates and configures a Kryo instance.
     */
    private Kryo createKryo() {
        Kryo kryo = new Kryo();
        // Register classes up front for better performance
        kryo.register(HnswIndex.class);
        kryo.register(HnswNode.class);
        kryo.register(Vector.class);
        kryo.register(float[].class);
        kryo.register(HashSet.class);
        kryo.register(ConcurrentHashMap.class);
        // Configure a dedicated serializer for node maps
        MapSerializer mapSerializer = new MapSerializer();
        mapSerializer.setKeyClass(Long.class, kryo.getSerializer(Long.class));
        mapSerializer.setValueClass(HnswNode.class, kryo.getSerializer(HnswNode.class));
        kryo.register(HashMap.class, mapSerializer);
        // Enable reference tracking (handles shared and cyclic object graphs)
        kryo.setReferences(true);
        // Allow unregistered classes as a fallback
        kryo.setRegistrationRequired(false);
        return kryo;
    }

    /**
     * Writes the file header.
     */
    private void writeHeader(Output output) {
        output.writeInt(KRYO_MAGIC_NUMBER);
        output.writeInt(SERIALIZATION_VERSION);
        output.writeLong(System.currentTimeMillis());
    }

    /**
     * Validates the file header.
     */
    private void validateHeader(Input input) throws IOException {
        int magic = input.readInt();
        int version = input.readInt();
        input.readLong(); // timestamp, currently unused
        if (magic != KRYO_MAGIC_NUMBER) {
            throw new IOException("Invalid Kryo file format");
        }
        if (version != SERIALIZATION_VERSION) {
            throw new IOException("Unsupported Kryo serialization version: " + version);
        }
    }

    @Override
    public String getName() {
        return "kryo";
    }

    @Override
    public boolean supportsIncremental() {
        return true;
    }

    /**
     * Incremental serialization: persists only the nodes that changed.
     */
    @Override
    public void serializeIncremental(HnswIndex index, Path basePath,
                                     Set<Long> changedNodeIds, long version) throws IOException {
        Path incrementalPath = basePath.resolve("incremental_" + version + ".kryo");
        Kryo kryo = kryoThreadLocal.get();
        try (Output output = new Output(Files.newOutputStream(incrementalPath))) {
            writeHeader(output);
            // Write the version number
            output.writeLong(version);
            // Write the number of changed nodes
            output.writeInt(changedNodeIds.size());
            // Write each changed node; the deleted flag is written for every
            // record so the stream stays unambiguous on the read side
            for (Long nodeId : changedNodeIds) {
                HnswNode node = index.getNode(nodeId);
                output.writeLong(nodeId);
                output.writeBoolean(node == null); // deleted flag
                if (node != null) {
                    kryo.writeObject(output, node);
                }
            }
            output.flush();
        }
    }
}
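Reading an incremental file back simply replays the records in write order. A hedged sketch of the counterpart method, matching the record format written above (HnswIndex.addNode and removeNode are assumed APIs, not shown in this chapter):

/**
 * Hypothetical counterpart to serializeIncremental: replays one incremental
 * file onto an index. addNode/removeNode are assumed HnswIndex methods.
 */
public void applyIncremental(HnswIndex index, Path incrementalFile) throws IOException {
    Kryo kryo = kryoThreadLocal.get();
    try (Input input = new Input(Files.newInputStream(incrementalFile))) {
        validateHeader(input);
        long version = input.readLong();    // version of this delta
        int changedCount = input.readInt(); // number of records that follow
        for (int i = 0; i < changedCount; i++) {
            long nodeId = input.readLong();
            boolean deleted = input.readBoolean();
            if (deleted) {
                index.removeNode(nodeId);   // tombstone record
            } else {
                index.addNode(kryo.readObject(input, HnswNode.class));
            }
        }
    }
}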
package com.jvector.storage.serialization;

import com.jvector.core.Vector;
import com.jvector.index.HnswConfig;
import com.jvector.index.HnswIndex;
import com.jvector.index.HnswNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;

/**
 * Custom binary serialization strategy.
 * Offers the best performance and the smallest storage footprint.
 */
public class CustomBinarySerializationStrategy implements SerializationStrategy {

    private static final Logger logger = LoggerFactory.getLogger(CustomBinarySerializationStrategy.class);
    private static final int CUSTOM_MAGIC_NUMBER = 0x43555342; // "CUSB"
    private static final int SERIALIZATION_VERSION = 1;

    @Override
    public void serialize(HnswIndex index, Path path) throws IOException {
        logger.info("Serializing index using custom binary format to: {}", path);
        // READ is required in addition to WRITE: writeChecksum re-reads
        // the file from the start to compute the CRC
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            // Write the file header
            writeFileHeader(channel, index);
            // Write the node data
            writeNodesData(channel, index);
            // Write the connection data
            writeConnectionsData(channel, index);
            // Write the checksum
            writeChecksum(channel);
            channel.force(true);
            logger.info("Custom binary serialization completed");
        }
    }

    @Override
    public HnswIndex deserialize(Path path) throws IOException {
        logger.info("Deserializing index using custom binary format from: {}", path);
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            // Read and validate the file header (FileHeader is a small holder
            // for the parsed header fields; its definition is omitted here)
            FileHeader header = readAndValidateHeader(channel);
            // Create the index instance
            HnswIndex index = new HnswIndex(header.getConfig());
            // Read the node data
            readNodesData(channel, index, header);
            // Read the connection data
            readConnectionsData(channel, index, header);
            // Verify the checksum
            validateChecksum(channel);
            logger.info("Custom binary deserialization completed");
            return index;
        }
    }

    /**
     * Writes the file header.
     */
    private void writeFileHeader(FileChannel channel, HnswIndex index) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024); // fixed-size header
        // Magic number
        buffer.putInt(CUSTOM_MAGIC_NUMBER);
        // Version
        buffer.putInt(SERIALIZATION_VERSION);
        // Timestamp
        buffer.putLong(System.currentTimeMillis());
        // Index configuration
        HnswConfig config = index.getConfig();
        buffer.putInt(config.getM());
        buffer.putInt(config.getEfConstruction());
        buffer.putInt(config.getMaxLevel());
        // Index statistics
        buffer.putLong(index.size());
        // Entry point ID, preceded by a presence flag
        Long entryPointId = index.getEntryPointId();
        buffer.put(entryPointId != null ? (byte) 1 : (byte) 0);
        if (entryPointId != null) {
            buffer.putLong(entryPointId);
        }
        // Distance metric name
        String metricName = index.getDistanceMetric().getName();
        byte[] metricBytes = metricName.getBytes(StandardCharsets.UTF_8);
        buffer.putInt(metricBytes.length);
        buffer.put(metricBytes);
        // Zero-fill the remaining space
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }
        buffer.flip();
        channel.write(buffer);
    }

    /**
     * Writes the node data.
     */
    private void writeNodesData(FileChannel channel, HnswIndex index) throws IOException {
        Collection<HnswNode> nodes = index.getAllNodes();
        // Write the node count
        ByteBuffer countBuffer = ByteBuffer.allocate(8);
        countBuffer.putLong(nodes.size());
        countBuffer.flip();
        channel.write(countBuffer);
        // Write each node
        for (HnswNode node : nodes) {
            writeNode(channel, node);
        }
    }

    /**
     * Writes a single node.
     */
    private void writeNode(FileChannel channel, HnswNode node) throws IOException {
        Vector vector = node.getVector();
        float[] data = vector.getData();
        // Compute the record size
        int nodeSize = 8 +              // ID
                4 +                     // level
                4 +                     // dimension
                data.length * 4;        // vector data
        ByteBuffer buffer = ByteBuffer.allocate(nodeSize);
        // Write the node ID
        buffer.putLong(node.getId());
        // Write the level
        buffer.putInt(node.getLevel());
        // Write the vector dimension
        buffer.putInt(data.length);
        // Write the vector data
        for (float value : data) {
            buffer.putFloat(value);
        }
        buffer.flip();
        channel.write(buffer);
    }

    /**
     * Writes the connection data.
     */
    private void writeConnectionsData(FileChannel channel, HnswIndex index) throws IOException {
        Map<Long, Map<Integer, Set<Long>>> allConnections = index.getAllConnections();
        // Write the number of connection entries
        ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
        sizeBuffer.putLong(allConnections.size());
        sizeBuffer.flip();
        channel.write(sizeBuffer);
        // Write each node's connections
        for (Map.Entry<Long, Map<Integer, Set<Long>>> nodeEntry : allConnections.entrySet()) {
            writeNodeConnections(channel, nodeEntry.getKey(), nodeEntry.getValue());
        }
    }

    /**
     * Writes the connections of a single node.
     */
    private void writeNodeConnections(FileChannel channel, Long nodeId,
                                      Map<Integer, Set<Long>> levelConnections) throws IOException {
        // Compute the buffer size
        int totalConnections = levelConnections.values().stream()
                .mapToInt(Set::size)
                .sum();
        int bufferSize = 8 +                        // node ID
                4 +                                 // level count
                levelConnections.size() * 8 +       // per-level headers (level + count)
                totalConnections * 8;               // connection IDs
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        // Write the node ID
        buffer.putLong(nodeId);
        // Write the number of levels
        buffer.putInt(levelConnections.size());
        // Write the connections of each level
        for (Map.Entry<Integer, Set<Long>> levelEntry : levelConnections.entrySet()) {
            int level = levelEntry.getKey();
            Set<Long> connections = levelEntry.getValue();
            buffer.putInt(level);
            buffer.putInt(connections.size());
            for (Long connectionId : connections) {
                buffer.putLong(connectionId);
            }
        }
        buffer.flip();
        channel.write(buffer);
    }

    /**
     * Writes the checksum.
     */
    private void writeChecksum(FileChannel channel) throws IOException {
        // Compute a CRC32 checksum over everything written so far
        channel.position(0);
        CRC32 crc32 = new CRC32();
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        while (channel.read(buffer) != -1) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            crc32.update(bytes);
            buffer.clear();
        }
        // Append the checksum (the read loop left the position at end-of-file)
        ByteBuffer checksumBuffer = ByteBuffer.allocate(8);
        checksumBuffer.putLong(crc32.getValue());
        checksumBuffer.flip();
        channel.write(checksumBuffer);
    }

    @Override
    public String getName() {
        return "custom-binary";
    }

    @Override
    public boolean supportsIncremental() {
        return true;
    }

    @Override
    public boolean supportsCompression() {
        return true;
    }
}
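The read-side helpers (readAndValidateHeader, readNodesData, readConnectionsData, validateChecksum) are omitted above; each one mirrors its write counterpart. As an illustration, here is a sketch of reading back a single node record in the exact layout writeNode produces; the HnswNode and Vector constructors shown are assumptions:

/**
 * Hypothetical mirror of writeNode: reads one fixed-layout node record.
 * The HnswNode(long, Vector, int) and Vector(float[]) constructors are
 * assumed APIs, not shown in this chapter.
 */
private HnswNode readNode(FileChannel channel) throws IOException {
    ByteBuffer header = ByteBuffer.allocate(16); // id (8) + level (4) + dimension (4)
    while (header.hasRemaining() && channel.read(header) != -1) { }
    header.flip();
    long id = header.getLong();
    int level = header.getInt();
    int dimension = header.getInt();

    ByteBuffer data = ByteBuffer.allocate(dimension * 4);
    while (data.hasRemaining() && channel.read(data) != -1) { }
    data.flip();
    float[] values = new float[dimension];
    for (int i = 0; i < dimension; i++) {
        values[i] = data.getFloat();
    }
    return new HnswNode(id, new Vector(values), level);
}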
package com.jvector.storage.file;

import com.jvector.index.HnswIndex;
import com.jvector.storage.IndexMetadata;
import com.jvector.storage.IndexStorage;
import com.jvector.storage.StorageStats;
import com.jvector.storage.serialization.SerializationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Standard file-based storage implementation.
 */
public class FileIndexStorage implements IndexStorage {

    private static final Logger logger = LoggerFactory.getLogger(FileIndexStorage.class);

    private final SerializationStrategy serializationStrategy;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final StorageConfig config;

    public FileIndexStorage(SerializationStrategy serializationStrategy, StorageConfig config) {
        this.serializationStrategy = serializationStrategy;
        this.config = config;
    }

    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        rwLock.writeLock().lock();
        try {
            logger.info("Saving index to: {}", path);
            // Make sure the parent directory exists
            Files.createDirectories(path.getParent());
            // Write to a temporary file so the final move is atomic
            Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");
            try {
                // Serialize into the temporary file
                serializationStrategy.serialize(index, tempPath);
                // Verify file integrity
                if (config.isValidateAfterSave()) {
                    validateSavedFile(tempPath);
                }
                // Atomically move the file into place
                Files.move(tempPath, path, StandardCopyOption.REPLACE_EXISTING);
                // Persist the metadata
                saveMetadata(createMetadata(index), getMetadataPath(path));
                logger.info("Index saved successfully to: {}", path);
            } catch (Exception e) {
                // Clean up the temporary file
                Files.deleteIfExists(tempPath);
                throw e;
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    @Override
    public HnswIndex load(Path path) throws IOException {
        rwLock.readLock().lock();
        try {
            logger.info("Loading index from: {}", path);
            if (!Files.exists(path)) {
                throw new IOException("Index file not found: " + path);
            }
            // Verify file integrity
            if (config.isValidateBeforeLoad()) {
                validateIndexFile(path);
            }
            // Deserialize the index
            HnswIndex index = serializationStrategy.deserialize(path);
            logger.info("Index loaded successfully from: {}", path);
            return index;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    @Override
    public void saveMetadata(IndexMetadata metadata, Path path) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(
                Files.newOutputStream(path))) {
            oos.writeObject(metadata);
        }
    }

    @Override
    public IndexMetadata loadMetadata(Path path) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(
                Files.newInputStream(path))) {
            return (IndexMetadata) ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to load metadata", e);
        }
    }

    @Override
    public void saveIncremental(HnswIndex index, Path basePath, long version) throws IOException {
        if (!serializationStrategy.supportsIncremental()) {
            throw new UnsupportedOperationException("Serialization strategy does not support incremental save");
        }
        rwLock.writeLock().lock();
        try {
            // Collect the nodes that changed since the previous version
            Set<Long> changedNodes = index.getChangedNodesSince(version - 1);
            if (changedNodes.isEmpty()) {
                logger.info("No changes detected, skipping incremental save");
                return;
            }
            // Perform the incremental save
            serializationStrategy.serializeIncremental(index, basePath, changedNodes, version);
            // Update the version information
            updateVersionInfo(basePath, version);
            logger.info("Incremental save completed, version: {}, changed nodes: {}",
                    version, changedNodes.size());
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    @Override
    public boolean exists(Path path) {
        return Files.exists(path) && Files.isRegularFile(path);
    }

    @Override
    public StorageStats getStats(Path path) {
        try {
            if (!exists(path)) {
                return StorageStats.empty();
            }
            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
            return StorageStats.builder()
                    .fileSize(attrs.size())
                    .creationTime(attrs.creationTime().toInstant())
                    .lastModifiedTime(attrs.lastModifiedTime().toInstant())
                    .build();
        } catch (IOException e) {
            logger.warn("Failed to get storage stats for: {}", path, e);
            return StorageStats.empty();
        }
    }

    @Override
    public void compact(Path path) throws IOException {
        logger.info("Compacting storage at: {}", path);
        // Load the index
        HnswIndex index = load(path);
        // Optimize the index structure
        index.optimize();
        // Save it back
        save(index, path);
        // Clean up incremental files
        cleanupIncrementalFiles(path.getParent());
        logger.info("Storage compaction completed");
    }

    /**
     * Validates a freshly saved file.
     */
    private void validateSavedFile(Path path) throws IOException {
        try {
            // Try to load it back to verify integrity
            serializationStrategy.deserialize(path);
        } catch (Exception e) {
            throw new IOException("Saved file validation failed", e);
        }
    }

    /**
     * Validates an index file before loading.
     */
    private void validateIndexFile(Path path) throws IOException {
        // Check the file size
        long fileSize = Files.size(path);
        if (fileSize == 0) {
            throw new IOException("Index file is empty: " + path);
        }
        // Further validation, such as checksum verification, could be added here
    }

    /**
     * Creates the index metadata.
     */
    private IndexMetadata createMetadata(HnswIndex index) {
        return IndexMetadata.builder()
                .config(index.getConfig())
                .nodeCount(index.size())
                .entryPointId(index.getEntryPointId())
                .timestamp(System.currentTimeMillis())
                .serializationStrategy(serializationStrategy.getName())
                .build();
    }

    /**
     * Returns the metadata file path for an index file.
     */
    private Path getMetadataPath(Path indexPath) {
        return indexPath.resolveSibling(indexPath.getFileName() + ".meta");
    }

    /**
     * Deletes incremental files. The glob covers all incremental files
     * regardless of the strategy-specific extension.
     */
    private void cleanupIncrementalFiles(Path directory) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "incremental_*")) {
            for (Path file : stream) {
                Files.deleteIfExists(file);
            }
        }
    }

    // loadIncremental and updateVersionInfo are omitted here for brevity.
}
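Wiring the pieces together might look as follows; note that the StorageConfig constructor shown is an assumption, since that class is not listed in this chapter:

// Hypothetical wiring example; StorageConfig construction is assumed.
SerializationStrategy strategy = new KryoSerializationStrategy();
StorageConfig config = new StorageConfig();   // defaults assumed
IndexStorage storage = new FileIndexStorage(strategy, config);

Path indexPath = Path.of("data/vectors/main.idx");
storage.save(index, indexPath);               // atomic save via temp file
HnswIndex reloaded = storage.load(indexPath); // validated load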
package com.jvector.storage.file;

import com.jvector.index.HnswIndex;
import com.jvector.storage.serialization.SerializationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Memory-mapped file storage implementation.
 * Suited to high-performance access to large indexes.
 */
public class MemoryMappedFileStorage extends FileIndexStorage {

    private static final Logger logger = LoggerFactory.getLogger(MemoryMappedFileStorage.class);

    private final long mappingSize;
    private final Map<Path, MappedByteBuffer> mappedBuffers = new ConcurrentHashMap<>();

    public MemoryMappedFileStorage(SerializationStrategy serializationStrategy,
                                   StorageConfig config, long mappingSize) {
        super(serializationStrategy, config);
        this.mappingSize = mappingSize;
    }

    /**
     * Creates a memory mapping for a file. Registration in mappedBuffers is
     * handled by getMapping's computeIfAbsent; mutating the map here would
     * violate the computeIfAbsent contract.
     */
    public MappedByteBuffer createMapping(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE)) {
            long fileSize = channel.size();
            long mapSize = Math.max(fileSize, mappingSize);
            // Grow the file if necessary
            if (fileSize < mapSize) {
                channel.position(mapSize - 1);
                channel.write(ByteBuffer.allocate(1));
            }
            // The mapping remains valid after the channel is closed
            MappedByteBuffer buffer = channel.map(
                    FileChannel.MapMode.READ_WRITE, 0, mapSize);
            logger.info("Created memory mapping for: {}, size: {} bytes", path, mapSize);
            return buffer;
        }
    }

    /**
     * Returns the memory-mapped buffer for a path, creating it on demand.
     */
    public MappedByteBuffer getMapping(Path path) throws IOException {
        return mappedBuffers.computeIfAbsent(path, p -> {
            try {
                return createMapping(p);
            } catch (IOException e) {
                throw new RuntimeException("Failed to create memory mapping", e);
            }
        });
    }

    /**
     * Flushes a memory mapping to disk.
     */
    public void flushMapping(Path path) {
        MappedByteBuffer buffer = mappedBuffers.get(path);
        if (buffer != null) {
            buffer.force();
            logger.debug("Flushed memory mapping for: {}", path);
        }
    }

    /**
     * Closes a memory mapping.
     */
    public void closeMapping(Path path) {
        MappedByteBuffer buffer = mappedBuffers.remove(path);
        if (buffer != null) {
            // Flush to disk
            buffer.force();
            // The mapping itself is released by the JVM during GC
            logger.debug("Closed memory mapping for: {}", path);
        }
    }

    /**
     * Closes all memory mappings.
     */
    public void closeAllMappings() {
        for (Path path : new HashSet<>(mappedBuffers.keySet())) {
            closeMapping(path);
        }
    }

    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        // Use the memory mapping for a high-performance save
        MappedByteBuffer buffer = getMapping(path);
        // Write the index data directly into the mapped buffer
        serializeDirectToBuffer(index, buffer);
        // Force the changes to disk
        flushMapping(path);
    }

    /**
     * Serializes directly into the mapped buffer.
     */
    private void serializeDirectToBuffer(HnswIndex index, MappedByteBuffer buffer) {
        buffer.position(0);
        // Write the index data into the memory-mapped buffer; an efficient
        // binary layout (as in CustomBinarySerializationStrategy) would be
        // implemented here
        // Reset the buffer state for readers
        buffer.flip();
    }
}
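A lifecycle sketch for the memory-mapped storage, assuming a 64 MB mapping window (the StorageConfig construction is again an assumption):

// Hypothetical lifecycle example for the memory-mapped storage.
MemoryMappedFileStorage mmapStorage = new MemoryMappedFileStorage(
        new KryoSerializationStrategy(), new StorageConfig(), 64L * 1024 * 1024);
Path path = Path.of("data/vectors/mapped.idx");

MappedByteBuffer buf = mmapStorage.getMapping(path); // map (and grow) the file
buf.putLong(0, 42L);                                 // direct in-place write
mmapStorage.flushMapping(path);                      // sync dirty pages to disk
mmapStorage.closeAllMappings();                      // release on shutdown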
package com.jvector.storage.compression;

import com.jvector.index.HnswIndex;
import com.jvector.storage.IndexMetadata;
import com.jvector.storage.IndexStorage;
import com.jvector.storage.StorageStats;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Decorator that adds compression to any IndexStorage.
 */
public class CompressedStorageWrapper implements IndexStorage {

    private final IndexStorage delegate;
    private final CompressionStrategy compressionStrategy;

    public CompressedStorageWrapper(IndexStorage delegate, CompressionStrategy compressionStrategy) {
        this.delegate = delegate;
        this.compressionStrategy = compressionStrategy;
    }

    @Override
    public void save(HnswIndex index, Path path) throws IOException {
        // Save to a temporary uncompressed file
        Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");
        delegate.save(index, tempPath);
        // Compress the file
        Path compressedPath = path.resolveSibling(path.getFileName() + ".gz");
        compressionStrategy.compress(tempPath, compressedPath);
        // Move it to its final location
        Files.move(compressedPath, path, StandardCopyOption.REPLACE_EXISTING);
        // Clean up the temporary file
        Files.deleteIfExists(tempPath);
    }

    @Override
    public HnswIndex load(Path path) throws IOException {
        // Decompress to a temporary file
        Path tempPath = path.resolveSibling(path.getFileName() + ".tmp");
        compressionStrategy.decompress(path, tempPath);
        try {
            // Load from the temporary file
            return delegate.load(tempPath);
        } finally {
            // Clean up the temporary file
            Files.deleteIfExists(tempPath);
        }
    }

    // The remaining methods delegate to the wrapped storage
    @Override
    public void saveMetadata(IndexMetadata metadata, Path path) throws IOException {
        delegate.saveMetadata(metadata, path);
    }

    @Override
    public IndexMetadata loadMetadata(Path path) throws IOException {
        return delegate.loadMetadata(path);
    }

    @Override
    public void saveIncremental(HnswIndex index, Path basePath, long version) throws IOException {
        delegate.saveIncremental(index, basePath, version);
    }

    @Override
    public void loadIncremental(HnswIndex index, Path basePath, long fromVersion, long toVersion) throws IOException {
        delegate.loadIncremental(index, basePath, fromVersion, toVersion);
    }

    @Override
    public boolean exists(Path path) {
        return delegate.exists(path);
    }

    @Override
    public StorageStats getStats(Path path) {
        return delegate.getStats(path);
    }

    @Override
    public void compact(Path path) throws IOException {
        delegate.compact(path);
    }
}
/**
 * Compression strategy interface.
 */
public interface CompressionStrategy {
    void compress(Path source, Path target) throws IOException;
    void decompress(Path source, Path target) throws IOException;
    String getName();
    double getCompressionRatio();
}

/**
 * GZIP compression strategy.
 */
public class GzipCompressionStrategy implements CompressionStrategy {

    @Override
    public void compress(Path source, Path target) throws IOException {
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = new GZIPOutputStream(Files.newOutputStream(target))) {
            byte[] buffer = new byte[8192];
            int len;
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
        }
    }

    @Override
    public void decompress(Path source, Path target) throws IOException {
        try (InputStream in = new GZIPInputStream(Files.newInputStream(source));
             OutputStream out = Files.newOutputStream(target)) {
            byte[] buffer = new byte[8192];
            int len;
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
            }
        }
    }

    @Override
    public String getName() {
        return "gzip";
    }

    @Override
    public double getCompressionRatio() {
        return 0.3; // rough estimate of the expected compression ratio
    }
}
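The decorator composes with any IndexStorage implementation. A hypothetical assembly of gzip-compressed binary file storage (class names follow the listings above; StorageConfig construction is assumed):

// Hypothetical composition example: gzip-compressed file storage.
IndexStorage fileStorage = new FileIndexStorage(
        new CustomBinarySerializationStrategy(), new StorageConfig());
IndexStorage compressed = new CompressedStorageWrapper(
        fileStorage, new GzipCompressionStrategy());

Path path = Path.of("data/vectors/compressed.idx");
compressed.save(index, path);               // save + gzip + atomic move
HnswIndex restored = compressed.load(path); // decompress + load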
This chapter examined the persistence and serialization mechanisms of the vector search engine in detail: a pluggable IndexStorage interface, three serialization strategies (Java native, Kryo, and a custom binary format with CRC32 checksums), file-based storage with atomic saves, memory-mapped storage for large indexes, and a compression decorator.
Together, these mechanisms ensure that vector data is persisted safely, and they support efficient storage and fast recovery at scale.
Exercises: