
Author: HOS (安全风信子) · Date: 2026-01-19 · Platform: GitHub. Abstract: This article takes a deep look at the Worker / Driver architecture in the vLLM framework. As the core of distributed inference, it provides efficient task coordination and execution. By examining the Driver's coordination logic, the Worker's execution flow, the Ray Actor communication mechanism, and the failure-recovery strategy, together with source-code examples and performance data, the article shows how vLLM achieves large-scale distributed inference. It also covers multi-node scaling, dynamic resource allocation, and deep integration with the Ray ecosystem, giving inference engineers a complete guide to understanding and applying the Worker / Driver architecture.
In the vLLM framework, the Worker / Driver architecture is the core component of distributed inference, and it directly shapes the system's scalability, reliability, and performance.
As model sizes keep growing and inference requests become more complex, the Worker / Driver architecture faces several challenges.
vLLM's Worker / Driver design introduces a number of innovations.
vLLM uses Ray Actors as the communication mechanism between the Driver and Workers, enabling efficient distributed communication.
vLLM supports dynamic worker management, automatically adjusting the number of Workers according to load.
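As a minimal sketch of how such a scaling decision might be made, the snippet below sizes the worker pool from the length of the pending-task queue. The function name, thresholds, and heuristic are illustrative assumptions, not part of vLLM's actual API:

```python
import math

def target_worker_count(queue_len, tasks_per_worker=8,
                        min_workers=1, max_workers=16):
    """Size the pool so each worker handles roughly tasks_per_worker queued tasks."""
    desired = math.ceil(queue_len / tasks_per_worker) if queue_len else min_workers
    # Clamp to the allowed pool-size range
    return max(min_workers, min(max_workers, desired))

print(target_worker_count(100))   # heavy load -> 13 workers
print(target_worker_count(0))     # idle -> shrink to 1 worker
print(target_worker_count(1000))  # capped at max_workers -> 16
```

A real autoscaler would also smooth the signal over time to avoid thrashing between scale-out and scale-in.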
vLLM uses a hierarchical scheduling strategy to improve scheduling efficiency in large-scale systems.
vLLM implements a comprehensive automatic failure-recovery mechanism.
vLLM supports resource-aware scheduling, intelligently assigning tasks based on each Worker's resource usage.
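The essence of resource-aware placement can be sketched in a few lines: given per-worker resource statistics, pick the worker with the lowest GPU-memory pressure. The stats layout and function name here are illustrative assumptions:

```python
def pick_worker(stats):
    """Choose the worker with the lowest GPU-memory utilization.

    stats: worker_id -> {'gpu_memory_used': int, 'gpu_memory_total': int}
    """
    return min(stats,
               key=lambda w: stats[w]['gpu_memory_used'] / stats[w]['gpu_memory_total'])

stats = {
    'worker_a': {'gpu_memory_used': 30, 'gpu_memory_total': 80},
    'worker_b': {'gpu_memory_used': 10, 'gpu_memory_total': 80},
}
print(pick_worker(stats))  # -> worker_b
```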
vLLM's Worker / Driver architecture uses a layered design to ensure flexibility and extensibility:
Core components and their interfaces:

- **Driver** — `init()`, `start()`, `stop()`, `submit_task()`, `get_result()`, `handle_worker_failure()`, `scale_workers()`
- **WorkerManager** — `add_worker()`, `remove_worker()`, `get_worker()`, `get_all_workers()`, `monitor_workers()`, `balance_load()`
- **TaskScheduler** — `schedule_task()`, `prioritize_tasks()`, `batch_tasks()`, `retry_failed_tasks()`, `update_schedule()`
- **Worker** — `init()`, `start()`, `stop()`, `execute_task()`, `send_heartbeat()`, `update_status()`, `get_resource_usage()`
- **RayActorWrapper** — `create_actor()`, `destroy_actor()`, `send_message()`, `receive_message()`, `get_actor_status()`
- **ResourceMonitor** — `monitor_gpu()`, `monitor_cpu()`, `monitor_memory()`, `monitor_network()`, `get_resource_stats()`
- **FailureDetector** — `detect_failure()`, `handle_failure()`, `recover_task()`, `update_failure_stats()`

Architecture notes:
```python
# vllm/driver/driver.py
import itertools
import time
import ray
from typing import Dict, Any, Optional

from vllm.worker_manager import WorkerManager
from vllm.task_scheduler import TaskScheduler
from vllm.failure_detector import FailureDetector


class Driver:
    _instance = None  # simplified singleton, looked up by TaskScheduler/FailureDetector

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.worker_manager = None
        self.task_scheduler = None
        self.failure_detector = None
        self.tasks = {}    # task_id -> Task
        self.results = {}  # task_id -> Result
        # Monotonic counter: len(self.tasks)+1 would collide once completed
        # tasks are removed from self.tasks
        self._task_counter = itertools.count(1)
        self.is_running = False
        Driver._instance = self

    @classmethod
    def get_instance(cls) -> Optional["Driver"]:
        return cls._instance

    def start(self):
        """Start the Driver."""
        # Initialize Ray
        if not ray.is_initialized():
            ray.init(
                address=self.config.get('ray_address', 'auto'),
                runtime_env=self.config.get('ray_runtime_env', {})
            )
        # Initialize components
        self.worker_manager = WorkerManager(self.config)
        self.task_scheduler = TaskScheduler(self.config)
        self.failure_detector = FailureDetector(self.config)
        # Start components
        self.worker_manager.start()
        self.task_scheduler.start()
        self.failure_detector.start()
        # Create the initial workers
        initial_worker_count = self.config.get('initial_worker_count', 1)
        for _ in range(initial_worker_count):
            self.worker_manager.add_worker()
        self.is_running = True
        print(f"Driver started with {initial_worker_count} workers")

    def stop(self):
        """Stop the Driver."""
        if not self.is_running:
            return
        # Stop components
        self.failure_detector.stop()
        self.task_scheduler.stop()
        self.worker_manager.stop()
        # Shut down Ray
        if ray.is_initialized():
            ray.shutdown()
        self.is_running = False
        print("Driver stopped")

    def submit_task(self, task: Dict[str, Any]) -> str:
        """Submit an inference task and return its ID."""
        if not self.is_running:
            raise RuntimeError("Driver is not running")
        # Generate a unique task ID
        task_id = f"task_{next(self._task_counter)}"
        # Create the task object
        from vllm.task import Task
        task_obj = Task(task_id, task)
        self.tasks[task_id] = task_obj
        # Hand it to the scheduler
        self.task_scheduler.submit_task(task_obj)
        return task_id

    def get_result(self, task_id: str, timeout: Optional[float] = None) -> Any:
        """Fetch a task result, optionally blocking until it is ready."""
        # Completed tasks live in self.results; pending ones in self.tasks
        if task_id not in self.tasks and task_id not in self.results:
            raise ValueError(f"Task {task_id} not found")
        # Poll for the result (simplified implementation)
        start_time = time.time()
        while True:
            if task_id in self.results:
                return self.results[task_id]
            if timeout is not None and time.time() - start_time > timeout:
                raise TimeoutError(f"Task {task_id} timed out")
            time.sleep(0.1)

    def handle_task_completion(self, task_id: str, result: Any):
        """Record a completed task."""
        self.results[task_id] = result
        self.tasks.pop(task_id, None)

    def scale_workers(self, target_count: int):
        """Scale the worker pool to the target size."""
        current_count = len(self.worker_manager.get_all_workers())
        if target_count > current_count:
            # Scale out
            for _ in range(target_count - current_count):
                self.worker_manager.add_worker()
        elif target_count < current_count:
            # Scale in
            for _ in range(current_count - target_count):
                self.worker_manager.remove_worker()
```

Code notes:
```python
# vllm/worker_manager.py
import threading
import time
import uuid
from typing import Any, Dict, List, Optional

import ray


class WorkerManager:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.workers = {}           # worker_id -> WorkerActorHandle
        self.worker_resources = {}  # worker_id -> ResourceStats
        self.worker_tasks = {}      # worker_id -> List[Task]
        self.worker_monitor_thread = None
        self.is_running = False

    def start(self):
        """Start the WorkerManager and its monitoring thread."""
        self.is_running = True
        self.worker_monitor_thread = threading.Thread(target=self._monitor_workers)
        self.worker_monitor_thread.daemon = True
        self.worker_monitor_thread.start()

    def stop(self):
        """Stop the WorkerManager and tear down all workers."""
        self.is_running = False
        if self.worker_monitor_thread:
            self.worker_monitor_thread.join()
        # Destroy all workers
        for worker_id in list(self.workers.keys()):
            self.remove_worker(worker_id)

    def add_worker(self) -> str:
        """Create and start a new Worker as a Ray actor."""
        worker_id = f"worker_{uuid.uuid4()}"
        from vllm.worker import Worker
        # Create the Ray actor
        worker_actor = ray.remote(Worker).remote(
            worker_id=worker_id,
            config=self.config
        )
        # Start the worker and wait for it to come up
        ray.get(worker_actor.start.remote())
        # Register it
        self.workers[worker_id] = worker_actor
        self.worker_resources[worker_id] = {}
        self.worker_tasks[worker_id] = []
        print(f"Added worker: {worker_id}")
        return worker_id

    def remove_worker(self, worker_id: Optional[str] = None) -> bool:
        """Remove a worker; if no ID is given, remove the most idle one."""
        if not worker_id:
            if not self.workers:
                return False
            worker_id = min(
                self.workers.keys(),
                key=lambda w: len(self.worker_tasks[w])
            )
        if worker_id not in self.workers:
            return False
        # Stop the worker gracefully
        worker_actor = self.workers[worker_id]
        try:
            ray.get(worker_actor.stop.remote(), timeout=5.0)
        except Exception as e:
            print(f"Error stopping worker {worker_id}: {e}")
        # Destroy the actor
        ray.kill(worker_actor)
        # Unregister it
        del self.workers[worker_id]
        del self.worker_resources[worker_id]
        del self.worker_tasks[worker_id]
        print(f"Removed worker: {worker_id}")
        return True

    def get_worker(self, task: Any) -> Optional[str]:
        """Pick a worker for the task (simplest policy: fewest queued tasks)."""
        if not self.workers:
            return None
        return min(
            self.workers.keys(),
            key=lambda w: len(self.worker_tasks[w])
        )

    def get_all_workers(self) -> List[str]:
        """Return all worker IDs."""
        return list(self.workers.keys())

    def _monitor_workers(self):
        """Periodically poll worker health and resource usage."""
        while self.is_running:
            time.sleep(self.config.get('worker_monitor_interval', 5.0))
            for worker_id, worker_actor in list(self.workers.items()):
                try:
                    # Fetch resource usage
                    resources = ray.get(worker_actor.get_resource_usage.remote())
                    self.worker_resources[worker_id] = resources
                    # Heartbeat probe
                    ray.get(worker_actor.send_heartbeat.remote())
                except Exception as e:
                    print(f"Worker {worker_id} failed: {e}")
                    # Replace the failed worker with a fresh one
                    self.remove_worker(worker_id)
                    self.add_worker()

    def balance_load(self):
        """Load balancing: migrate tasks from busy workers to idle ones."""
        if len(self.workers) <= 1:
            return
        # Sort workers by queued-task count
        sorted_workers = sorted(
            self.workers.keys(),
            key=lambda w: len(self.worker_tasks[w])
        )
        # Task migration is omitted in this simplified implementation
        # ...
```

Code notes:
```python
# vllm/worker.py
import time
from typing import Any, Dict

import torch


class Worker:
    def __init__(self, worker_id: str, config: Dict[str, Any]):
        self.worker_id = worker_id
        self.config = config
        self.model = None
        self.tokenizer = None
        self.is_running = False
        self.heartbeat_time = time.time()
        self.resource_usage = {
            'gpu_memory_used': 0,
            'gpu_memory_total': 0,
            'gpu_utilization': 0,
            'cpu_utilization': 0,
            'memory_used': 0,
            'memory_total': 0
        }

    def start(self):
        """Start the worker: load the model and begin resource monitoring."""
        self._load_model()
        self._init_resource_monitor()
        self.is_running = True
        print(f"Worker {self.worker_id} started")

    def stop(self):
        """Stop the worker and release its resources."""
        if not self.is_running:
            return
        # Release the model and tokenizer
        if self.model:
            del self.model
            self.model = None
        if self.tokenizer:
            del self.tokenizer
            self.tokenizer = None
        # Free GPU memory
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        self.is_running = False
        print(f"Worker {self.worker_id} stopped")

    def _load_model(self):
        """Load the model and tokenizer."""
        from vllm.model_loader import ModelLoader
        model_loader = ModelLoader(self.config)
        loaded = model_loader.load()
        self.model = loaded['model']
        self.tokenizer = loaded['tokenizer']
        # Put the model in evaluation mode
        self.model.eval()
        # Warm up the model
        self._warmup_model()

    def _warmup_model(self):
        """Warm up the model with a dummy forward pass."""
        print(f"Warming up model for worker {self.worker_id}...")
        # A trivial input sequence
        input_ids = torch.tensor([[1, 2, 3, 4, 5]], device=self.model.device)
        with torch.no_grad():
            self.model(input_ids)
        print(f"Model warmed up for worker {self.worker_id}")

    def _init_resource_monitor(self):
        """Capture the initial resource usage (simplified)."""
        if torch.cuda.is_available():
            device = self.model.device
            self.resource_usage['gpu_memory_total'] = \
                torch.cuda.get_device_properties(device).total_memory
            self.resource_usage['gpu_memory_used'] = torch.cuda.memory_allocated(device)

    def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Run one inference task and return the generated text."""
        if not self.is_running:
            raise RuntimeError(f"Worker {self.worker_id} is not running")
        # Refresh the heartbeat timestamp
        self.heartbeat_time = time.time()
        # Parse the task
        input_ids = task.get('input_ids')
        max_new_tokens = task.get('max_new_tokens', 100)
        temperature = task.get('temperature', 0.7)
        top_p = task.get('top_p', 0.95)
        # Convert to a tensor
        input_ids = torch.tensor(input_ids, device=self.model.device)
        # Run inference
        with torch.no_grad():
            outputs = self.model.generate(
                input_ids=input_ids,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p
            )
        # Decode the output
        generated_text = self.tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        )
        # Refresh resource usage
        if torch.cuda.is_available():
            device = self.model.device
            self.resource_usage['gpu_memory_used'] = torch.cuda.memory_allocated(device)
        # Build the result
        return {
            'generated_text': generated_text,
            'input_ids': input_ids.tolist(),
            'output_ids': outputs[0].tolist(),
            'worker_id': self.worker_id
        }

    def send_heartbeat(self) -> Dict[str, Any]:
        """Report liveness."""
        return {
            'worker_id': self.worker_id,
            'timestamp': time.time(),
            'is_running': self.is_running
        }

    def get_resource_usage(self) -> Dict[str, Any]:
        """Report resource usage."""
        if torch.cuda.is_available():
            device = self.model.device
            self.resource_usage['gpu_memory_used'] = torch.cuda.memory_allocated(device)
        # Simplified: GPU utilization and CPU/memory stats would need extra
        # libraries (e.g. pynvml, psutil)
        return self.resource_usage
```

Code notes:
```python
# vllm/task_scheduler.py
import threading
import time
from typing import Any, Dict, List


class TaskScheduler:
    _instance = None  # simplified singleton, looked up by FailureDetector

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.tasks = []          # pending tasks
        self.running_tasks = {}  # task_id -> worker_id
        self.task_lock = threading.Lock()
        self.is_running = False
        self.schedule_thread = None
        TaskScheduler._instance = self

    @classmethod
    def get_instance(cls):
        return cls._instance

    def start(self):
        """Start the scheduling thread."""
        self.is_running = True
        self.schedule_thread = threading.Thread(target=self._schedule_loop)
        self.schedule_thread.daemon = True
        self.schedule_thread.start()
        print("TaskScheduler started")

    def stop(self):
        """Stop the scheduler."""
        self.is_running = False
        if self.schedule_thread:
            self.schedule_thread.join()
        print("TaskScheduler stopped")

    def submit_task(self, task: Any):
        """Queue a task for scheduling."""
        with self.task_lock:
            self.tasks.append(task)
        print(f"Submitted task: {task.task_id}")

    def _schedule_loop(self):
        """Scheduling loop."""
        while self.is_running:
            time.sleep(self.config.get('schedule_interval', 0.1))
            self._schedule_tasks()

    def _schedule_tasks(self):
        """Dispatch queued tasks to workers."""
        from vllm.driver import Driver
        with self.task_lock:
            if not self.tasks:
                return
            pending = list(self.tasks)
        # Look up the Driver (simplified singleton access)
        driver = Driver.get_instance()
        if not driver:
            return
        # Fetch the available workers
        workers = driver.worker_manager.get_all_workers()
        if not workers:
            return
        # Batching optimization: merge similar tasks
        batched_tasks = self._batch_tasks(pending)
        for batch in batched_tasks:
            # Pick a suitable worker
            worker_id = driver.worker_manager.get_worker(batch)
            if not worker_id:
                continue
            # Remove the batch from the pending queue before dispatch so the
            # next scheduling pass does not dispatch the same tasks again
            with self.task_lock:
                for task in batch:
                    if task in self.tasks:
                        self.tasks.remove(task)
            self._assign_task(batch, worker_id, driver)

    def _batch_tasks(self, tasks: List[Any]) -> List[List[Any]]:
        """Split tasks into batches.

        Simplified implementation: fixed-size slices. A real implementation
        would batch intelligently by task type, model size, and so on.
        """
        batch_size = self.config.get('batch_size', 32)
        return [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

    def _assign_task(self, batch: List[Any], worker_id: str, driver: Any):
        """Dispatch a batch of tasks to a worker asynchronously."""
        worker_actor = driver.worker_manager.workers[worker_id]
        # Simplified implementation: one remote call per task; a real
        # implementation would use more efficient asynchronous communication
        for task in batch:
            object_ref = worker_actor.execute_task.remote(task.data)

            def task_done(future, task=task):
                """Per-task completion callback."""
                try:
                    result = future.result()
                    driver.handle_task_completion(task.task_id, result)
                    with self.task_lock:
                        self.running_tasks.pop(task.task_id, None)
                except Exception as e:
                    print(f"Task execution failed: {e}")
                    # Re-queue the task for retry
                    with self.task_lock:
                        if task not in self.tasks:
                            self.tasks.append(task)
                        self.running_tasks.pop(task.task_id, None)

            # ObjectRef.future() bridges Ray into concurrent.futures callbacks
            object_ref.future().add_done_callback(task_done)
            self.running_tasks[task.task_id] = worker_id
        print(f"Assigned {len(batch)} tasks to worker {worker_id}")

    def prioritize_tasks(self):
        """Sort pending tasks by priority (highest first, simplified)."""
        with self.task_lock:
            self.tasks.sort(key=lambda t: t.priority, reverse=True)
```

Code notes:
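The batch-slicing step of the scheduler can be exercised on its own; this standalone sketch mirrors the fixed-size slicing logic described above (the function name is illustrative):

```python
def batch_tasks(tasks, batch_size=32):
    """Split a pending queue into fixed-size batches (the last may be short)."""
    return [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

print(batch_tasks(list(range(5)), batch_size=2))  # -> [[0, 1], [2, 3], [4]]
print(batch_tasks([], batch_size=4))              # -> []
```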
```python
# vllm/ray_actor_wrapper.py
from typing import Any, Dict, Optional

import ray


class RayActorWrapper:
    def __init__(self, config: Dict[str, Any]):
        self.config = config

    def create_actor(self, cls: type, *args, **kwargs) -> Any:
        """Create a Ray actor with configured resource requirements."""
        resources = {
            'CPU': self.config.get('actor_cpu', 1),
            'GPU': self.config.get('actor_gpu', 0)
        }
        # Note: object_store_memory is a node-level Ray setting, not a
        # per-actor option; `memory` reserves heap memory for the actor
        actor = ray.remote(
            num_cpus=resources['CPU'],
            num_gpus=resources['GPU'],
            memory=self.config.get('actor_memory', None)
        )(cls).remote(*args, **kwargs)
        return actor

    def destroy_actor(self, actor: Any):
        """Destroy a Ray actor."""
        ray.kill(actor)

    def send_message(self, actor: Any, method: str, *args, **kwargs) -> Any:
        """Invoke a method on the actor and return the ObjectRef."""
        return getattr(actor, method).remote(*args, **kwargs)

    def receive_message(self, future: Any, timeout: Optional[float] = None) -> Any:
        """Block on an ObjectRef and return its result."""
        return ray.get(future, timeout=timeout)

    def get_actor_status(self, actor: Any) -> Dict[str, Any]:
        """Probe actor health via its heartbeat method (simplified)."""
        try:
            result = ray.get(actor.send_heartbeat.remote(), timeout=2.0)
            return {
                'status': 'healthy',
                'heartbeat': result
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e)
            }

    def create_actor_pool(self, cls: type, actor_count: int, *args, **kwargs) -> Any:
        """Create a pool of actors."""
        from ray.util import ActorPool
        actors = [
            self.create_actor(cls, *args, **kwargs)
            for _ in range(actor_count)
        ]
        return ActorPool(actors)
```

Code notes:
vLLM's Worker / Driver workflow covers task submission, scheduling, execution, and result return:

Workflow notes:
```python
# vllm/failure_detector.py
import threading
import time
from typing import Any, Dict


class FailureDetector:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.heartbeat_times = {}  # worker_id -> last_heartbeat_time
        self.failure_count = {}    # worker_id -> failure_count
        self.is_running = False
        self.detection_thread = None
        self.heartbeat_timeout = self.config.get('heartbeat_timeout', 10.0)

    def start(self):
        """Start the failure detector."""
        self.is_running = True
        self.detection_thread = threading.Thread(target=self._detection_loop)
        self.detection_thread.daemon = True
        self.detection_thread.start()
        print("FailureDetector started")

    def stop(self):
        """Stop the failure detector."""
        self.is_running = False
        if self.detection_thread:
            self.detection_thread.join()
        print("FailureDetector stopped")

    def update_heartbeat(self, worker_id: str):
        """Record a fresh heartbeat and reset the failure counter."""
        self.heartbeat_times[worker_id] = time.time()
        self.failure_count[worker_id] = 0

    def _detection_loop(self):
        """Detection loop."""
        while self.is_running:
            time.sleep(self.config.get('detection_interval', 1.0))
            self._detect_failures()

    def _detect_failures(self):
        """Flag workers whose heartbeats have timed out."""
        from vllm.driver import Driver
        current_time = time.time()
        driver = Driver.get_instance()
        if not driver:
            return
        # Check every worker's last heartbeat
        for worker_id in list(self.heartbeat_times.keys()):
            last_heartbeat = self.heartbeat_times[worker_id]
            if current_time - last_heartbeat > self.heartbeat_timeout:
                # Bump the failure counter
                self.failure_count[worker_id] = self.failure_count.get(worker_id, 0) + 1
                # Only declare failure once the counter exceeds the threshold
                if self.failure_count[worker_id] > self.config.get('failure_threshold', 3):
                    print(f"Worker {worker_id} failed: heartbeat timeout")
                    self._handle_failure(worker_id, driver)

    def _handle_failure(self, worker_id: str, driver: Any):
        """Recover from a worker failure."""
        # 1. Remove the failed worker
        driver.worker_manager.remove_worker(worker_id)
        # 2. Create a replacement worker
        driver.worker_manager.add_worker()
        # 3. Migrate tasks that were running on the failed worker
        self._migrate_tasks(worker_id, driver)
        # 4. Clear its heartbeat records
        self.heartbeat_times.pop(worker_id, None)
        self.failure_count.pop(worker_id, None)

    def _migrate_tasks(self, worker_id: str, driver: Any):
        """Re-queue tasks that were running on the failed worker."""
        from vllm.task_scheduler import TaskScheduler
        scheduler = TaskScheduler.get_instance()
        if not scheduler:
            return
        # Find in-flight tasks assigned to the failed worker
        with scheduler.task_lock:
            for task_id, running_worker_id in list(scheduler.running_tasks.items()):
                if running_worker_id == worker_id:
                    # Look up the task object (simplified: assumes the object
                    # is still referenced in the pending queue)
                    task = next(
                        (t for t in scheduler.tasks if t.task_id == task_id),
                        None
                    )
                    if task is not None:
                        print(f"Migrated task {task_id} from failed worker {worker_id}")
                    # Drop it from the running set so it can be rescheduled
                    del scheduler.running_tasks[task_id]
```

Code notes:
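The core heartbeat-timeout check above can be distilled into a few testable lines; this sketch takes a snapshot of last-heartbeat timestamps and returns the workers that have gone quiet (the function name and data layout are illustrative):

```python
import time

def detect_failures(heartbeats, timeout=10.0, now=None):
    """Return worker IDs whose last heartbeat is older than `timeout` seconds."""
    now = time.time() if now is None else now
    return [w for w, t in heartbeats.items() if now - t > timeout]

hb = {'worker_a': 100.0, 'worker_b': 95.0}
print(detect_failures(hb, timeout=10.0, now=106.0))  # -> ['worker_b']
```

Passing `now` explicitly makes the check deterministic and easy to unit-test, which matters for a component whose job is detecting rare, timing-dependent events.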
vLLM's Worker / Driver architecture applies several performance-optimization strategies:

| Strategy | Implementation | Expected benefit |
|---|---|---|
| Batching | Merge similar tasks into batches to raise GPU utilization | Higher throughput, lower inference latency |
| Asynchronous communication | Use async communication to cut waiting time | Faster system response |
| Model warm-up | Warm up the model at worker startup to avoid first-request latency | Lower cold-start latency |
| Resource-aware scheduling | Assign tasks based on each worker's resource usage | Better utilization, no hot-spot nodes |
| Dynamic scaling | Adjust the worker count with load | Better utilization, lower cost |
| Shared-memory communication | Workers on the same node communicate via shared memory | Fewer data copies, faster communication |
| Communication compression | Compress transferred data | Less network bandwidth, faster transfers |
| Task priorities | Execute high-priority tasks first | Guaranteed response times for critical tasks |
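The task-priority strategy in the table above can be sketched with a heap-backed queue; higher-priority tasks are popped first, with FIFO order among equal priorities. The class and method names are illustrative, not vLLM's actual API:

```python
import heapq

class PriorityTaskQueue:
    """Pop higher-priority tasks first; FIFO among equal priorities."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker preserving submission order

    def push(self, task_id, priority):
        # Negate priority because heapq is a min-heap
        heapq.heappush(self._heap, (-priority, self._seq, task_id))
        self._seq += 1

    def pop(self):
        return heapq.heappop(self._heap)[2]

q = PriorityTaskQueue()
q.push('t_low', 1)
q.push('t_high', 9)
q.push('t_mid', 5)
print(q.pop())  # -> t_high
```

The sequence counter avoids comparing task IDs when priorities tie, and keeps scheduling fair for same-priority requests.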
| Feature | vLLM Worker / Driver | TensorRT-LLM distributed |
|---|---|---|
| Communication | Ray Actor | NCCL + MPI |
| Scheduling | Dynamic, hierarchical | Static |
| Fault tolerance | Automatic recovery | Limited |
| Dynamic scaling | Supported | Not supported |
| Resource utilization | High, adjusted dynamically | Medium, statically configured |
| Ease of use | Simple, Python-friendly API | Complex, requires C++ development |
| Ecosystem | Deep Ray integration | Mainly the TensorRT ecosystem |
| Startup time | Fast, with model warm-up | Slow, requires compilation and optimization |
| Flexibility | High, dynamic configuration | Low, fixed after compilation |
| Performance | High, close to TensorRT-LLM | Very high, tuned for NVIDIA hardware |

| Feature | vLLM Worker / Driver | DeepSpeed-Inference |
|---|---|---|
| Design goal | Single node to large-scale distributed | Large-scale distributed inference |
| Communication | Ray Actor | ZeRO + NCCL |
| Memory optimization | Paged KV cache | ZeRO-Inference |
| Scheduling | Dynamic | Static |
| Fault tolerance | Automatic recovery | Limited |
| Dynamic scaling | Supported | Not supported |
| Ease of use | Simple, friendly API | Complex, many configuration options |
| Community activity | High, frequent updates | Medium, slower updates |
| Model support | Broad Hugging Face model support | Limited model types |
| Startup time | Fast | Slow, complex initialization |

| Feature | vLLM Worker / Driver | ONNX Runtime distributed |
|---|---|---|
| Model format | Native PyTorch models | Requires conversion to ONNX |
| Communication | Ray Actor | gRPC |
| Scheduling | Dynamic, hierarchical | Basic load balancing |
| Fault tolerance | Automatic recovery | Limited |
| Dynamic scaling | Supported | Not supported |
| Hardware support | Mainly GPUs | Many targets (GPU, CPU, TPU, etc.) |
| Performance | High, inference-optimized | Medium, general-purpose but less targeted |
| Ease of use | Simple, Python-friendly API | Medium, requires model conversion |
| Ecosystem | Deep Ray integration | Mainly the ONNX ecosystem |
| Startup time | Fast, with model warm-up | Medium, must load the ONNX model |

| Feature | vLLM Worker / Driver | Hugging Face Accelerate |
|---|---|---|
| Design goal | High-performance distributed inference | Simplified distributed training and inference |
| Communication | Ray Actor | torch.distributed |
| Scheduling | Dynamic, hierarchical | Basic parallelism support |
| Fault tolerance | Automatic recovery | None |
| Dynamic scaling | Supported | Not supported |
| Performance optimization | Deeply optimized inference engine | Basic inference optimizations |
| Ease of use | Simple, friendly API | Simple, seamless Hugging Face integration |
| Community activity | High, frequent updates | High, tied to the Hugging Face ecosystem |
| Model support | Broad Hugging Face model support | Native Hugging Face model support |
| Startup time | Fast, with model warm-up | Fast, but less optimized |
vLLM's Worker / Driver architecture supports dynamic scaling, adjusting the number of workers with load; this improves scalability and enables a smooth transition from a single node to large-scale distributed deployments.
Through dynamic resource allocation and task scheduling, the architecture raises the utilization of GPUs and other hardware, lowering inference cost.
Robust failure detection and recovery ensure the system heals itself when a worker fails, improving reliability and availability.
The Ray-based communication layer simplifies distributed deployment and reduces the development and maintenance cost of distributed inference.
Batching, asynchronous communication, and related strategies improve inference performance, reducing latency and raising throughput.
On the other hand, the dependency on Ray adds complexity and an extra dependency, which can raise deployment and maintenance costs.
At large scale, communication between the Driver and Workers can become a performance bottleneck.
Dynamic scheduling can introduce scheduling latency, affecting response times.
Workers must load the model at startup, so model-loading overhead slows the response to dynamic scaling events.
Under heavy load, workers may contend for resources, degrading performance.
Failure recovery requires creating a new worker and reloading the model, which can mean a relatively long service interruption.
Going forward, vLLM's Worker / Driver architecture is expected to adopt smarter scheduling strategies.
It is also expected to move to more efficient communication mechanisms.
Resource management will become more flexible.
Fault-tolerance mechanisms will become more complete.
A broader range of hardware will be supported.
As model sizes keep growing, the architecture will support larger-scale distributed inference, including models beyond 1T parameters.
Real-time inference performance will be further optimized to deliver low-latency, high-concurrency services for scenarios such as autonomous driving and financial trading.
Deploying and managing multiple models at once, with cooperative multi-model inference, will suit multimodal and multi-task scenarios.
Cloud-native deployment will be better supported, including Kubernetes integration and serverless operation, improving elasticity and scalability.
Edge-cloud collaborative inference will offload part of the work to edge devices, reducing latency and bandwidth use.
Eventually the architecture should become fully adaptive, tuning its parameters and configuration automatically for load, hardware resources, and task characteristics to reach optimal performance.
Machine learning will be applied widely to scheduling, resource allocation, and failure detection, enabling smarter system optimization.
Heterogeneous hardware (CPU, GPU, FPGA) working in concert is a clear trend, and the architecture will support efficient heterogeneous cooperation.
Edge-cloud collaborative inference will become commonplace, with edge devices and cloud servers cooperating to deliver low-latency, highly reliable service.
On top of this architecture, Model-as-a-Service (MaaS) will mature, letting users deploy and manage their own models easily and iterate quickly.
Appendix:
| Parameter | Default | Description |
|---|---|---|
| ray_address | auto | Ray cluster address |
| initial_worker_count | 1 | Initial number of workers |
| schedule_interval | 0.1 | Scheduling interval (seconds) |
| batch_size | 32 | Batch size |
| worker_monitor_interval | 5.0 | Worker monitoring interval (seconds) |
| heartbeat_timeout | 10.0 | Heartbeat timeout (seconds) |
| detection_interval | 1.0 | Failure-detection interval (seconds) |
| failure_threshold | 3 | Failure-count threshold |
| actor_cpu | 1 | CPUs per actor |
| actor_gpu | 0 | GPUs per actor |
| compile | False | Enable model compilation |
| dtype | float16 | Model data type |
```python
from vllm.driver import Driver

# Configuration
config = {
    'model': 'meta-llama/Llama-2-70b-hf',
    'dtype': 'float16',
    'initial_worker_count': 4,
    'batch_size': 32,
    'compile': True,
    'actor_gpu': 1,
    'trust_remote_code': True
}

# Create and start the Driver
driver = Driver(config)
driver.start()

# Submit an inference task
task = {
    'input_ids': [1, 2, 3, 4, 5],
    'max_new_tokens': 100,
    'temperature': 0.7,
    'top_p': 0.95
}
task_id = driver.submit_task(task)
print(f"Submitted task: {task_id}")

# Fetch the result
result = driver.get_result(task_id, timeout=30.0)
print(f"Generated text: {result['generated_text']}")

# Dynamically scale the worker pool
driver.scale_workers(8)
print("Scaled to 8 workers")

# Stop the Driver
driver.stop()
```

| Problem | Likely cause | Solution |
|---|---|---|
| Ray initialization fails | Wrong Ray address, or the Ray cluster is unavailable | Check the Ray address configuration and make sure the cluster is running |
| Worker creation fails | Insufficient GPU resources, or model loading failed | Check GPU usage and ensure enough GPU memory is free |
| High inference latency | Batch size too large, or scheduling interval too long | Tune the batch size and scheduling interval |
| Workers fail frequently | Heartbeat timeout set too low, or hardware issues | Raise the heartbeat timeout and check hardware health |
| High communication overhead | Oversized model outputs, or insufficient network bandwidth | Trim model outputs or increase network bandwidth |
| Low resource utilization | Too many workers, or batch size too small | Tune the worker count and batch size |
Keywords: vLLM, Worker / Driver, distributed inference, Ray Actor, dynamic scheduling, failure recovery, batching optimization, resource-aware scheduling, performance optimization