📢本篇文章是博主强化学习(RL)领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在👉**强化学习**专栏: 【强化学习】(48)---《多级反馈队列(MFQ)算法》
多级反馈队列(MFQ)是一种经典的调度算法,广泛用于操作系统任务调度,也可用于强化学习环境的任务调度中。它是一种灵活且高效的调度机制,通过动态调整任务在不同队列中的优先级,实现公平性和响应时间的优化。
多级反馈队列通过使用多个优先级队列,根据任务的运行时间和系统负载动态调整任务的优先级。高优先级队列处理短任务或新到达的任务,低优先级队列处理较长的任务,且允许任务随着时间从一个队列转移到另一个队列。
以下公式和方法经常出现在与强化学习相关的研究中,用于描述任务调度过程:
**优先级动态调整:** 动态调整优先级可以建模为一个递减函数:
在强化学习中,优先级的调整可以通过奖励信号结合使用。例如,对于具有奖励
的任务:
其中,
:是一个奖励衰减函数,可以是线性或非线性形式。
: 当前任务优先级。
:任务运行时间。
: 权重因子,控制优先级衰减速度。
**任务等待时间:** 任务的等待时间 (W) 可定义为:
强化学习中,目标通常是最小化总等待时间(或延迟):
: 任务完成时间。
: 任务到达时间。
**时间片分配:** 高优先级队列时间片通常更短,可以用几何级数表示:
: 基础时间片长度。
: 当前队列编号。
老化机制: 为防止低优先级任务饥饿,采用老化公式提高优先级:
: 提高后的优先级。
: 当前优先级。
: 老化系数。
: 任务等待时间。
在强化学习背景下,MFQ可被用于任务调度问题的建模与求解。例如,在深度强化学习中,可以将多级反馈队列的任务分布作为环境状态,通过设计奖励函数引导智能体优化任务调度策略。
**状态空间:** 定义为每个队列中的任务数、优先级等系统状态。
**动作空间:** 定义为任务的迁移操作,例如将任务从低优先级队列提升到高优先级队列。
**奖励函数:** 设计为任务延迟、完成时间和调度开销的负值,例如:
: 任务等待时间。
: 调度开销。
: 权重系数
以下将通过 Python 实现一个多级反馈队列调度器,并逐步解释代码的核心部分和执行逻辑。
Task
),包括任务的基本信息(到达时间、执行时间等)。MFQScheduler
),包含多个优先级队列。🔥若是下面代码复现困难或者有问题,**欢迎评论区留言**;需要以整个项目形式的代码,请**在评论区留下您的邮箱**📌,以便于及时分享给您(私信难以及时回复)。
任务类存储每个任务的基本信息,并包含状态更新方法。
"""《 MFQ 算法的具体实现项目》
时间:2024.11
作者:不去幼儿园
"""
class Task:
def __init__(self, task_id, arrival_time, burst_time):
self.task_id = task_id
self.arrival_time = arrival_time
self.burst_time = burst_time
self.remaining_time = burst_time
self.priority = 0 # 初始优先级为最高
self.waiting_time = 0
self.completion_time = None
def __str__(self):
return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"
调度器类实现多级反馈队列的核心逻辑。
from collections import deque
class MFQScheduler:
def __init__(self, num_queues, time_slices):
self.num_queues = num_queues
self.time_slices = time_slices # 每个队列的时间片
self.queues = [deque() for _ in range(num_queues)] # 创建多级队列
self.current_time = 0
def add_task(self, task):
self.queues[0].append(task) # 新任务进入最高优先级队列
print(f"Task {task.task_id} added to queue 0 at time {self.current_time}")
def execute(self):
while any(self.queues): # 如果队列中还有任务
for priority, queue in enumerate(self.queues):
if queue:
task = queue.popleft()
time_slice = self.time_slices[priority]
execution_time = min(task.remaining_time, time_slice)
# 模拟任务执行
print(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
self.current_time += execution_time
task.remaining_time -= execution_time
# 检查任务状态
if task.remaining_time == 0:
task.completion_time = self.current_time
print(f"Task {task.task_id} completed at time {self.current_time}")
else:
# 降低优先级(若可能),重新插入队列
next_priority = min(priority + 1, self.num_queues - 1)
self.queues[next_priority].append(task)
print(f"Task {task.task_id} moved to queue {next_priority}")
# 任务执行完或降级后,继续调度
break
模拟添加任务,并执行调度。
# 创建调度器:3级队列,时间片分别为2、4、8
scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8])
# 添加任务
tasks = [
Task(task_id=1, arrival_time=0, burst_time=5),
Task(task_id=2, arrival_time=1, burst_time=10),
Task(task_id=3, arrival_time=2, burst_time=4)
]
for task in tasks:
scheduler.add_task(task)
# 执行调度
scheduler.execute()
deque
实现多级队列,高优先级队列(下标越小)任务优先处理。remaining_time
。老化机制
可在调度器中加入老化机制,定期提升低优先级队列中等待时间过长的任务:
def aging(self):
for priority in range(1, self.num_queues): # 忽略最高优先级队列
for task in list(self.queues[priority]):
task.waiting_time += 1
if task.waiting_time > 10: # 阈值
self.queues[priority].remove(task)
self.queues[priority - 1].append(task)
print(f"Task {task.task_id} aged to queue {priority - 1}")
强化学习可通过模拟动态任务负载(任务到达时间不均、任务数量波动)来优化 `MFQScheduler` 的奖励函数,提升整体效率。
由于博文主要为了介绍相关算法的原理和应用的方法,缺乏对于实际效果的关注,算法可能在上述环境中的效果不佳或者无法运行,一是算法不适配上述环境,二是算法未调参和优化,三是没有呈现完整的代码,四是等等。上述代码用于了解和学习算法足够了,但若是想直接将上面代码应用于实际项目中,还需要进行修改。
进一步改进代码,加入老化机制和日志模块
from collections import deque
class Task:
def __init__(self, task_id, arrival_time, burst_time):
self.task_id = task_id
self.arrival_time = arrival_time
self.burst_time = burst_time
self.remaining_time = burst_time
self.priority = 0 # Initial priority is the highest
self.waiting_time = 0
self.completion_time = None
def __str__(self):
return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"
class MFQScheduler:
def __init__(self, num_queues, time_slices, aging_threshold=10):
self.num_queues = num_queues
self.time_slices = time_slices # Time slice for each queue
self.queues = [deque() for _ in range(num_queues)]
self.current_time = 0
self.log = [] # To store log messages
self.aging_threshold = aging_threshold
def log_event(self, message):
self.log.append(f"[Time {self.current_time}]: {message}")
print(self.log[-1])
def add_task(self, task):
self.queues[0].append(task) # New tasks enter the highest priority queue
self.log_event(f"Task {task.task_id} added to queue 0")
def execute(self):
while any(self.queues): # While there are tasks in any queue
self.apply_aging() # Apply aging before each scheduling cycle
for priority, queue in enumerate(self.queues):
if queue:
task = queue.popleft()
time_slice = self.time_slices[priority]
execution_time = min(task.remaining_time, time_slice)
# Simulate task execution
self.log_event(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
self.current_time += execution_time
task.remaining_time -= execution_time
# Check task status
if task.remaining_time == 0:
task.completion_time = self.current_time
self.log_event(f"Task {task.task_id} completed")
else:
# Demote task to lower priority queue
next_priority = min(priority + 1, self.num_queues - 1)
self.queues[next_priority].append(task)
self.log_event(f"Task {task.task_id} moved to queue {next_priority}")
break # Start a new scheduling cycle after executing one task
def apply_aging(self):
for priority in range(1, self.num_queues): # Skip the highest priority queue
for task in list(self.queues[priority]):
task.waiting_time += 1
if task.waiting_time > self.aging_threshold:
self.queues[priority].remove(task)
self.queues[priority - 1].append(task)
task.waiting_time = 0 # Reset waiting time after aging
self.log_event(f"Task {task.task_id} aged to queue {priority - 1}")
def print_log(self):
print("\nExecution Log:")
for entry in self.log:
print(entry)
# Test the improved scheduler
if __name__ == "__main__":
# Create scheduler: 3 queues, time slices are 2, 4, 8 units
scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8], aging_threshold=5)
# Add tasks
tasks = [
Task(task_id=1, arrival_time=0, burst_time=5),
Task(task_id=2, arrival_time=1, burst_time=10),
Task(task_id=3, arrival_time=2, burst_time=4)
]
for task in tasks:
scheduler.add_task(task)
# Execute scheduler
scheduler.execute()
# Print log
scheduler.print```python
from collections import deque
class Task:
def __init__(self, task_id, arrival_time, burst_time):
self.task_id = task_id
self.arrival_time = arrival_time
self.burst_time = burst_time
self.remaining_time = burst_time
self.priority = 0 # Initial priority is the highest
self.waiting_time = 0
self.completion_time = None
def __str__(self):
return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"
class MFQScheduler:
def __init__(self, num_queues, time_slices, aging_threshold=10):
self.num_queues = num_queues
self.time_slices = time_slices # Time slice for each queue
self.queues = [deque() for _ in range(num_queues)]
self.current_time = 0
self.log = [] # To store log messages
self.aging_threshold = aging_threshold
def log_event(self, message):
self.log.append(f"[Time {self.current_time}]: {message}")
print(self.log[-1])
def add_task(self, task):
self.queues[0].append(task) # New tasks enter the highest priority queue
self.log_event(f"Task {task.task_id} added to queue 0")
def execute(self):
while any(self.queues): # While there are tasks in any queue
self.apply_aging() # Apply aging before each scheduling cycle
for priority, queue in enumerate(self.queues):
if queue:
task = queue.popleft()
time_slice = self.time_slices[priority]
execution_time = min(task.remaining_time, time_slice)
# Simulate task execution
self.log_event(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
self.current_time += execution_time
task.remaining_time -= execution_time
# Check task status
if task.remaining_time == 0:
task.completion_time = self.current_time
self.log_event(f"Task {task.task_id} completed")
else:
# Demote task to lower priority queue
next_priority = min(priority + 1, self.num_queues - 1)
self.queues[next_priority].append(task)
self.log_event(f"Task {task.task_id} moved to queue {next_priority}")
break # Start a new scheduling cycle after executing one task
def apply_aging(self):
for priority in range(1, self.num_queues): # Skip the highest priority queue
for task in list(self.queues[priority]):
task.waiting_time += 1
if task.waiting_time > self.aging_threshold:
self.queues[priority].remove(task)
self.queues[priority - 1].append(task)
task.waiting_time = 0 # Reset waiting time after aging
self.log_event(f"Task {task.task_id} aged to queue {priority - 1}")
def print_log(self):
print("\nExecution Log:")
for entry in self.log:
print(entry)
# Test the improved scheduler
if __name__ == "__main__":
# Create scheduler: 3 queues, time slices are 2, 4, 8 units
scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8], aging_threshold=5)
# Add tasks
tasks = [
Task(task_id=1, arrival_time=0, burst_time=5),
Task(task_id=2, arrival_time=1, burst_time=10),
Task(task_id=3, arrival_time=2, burst_time=4)
]
for task in tasks:
scheduler.add_task(task)
# Execute scheduler
scheduler.execute()
# Print log
scheduler.print_log()
日志模块:
log_event
方法记录调度器在每个时间点的行为。
日志存储在 `self.log` 中,可以通过 `print_log` 方法输出完整执行日志。
老化机制:
在每次调度循环中调用 `apply_aging` 方法。
老化逻辑检查每个低优先级队列中任务的等待时间,超过阈值的任务提升到更高优先级队列。
通过上述模型与公式,MFQ算法在强化学习环境中的应用可以进一步扩展为对动态任务负载的自适应调度,实现更高效的资源利用和任务完成时间优化。
更多文章,请查看文章:
博客都是给自己看的笔记,如有误导深表抱歉。文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者