计算引擎是向量搜索系统的性能核心,负责单对向量距离计算、批量距离计算、矩阵向量乘法和向量点积等核心数值运算。

package com.jvector.compute;
import com.jvector.core.DistanceMetric;
/**
 * Numeric back end of the vector search system.
 *
 * <p>Implementations (CPU, GPU, adaptive) expose the same set of vector operations plus a small
 * lifecycle: check {@link #isAvailable()}, optionally {@link #warmup()}, and release resources with
 * {@link #cleanup()} when finished.
 */
public interface ComputeEngine {

    /**
     * Returns a short human-readable identifier for this engine.
     *
     * @return the engine name
     */
    String getName();

    /**
     * Reports whether this engine can currently execute work.
     *
     * @return {@code true} if the engine is usable
     */
    boolean isAvailable();

    /**
     * Runs a representative workload ahead of time so later calls are not slowed by one-off
     * start-up costs.
     */
    void warmup();

    /** Releases any resources (threads, device memory, contexts) held by this engine. */
    void cleanup();

    /**
     * Computes the distance between one pair of vectors.
     *
     * @param a first vector
     * @param b second vector
     * @param metric the distance metric to apply
     * @return the distance between {@code a} and {@code b}
     */
    float distance(float[] a, float[] b, DistanceMetric metric);

    /**
     * Computes the distance from a query vector to every vector of a batch.
     *
     * @param query the query vector
     * @param vectors the candidate vectors
     * @param metric the distance metric to apply
     * @return one distance per candidate, in the same order as {@code vectors}
     */
    float[] batchDistance(float[] query, float[][] vectors, DistanceMetric metric);

    /**
     * Multiplies a matrix (array of rows) by a vector.
     *
     * @param matrix the matrix, one array per row
     * @param vector the vector to multiply by
     * @return the resulting vector, one entry per matrix row
     */
    float[] matrixVectorMultiply(float[][] matrix, float[] vector);

    /**
     * Computes the dot product of two vectors.
     *
     * @param a first vector
     * @param b second vector
     * @return the dot product of {@code a} and {@code b}
     */
    float dotProduct(float[] a, float[] b);
}
package com.jvector.compute;
import com.jvector.core.DistanceMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
 * CPU implementation of {@link ComputeEngine}.
 *
 * <p>Batches of at least {@code parallelThreshold} vectors are split across a private
 * {@link ForkJoinPool} using {@link BatchDistanceTask}. Smaller batches, and any batch submitted
 * after {@link #cleanup()} has shut the pool down, are computed sequentially on the calling thread.
 * Dot products and matrix-vector products delegate to {@link SIMDOptimizedOperations}.
 */
public class CpuComputeEngine implements ComputeEngine {
    private static final Logger logger = LoggerFactory.getLogger(CpuComputeEngine.class);

    /** Vector length used by the synthetic warmup workload. */
    private static final int WARMUP_DIMENSION = 128;
    /** Number of dot products executed during warmup. */
    private static final int WARMUP_ITERATIONS = 1000;

    private final ForkJoinPool forkJoinPool;
    private final int parallelThreshold;

    /**
     * Creates an engine with one worker per available processor that parallelizes batches of
     * 1000 or more vectors.
     */
    public CpuComputeEngine() {
        this(Runtime.getRuntime().availableProcessors(), 1000);
    }

    /**
     * Creates an engine with an explicit pool size and parallel threshold.
     *
     * @param parallelism number of worker threads in the private fork/join pool
     * @param parallelThreshold smallest batch size that is computed in parallel
     * @throws IllegalArgumentException if {@code parallelism} is rejected by {@link ForkJoinPool}
     *     (e.g. not positive)
     */
    public CpuComputeEngine(int parallelism, int parallelThreshold) {
        this.forkJoinPool = new ForkJoinPool(parallelism);
        this.parallelThreshold = parallelThreshold;
    }

    @Override
    public float distance(float[] a, float[] b, DistanceMetric metric) {
        return metric.distance(a, b);
    }

    /**
     * Computes {@code metric.distance(query, vectors[i])} for every {@code i}.
     *
     * @return distances in the same order as {@code vectors}
     */
    @Override
    public float[] batchDistance(float[] query, float[][] vectors, DistanceMetric metric) {
        // After cleanup() the pool rejects new tasks; degrade to sequential work instead of failing.
        if (vectors.length < parallelThreshold || forkJoinPool.isShutdown()) {
            return sequentialBatchDistance(query, vectors, metric);
        }
        return parallelBatchDistance(query, vectors, metric);
    }

    /** Computes all distances on the calling thread. */
    private float[] sequentialBatchDistance(float[] query, float[][] vectors, DistanceMetric metric) {
        float[] distances = new float[vectors.length];
        for (int i = 0; i < vectors.length; i++) {
            distances[i] = metric.distance(query, vectors[i]);
        }
        return distances;
    }

    /** Computes all distances by recursive splitting on the private fork/join pool. */
    private float[] parallelBatchDistance(float[] query, float[][] vectors, DistanceMetric metric) {
        BatchDistanceTask task = new BatchDistanceTask(query, vectors, metric, 0, vectors.length);
        return forkJoinPool.invoke(task);
    }

    /**
     * Multiplies {@code matrix} (one array per row) by {@code vector}.
     *
     * @return one dot product per matrix row
     * @throws IllegalArgumentException if a row's length differs from {@code vector.length}
     */
    @Override
    public float[] matrixVectorMultiply(float[][] matrix, float[] vector) {
        float[] result = new float[matrix.length];
        for (int row = 0; row < matrix.length; row++) {
            result[row] = SIMDOptimizedOperations.dotProductSIMD(matrix[row], vector);
        }
        return result;
    }

    /**
     * Computes the dot product of {@code a} and {@code b}.
     *
     * @throws IllegalArgumentException if the vectors have different lengths
     */
    @Override
    public float dotProduct(float[] a, float[] b) {
        return SIMDOptimizedOperations.dotProductSIMD(a, b);
    }

    @Override
    public String getName() {
        return "CPU";
    }

    @Override
    public boolean isAvailable() {
        return true; // The CPU is always available.
    }

    /** Runs repeated dot products on random data so the JIT compiles the hot paths early. */
    @Override
    public void warmup() {
        logger.info("Warming up CPU compute engine...");
        float[] warmupVector1 = new float[WARMUP_DIMENSION];
        float[] warmupVector2 = new float[WARMUP_DIMENSION];
        for (int i = 0; i < WARMUP_DIMENSION; i++) {
            warmupVector1[i] = (float) Math.random();
            warmupVector2[i] = (float) Math.random();
        }
        float checksum = 0.0f;
        for (int i = 0; i < WARMUP_ITERATIONS; i++) {
            // Accumulate and log the result so the warmup calls are not dead code.
            checksum += dotProduct(warmupVector1, warmupVector2);
        }
        logger.debug("CPU warmup checksum: {}", checksum);
        logger.info("CPU compute engine warmed up");
    }

    /** Shuts down the private fork/join pool; later batches run sequentially. */
    @Override
    public void cleanup() {
        if (!forkJoinPool.isShutdown()) {
            forkJoinPool.shutdown();
        }
    }
}
/**
* SIMD优化的向量运算
*/
public class SIMDOptimizedOperations {
/**
* SIMD优化的点积计算
*/
public static float dotProductSIMD(float[] a, float[] b) {
if (a.length != b.length) {
throw new IllegalArgumentException("Vector dimensions must match");
}
int len = a.length;
float sum = 0.0f;
int i = 0;
// 向量化循环:一次处理8个元素
for (; i < len - 7; i += 8) {
// 手动循环展开模拟SIMD操作
sum += a[i] * b[i] + a[i+1] * b[i+1] +
a[i+2] * b[i+2] + a[i+3] * b[i+3] +
a[i+4] * b[i+4] + a[i+5] * b[i+5] +
a[i+6] * b[i+6] + a[i+7] * b[i+7];
}
// 处理剩余元素
for (; i < len; i++) {
sum += a[i] * b[i];
}
return sum;
}
/**
* SIMD优化的欧几里得距离平方
*/
public static float euclideanDistanceSquaredSIMD(float[] a, float[] b) {
if (a.length != b.length) {
throw new IllegalArgumentException("Vector dimensions must match");
}
int len = a.length;
float sum = 0.0f;
int i = 0;
// 向量化循环:一次处理4个元素
for (; i < len - 3; i += 4) {
float diff0 = a[i] - b[i];
float diff1 = a[i+1] - b[i+1];
float diff2 = a[i+2] - b[i+2];
float diff3 = a[i+3] - b[i+3];
sum += diff0 * diff0 + diff1 * diff1 +
diff2 * diff2 + diff3 * diff3;
}
// 处理剩余元素
for (; i < len; i++) {
float diff = a[i] - b[i];
sum += diff * diff;
}
return sum;
}
/**
* 向量加法SIMD优化
*/
public static void vectorAddSIMD(float[] a, float[] b, float[] result) {
int len = a.length;
int i = 0;
// 向量化处理
for (; i < len - 3; i += 4) {
result[i] = a[i] + b[i];
result[i+1] = a[i+1] + b[i+1];
result[i+2] = a[i+2] + b[i+2];
result[i+3] = a[i+3] + b[i+3];
}
// 处理剩余元素
for (; i < len; i++) {
result[i] = a[i] + b[i];
}
}
}
/**
 * Fork/Join task that computes {@code metric.distance(query, vectors[i])} for every index
 * {@code i} in {@code [start, end)}.
 *
 * <p>The task created through the public constructor allocates one result array of length
 * {@code end - start}; every subtask writes its slice directly into that shared array, so no
 * intermediate arrays are allocated or merged. {@link #compute()} returns the shared array, where
 * element {@code i - start} holds the distance of {@code vectors[i]}.
 */
final class BatchDistanceTask extends RecursiveTask<float[]> {
    /** Ranges of at most this many vectors are computed directly instead of being split. */
    private static final int THRESHOLD = 100;

    private final float[] query;
    private final float[][] vectors;
    private final DistanceMetric metric;
    private final int start;
    private final int end;
    /** Result array shared by the root task and all of its subtasks. */
    private final float[] out;
    /** Index into {@code vectors} whose distance is stored at {@code out[0]}. */
    private final int outBase;

    /**
     * Creates a root task over {@code vectors[start..end)}.
     *
     * @throws IllegalArgumentException if the range is not within {@code vectors}
     */
    public BatchDistanceTask(float[] query, float[][] vectors, DistanceMetric metric,
                             int start, int end) {
        this(query, vectors, metric, checkRange(start, end, vectors.length), end,
                new float[end - start], start);
    }

    /** Creates a subtask that writes into its parent's result array. */
    private BatchDistanceTask(float[] query, float[][] vectors, DistanceMetric metric,
                              int start, int end, float[] out, int outBase) {
        this.query = query;
        this.vectors = vectors;
        this.metric = metric;
        this.start = start;
        this.end = end;
        this.out = out;
        this.outBase = outBase;
    }

    /** Validates the range and returns {@code start} so it can be used inside {@code this(...)}. */
    private static int checkRange(int start, int end, int length) {
        if (start < 0 || end < start || end > length) {
            throw new IllegalArgumentException(
                    "Invalid range [" + start + ", " + end + ") for " + length + " vectors");
        }
        return start;
    }

    @Override
    protected float[] compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                out[i - outBase] = metric.distance(query, vectors[i]);
            }
        } else {
            int mid = start + length / 2;
            BatchDistanceTask leftTask =
                    new BatchDistanceTask(query, vectors, metric, start, mid, out, outBase);
            BatchDistanceTask rightTask =
                    new BatchDistanceTask(query, vectors, metric, mid, end, out, outBase);
            // Run the left half asynchronously and the right half on this thread; join() makes
            // the left half's writes to the shared array visible before we return.
            leftTask.fork();
            rightTask.compute();
            leftTask.join();
        }
        return out;
    }
}
package com.jvector.compute;
import com.jvector.core.DistanceMetric;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import jcuda.*;
import jcuda.driver.*;
import jcuda.nvrtc.JNvrtc;
import jcuda.nvrtc.nvrtcProgram;
import jcuda.runtime.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* GPU计算引擎实现
*/
public class GpuComputeEngine implements ComputeEngine {
private static final Logger logger = LoggerFactory.getLogger(GpuComputeEngine.class);
private boolean isInitialized = false;
private boolean isAvailable = false;
private CUcontext context;
private CUmodule module;
private CUfunction distanceKernel;
public GpuComputeEngine() {
initializeGpu();
}
/**
* 初始化GPU环境
*/
private void initializeGpu() {
try {
// 初始化JCuda驱动API
JCudaDriver.setExceptionsEnabled(true);
JCudaDriver.cuInit(0);
// 获取GPU设备
CUdevice device = new CUdevice();
JCudaDriver.cuDeviceGet(device, 0);
// 创建GPU上下文
context = new CUcontext();
JCudaDriver.cuCtxCreate(context, 0, device);
// 加载CUDA模块
loadCudaModule();
isInitialized = true;
isAvailable = true;
logger.info("GPU compute engine initialized successfully");
} catch (Exception e) {
logger.warn("Failed to initialize GPU compute engine: {}", e.getMessage());
isAvailable = false;
}
}
/**
* 加载CUDA模块和核函数
*/
private void loadCudaModule() {
// CUDA源代码
String cudaSource =
"extern "C" __global__ void computeDistances(" +
" float* query," +
" float* vectors," +
" float* distances," +
" int vectorCount," +
" int dimension)" +
"{" +
" int idx = blockIdx.x * blockDim.x + threadIdx.x;" +
" if (idx < vectorCount) {" +
" float sum = 0.0f;" +
" for (int i = 0; i < dimension; i++) {" +
" float diff = query[i] - vectors[idx * dimension + i];" +
" sum += diff * diff;" +
" }" +
" distances[idx] = sqrtf(sum);" +
" }" +
"}";
// 编译CUDA代码
try {
// 这里应该使用NVRTC编译CUDA代码
// 为简化示例,假设已经有编译好的PTX文件
module = new CUmodule();
JCudaDriver.cuModuleLoadData(module, cudaSource.getBytes());
// 获取核函数
distanceKernel = new CUfunction();
JCudaDriver.cuModuleGetFunction(distanceKernel, module, "computeDistances");
} catch (Exception e) {
throw new RuntimeException("Failed to load CUDA module", e);
}
}
@Override
public float[] batchDistance(float[] query, float[][] vectors, DistanceMetric metric) {
if (!isAvailable) {
throw new IllegalStateException("GPU compute engine is not available");
}
return computeDistancesOnGpu(query, vectors);
}
/**
* 在GPU上计算距离
*/
private float[] computeDistancesOnGpu(float[] query, float[][] vectors) {
int vectorCount = vectors.length;
int dimension = query.length;
// 分配GPU内存
CUdeviceptr d_query = new CUdeviceptr();
CUdeviceptr d_vectors = new CUdeviceptr();
CUdeviceptr d_distances = new CUdeviceptr();
try {
// 分配内存
JCudaDriver.cuMemAlloc(d_query, query.length * Sizeof.FLOAT);
JCudaDriver.cuMemAlloc(d_vectors, vectorCount * dimension * Sizeof.FLOAT);
JCudaDriver.cuMemAlloc(d_distances, vectorCount * Sizeof.FLOAT);
// 拷贝数据到GPU
JCudaDriver.cuMemcpyHtoD(d_query, Pointer.to(query), query.length * Sizeof.FLOAT);
// 展平向量数组
float[] flatVectors = flattenVectors(vectors);
JCudaDriver.cuMemcpyHtoD(d_vectors, Pointer.to(flatVectors), flatVectors.length * Sizeof.FLOAT);
// 设置核函数参数
Pointer kernelParameters = Pointer.to(
Pointer.to(d_query),
Pointer.to(d_vectors),
Pointer.to(d_distances),
Pointer.to(new int[]{vectorCount}),
Pointer.to(new int[]{dimension})
);
// 计算网格和块大小
int blockSize = 256;
int gridSize = (vectorCount + blockSize - 1) / blockSize;
// 启动核函数
JCudaDriver.cuLaunchKernel(
distanceKernel,
gridSize, 1, 1, // 网格大小
blockSize, 1, 1, // 块大小
0, null, // 共享内存和流
kernelParameters, null // 参数
);
// 等待计算完成
JCudaDriver.cuCtxSynchronize();
// 拷贝结果回CPU
float[] distances = new float[vectorCount];
JCudaDriver.cuMemcpyDtoH(Pointer.to(distances), d_distances, vectorCount * Sizeof.FLOAT);
return distances;
} finally {
// 释放GPU内存
JCudaDriver.cuMemFree(d_query);
JCudaDriver.cuMemFree(d_vectors);
JCudaDriver.cuMemFree(d_distances);
}
}
/**
* 展平二维向量数组
*/
private float[] flattenVectors(float[][] vectors) {
int totalSize = vectors.length * vectors[0].length;
float[] flat = new float[totalSize];
int index = 0;
for (float[] vector : vectors) {
System.arraycopy(vector, 0, flat, index, vector.length);
index += vector.length;
}
return flat;
}
@Override
public String getName() {
return "GPU";
}
@Override
public boolean isAvailable() {
return isAvailable;
}
@Override
public void cleanup() {
if (isInitialized) {
try {
if (module != null) {
JCudaDriver.cuModuleUnload(module);
}
if (context != null) {
JCudaDriver.cuCtxDestroy(context);
}
} catch (Exception e) {
logger.warn("Error during GPU cleanup: {}", e.getMessage());
}
}
}
}
/**
 * Tracks named GPU allocations made through the JCuda driver API.
 *
 * <p>The constructor calls {@code cuMemGetInfo}, so a CUDA context presumably must be current on
 * the constructing thread — confirm against callers.
 */
public class GpuMemoryManager {
    private static final Logger logger = LoggerFactory.getLogger(GpuMemoryManager.class);

    /** Fraction of total device memory this manager tries to stay below (keeps 20% headroom). */
    private static final double MEMORY_BUDGET_FRACTION = 0.8;

    private final Map<String, Allocation> memoryPool = new ConcurrentHashMap<>();
    private final AtomicLong totalAllocated = new AtomicLong(0);
    private final long maxMemory;

    public GpuMemoryManager() {
        this.maxMemory = getGpuMemoryInfo();
    }

    /**
     * Allocates {@code size} bytes of device memory and registers it under {@code key}.
     *
     * @param key unique name for the allocation; must not already be registered
     * @param size number of bytes; must be positive
     * @return the device pointer
     * @throws IllegalArgumentException if {@code size} is not positive
     * @throws IllegalStateException if {@code key} is already registered (re-registering would
     *     leak the previous allocation)
     */
    public CUdeviceptr allocate(String key, long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("GPU allocation size must be positive: " + size);
        }
        if (memoryPool.containsKey(key)) {
            throw new IllegalStateException("GPU memory already allocated for key: " + key);
        }
        if (totalAllocated.get() + size > maxMemory * MEMORY_BUDGET_FRACTION) {
            freeUnusedMemory();
            if (totalAllocated.get() + size > maxMemory * MEMORY_BUDGET_FRACTION) {
                logger.warn("Allocating {} bytes for key {} exceeds the {}% GPU memory budget",
                        size, key, (int) (MEMORY_BUDGET_FRACTION * 100));
            }
        }
        CUdeviceptr ptr = new CUdeviceptr();
        JCudaDriver.cuMemAlloc(ptr, size);
        if (memoryPool.putIfAbsent(key, new Allocation(ptr, size)) != null) {
            // Another thread registered the same key concurrently; don't leak our allocation.
            JCudaDriver.cuMemFree(ptr);
            throw new IllegalStateException("GPU memory already allocated for key: " + key);
        }
        totalAllocated.addAndGet(size);
        logger.debug("Allocated {} bytes GPU memory for key: {}", size, key);
        return ptr;
    }

    /**
     * Frees the allocation registered under {@code key}, if any.
     *
     * <p>Accounting uses the size recorded by {@link #allocate}; {@code size} is kept for API
     * compatibility and only checked for consistency.
     *
     * @param key name the memory was allocated under
     * @param size the size the caller believes was allocated
     */
    public void free(String key, long size) {
        Allocation allocation = memoryPool.remove(key);
        if (allocation == null) {
            return;
        }
        JCudaDriver.cuMemFree(allocation.ptr);
        totalAllocated.addAndGet(-allocation.size);
        if (size != allocation.size) {
            logger.warn("free() for key {} passed size {} but {} bytes were allocated; "
                    + "using the recorded size", key, size, allocation.size);
        }
        logger.debug("Freed {} bytes GPU memory for key: {}", allocation.size, key);
    }

    /** Returns the device's total memory in bytes as reported by {@code cuMemGetInfo}. */
    private long getGpuMemoryInfo() {
        long[] free = new long[1];
        long[] total = new long[1];
        JCudaDriver.cuMemGetInfo(free, total);
        return total[0];
    }

    /** Eviction hook for when the budget is exceeded; currently a no-op (LRU or similar is TODO). */
    private void freeUnusedMemory() {
        // TODO: implement LRU or another eviction strategy.
    }

    /** A device pointer together with the number of bytes allocated for it. */
    private static final class Allocation {
        final CUdeviceptr ptr;
        final long size;

        Allocation(CUdeviceptr ptr, long size) {
            this.ptr = ptr;
            this.size = size;
        }
    }
}
/**
 * {@link ComputeEngine} that routes each batch distance computation to a CPU or a GPU engine.
 *
 * <p>Routing uses measured timings from a {@link PerformanceProfiler} once enough comparable
 * measurements exist for both engines, and a size-based rule of thumb otherwise. Single-pair,
 * dot-product and matrix-vector operations always go to the CPU engine.
 *
 * <p>NOTE(review): the default GPU engine ({@code GpuComputeEngine}) always computes Euclidean
 * distance regardless of the metric argument; confirm only Euclidean metrics reach this engine.
 */
public class AdaptiveComputeEngine implements ComputeEngine {
    // Rule-of-thumb thresholds used until the profiler has comparable measurements.
    private static final int GPU_THRESHOLD_VECTOR_COUNT = 1000;
    private static final int GPU_THRESHOLD_DIMENSION = 128;

    private final ComputeEngine cpuEngine;
    private final ComputeEngine gpuEngine;
    private final PerformanceProfiler profiler;

    /** Creates an adaptive engine over a default {@code CpuComputeEngine} and {@code GpuComputeEngine}. */
    public AdaptiveComputeEngine() {
        this(new CpuComputeEngine(), new GpuComputeEngine());
    }

    /**
     * Creates an adaptive engine over the given engines.
     *
     * @param cpuEngine engine used for small workloads and non-batch operations
     * @param gpuEngine engine used for large batches when it is available
     */
    public AdaptiveComputeEngine(ComputeEngine cpuEngine, ComputeEngine gpuEngine) {
        this.cpuEngine = Objects.requireNonNull(cpuEngine, "cpuEngine");
        this.gpuEngine = Objects.requireNonNull(gpuEngine, "gpuEngine");
        this.profiler = new PerformanceProfiler();
    }

    @Override
    public float distance(float[] a, float[] b, DistanceMetric metric) {
        return cpuEngine.distance(a, b, metric);
    }

    /** Runs the batch on the selected engine and records its wall-clock time for future routing. */
    @Override
    public float[] batchDistance(float[] query, float[][] vectors, DistanceMetric metric) {
        ComputeEngine selectedEngine = selectEngine(query.length, vectors.length);
        long startTime = System.nanoTime();
        float[] result = selectedEngine.batchDistance(query, vectors, metric);
        long duration = System.nanoTime() - startTime;
        profiler.recordPerformance(selectedEngine.getName(), vectors.length, query.length, duration);
        return result;
    }

    @Override
    public float[] matrixVectorMultiply(float[][] matrix, float[] vector) {
        return cpuEngine.matrixVectorMultiply(matrix, vector);
    }

    @Override
    public float dotProduct(float[] a, float[] b) {
        return cpuEngine.dotProduct(a, b);
    }

    /** Prefers measured timings, falling back to the size rule when they are not comparable. */
    private ComputeEngine selectEngine(int dimension, int vectorCount) {
        if (!gpuEngine.isAvailable()) {
            return cpuEngine;
        }
        Optional<ComputeEngine> measured = profiler.hasEnoughData()
                ? profiler.recommendEngine(dimension, vectorCount, cpuEngine, gpuEngine)
                : Optional.empty();
        return measured.orElseGet(() -> selectByRule(dimension, vectorCount));
    }

    /** Sends large, high-dimensional batches to the GPU and everything else to the CPU. */
    private ComputeEngine selectByRule(int dimension, int vectorCount) {
        boolean largeBatch = vectorCount >= GPU_THRESHOLD_VECTOR_COUNT
                && dimension >= GPU_THRESHOLD_DIMENSION;
        return largeBatch ? gpuEngine : cpuEngine;
    }

    @Override
    public String getName() {
        return "Adaptive";
    }

    @Override
    public boolean isAvailable() {
        return cpuEngine.isAvailable(); // The CPU engine is always available.
    }

    @Override
    public void warmup() {
        cpuEngine.warmup();
        if (gpuEngine.isAvailable()) {
            gpuEngine.warmup();
        }
    }

    @Override
    public void cleanup() {
        cpuEngine.cleanup();
        gpuEngine.cleanup();
    }
}
/**
 * Keeps recent batch timings per engine name and recommends the faster engine for a workload.
 *
 * <p>At most {@value #MAX_RECORDS_PER_ENGINE} records are kept per engine; the oldest are
 * discarded first. Each per-engine deque is guarded by its own monitor, so recording and
 * querying may happen from different threads.
 */
public class PerformanceProfiler {
    private static final int MAX_RECORDS_PER_ENGINE = 1000;
    private static final int MIN_TOTAL_RECORDS = 100;
    /** Records within ±10% of the requested dimension and batch size count as comparable. */
    private static final double SIMILARITY_TOLERANCE = 0.1;

    private final Map<String, Deque<PerformanceRecord>> records = new ConcurrentHashMap<>();

    /**
     * Records how long {@code engineName} took for one batch.
     *
     * @param duration elapsed time in nanoseconds
     */
    public void recordPerformance(String engineName, int vectorCount, int dimension, long duration) {
        PerformanceRecord record =
                new PerformanceRecord(vectorCount, dimension, duration, System.currentTimeMillis());
        Deque<PerformanceRecord> engineRecords =
                records.computeIfAbsent(engineName, k -> new ArrayDeque<>());
        synchronized (engineRecords) {
            engineRecords.addLast(record);
            if (engineRecords.size() > MAX_RECORDS_PER_ENGINE) {
                engineRecords.removeFirst(); // drop the oldest record
            }
        }
    }

    /**
     * Recommends the engine with the lower average time per vector on comparable past workloads.
     *
     * @return the faster engine, or empty if either engine has no comparable records
     */
    public Optional<ComputeEngine> recommendEngine(int dimension, int vectorCount,
                                                   ComputeEngine cpuEngine, ComputeEngine gpuEngine) {
        OptionalDouble cpuCost = averageCostPerVector(cpuEngine.getName(), dimension, vectorCount);
        OptionalDouble gpuCost = averageCostPerVector(gpuEngine.getName(), dimension, vectorCount);
        if (!cpuCost.isPresent() || !gpuCost.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(cpuCost.getAsDouble() < gpuCost.getAsDouble() ? cpuEngine : gpuEngine);
    }

    /** Average nanoseconds per vector over this engine's comparable records, if any. */
    private OptionalDouble averageCostPerVector(String engineName, int dimension, int vectorCount) {
        Deque<PerformanceRecord> engineRecords = records.get(engineName);
        if (engineRecords == null) {
            return OptionalDouble.empty();
        }
        List<PerformanceRecord> snapshot;
        synchronized (engineRecords) {
            snapshot = new ArrayList<>(engineRecords);
        }
        // Normalize by batch size so the ±10% size tolerance does not skew the comparison.
        return snapshot.stream()
                .filter(r -> isSimilar(r, dimension, vectorCount))
                .mapToDouble(r -> (double) r.duration / Math.max(1, r.vectorCount))
                .average();
    }

    private static boolean isSimilar(PerformanceRecord r, int dimension, int vectorCount) {
        return Math.abs(r.dimension - dimension) <= dimension * SIMILARITY_TOLERANCE
                && Math.abs(r.vectorCount - vectorCount) <= vectorCount * SIMILARITY_TOLERANCE;
    }

    /** Returns whether at least {@value #MIN_TOTAL_RECORDS} records exist across all engines. */
    public boolean hasEnoughData() {
        int total = 0;
        for (Deque<PerformanceRecord> engineRecords : records.values()) {
            synchronized (engineRecords) {
                total += engineRecords.size();
            }
        }
        return total >= MIN_TOTAL_RECORDS;
    }

    private static class PerformanceRecord {
        final int vectorCount;
        final int dimension;
        final long duration;
        final long timestamp;

        PerformanceRecord(int vectorCount, int dimension, long duration, long timestamp) {
            this.vectorCount = vectorCount;
            this.dimension = dimension;
            this.duration = duration;
            this.timestamp = timestamp;
        }
    }
}
/**
 * Benchmarks {@code CpuComputeEngine} against {@code GpuComputeEngine} for batch distance
 * computation over a grid of dimensions and batch sizes, logging the mean time per call.
 *
 * <p>NOTE(review): {@code @Test} and {@code EuclideanDistance} are not imported in this snippet;
 * confirm the JUnit version and the package of {@code EuclideanDistance}.
 */
public class ComputeEngineBenchmark {
    private static final Logger logger = LoggerFactory.getLogger(ComputeEngineBenchmark.class);

    /** Fixed seed so every run benchmarks identical data. */
    private static final long SEED = 42L;

    private final Random random = new Random(SEED);

    @Test
    public void benchmarkComputeEngines() {
        int[] dimensions = {64, 128, 256, 512, 1024};
        int[] vectorCounts = {100, 1000, 10000, 100000};
        ComputeEngine cpuEngine = new CpuComputeEngine();
        ComputeEngine gpuEngine = new GpuComputeEngine();
        try {
            warmupEngines(cpuEngine, gpuEngine);
            for (int dim : dimensions) {
                for (int count : vectorCounts) {
                    benchmarkConfiguration(cpuEngine, gpuEngine, dim, count);
                }
            }
        } finally {
            // Release the CPU thread pool and GPU context even if a configuration fails.
            cpuEngine.cleanup();
            gpuEngine.cleanup();
        }
    }

    /** Times both engines on one (dimension, vectorCount) configuration and logs the result. */
    private void benchmarkConfiguration(ComputeEngine cpuEngine, ComputeEngine gpuEngine,
                                        int dimension, int vectorCount) {
        float[] query = generateRandomVector(dimension);
        float[][] vectors = generateRandomVectors(vectorCount, dimension);
        DistanceMetric metric = new EuclideanDistance();

        long cpuTime = benchmarkEngine(cpuEngine, query, vectors, metric, "CPU");
        long gpuTime = 0;
        if (gpuEngine.isAvailable()) {
            gpuTime = benchmarkEngine(gpuEngine, query, vectors, metric, "GPU");
        }

        String gpuMillis = gpuTime > 0 ? formatMillis(gpuTime) : "N/A";
        String speedup = gpuTime > 0
                ? String.format(Locale.ROOT, "%.2f", (double) cpuTime / gpuTime)
                : "N/A";
        logger.info("Dimension: {}, Vectors: {}, CPU: {}ms, GPU: {}ms, Speedup: {}x",
                dimension, vectorCount, formatMillis(cpuTime), gpuMillis, speedup);
    }

    /** Returns the mean wall-clock time in nanoseconds of several runs, after a few warmup runs. */
    private long benchmarkEngine(ComputeEngine engine, float[] query, float[][] vectors,
                                 DistanceMetric metric, String name) {
        int warmupRuns = 5;
        int benchmarkRuns = 10;
        for (int i = 0; i < warmupRuns; i++) {
            engine.batchDistance(query, vectors, metric);
        }
        long totalTime = 0;
        for (int i = 0; i < benchmarkRuns; i++) {
            long startTime = System.nanoTime();
            engine.batchDistance(query, vectors, metric);
            totalTime += System.nanoTime() - startTime;
        }
        return totalTime / benchmarkRuns;
    }

    /** Warms up every available engine with its own warmup plus a few 1000x128 batches. */
    private void warmupEngines(ComputeEngine... engines) {
        float[] warmupQuery = generateRandomVector(128);
        float[][] warmupVectors = generateRandomVectors(1000, 128);
        DistanceMetric metric = new EuclideanDistance();
        for (ComputeEngine engine : engines) {
            if (engine.isAvailable()) {
                engine.warmup();
                for (int i = 0; i < 10; i++) {
                    engine.batchDistance(warmupQuery, warmupVectors, metric);
                }
            }
        }
    }

    /** Formats nanoseconds as milliseconds with sub-millisecond precision. */
    private static String formatMillis(long nanos) {
        return String.format(Locale.ROOT, "%.3f", nanos / 1_000_000.0);
    }

    /** Returns a vector of {@code dimension} uniform values in [0, 1). */
    private float[] generateRandomVector(int dimension) {
        float[] vector = new float[dimension];
        for (int i = 0; i < dimension; i++) {
            vector[i] = random.nextFloat();
        }
        return vector;
    }

    /** Returns {@code count} independent random vectors of the given dimension. */
    private float[][] generateRandomVectors(int count, int dimension) {
        float[][] vectors = new float[count][];
        for (int i = 0; i < count; i++) {
            vectors[i] = generateRandomVector(dimension);
        }
        return vectors;
    }
}
本章详细介绍了CPU和GPU计算引擎的设计与实现,包括基于Fork/Join的CPU并行批量计算、基于JCuda的GPU距离计算、GPU内存管理,以及基于历史性能数据的自适应引擎选择。
计算引擎是向量搜索系统性能的关键,通过CPU和GPU的协同工作,可以显著提升系统的计算效率。
思考题: