【RL Base】多级反馈队列(MFQ)算法

举报
不去幼儿园 发表于 2024/12/03 08:29:55 2024/12/03
【摘要】 多级反馈队列(MFQ)是一种经典的调度算法,广泛用于操作系统任务调度,也可用于强化学习环境中。它是一种灵活且高效的调度机制,通过动态调整任务在不同队列中的优先级,实现公平性和响应时间的优化。多级反馈队列通过使用多个优先级队列,根据任务的运行时间和系统负载动态调整任务的优先级。高优先级队列处理短任务或新到达的任务,低优先级队列处理较长的任务,且允许任务随着时间从一个队列转移到另一个队列。

  📢本篇文章是博主强化学习(RL)领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在👉强化学习专栏:

       【强化学习】(10)---《多级反馈队列(MFQ)算法》

多级反馈队列(MFQ)算法

目录

1.前言

2.核心思想

3.基本原则:

4.具体实现步骤

5.公式和关键数学模型

6.强化学习中的MFQ应用

[Python] MFQ算法的具体实现代码和流程

实现步骤

1. 定义任务类

2. 定义调度器类

3. 测试调度器逻辑

[Notice]  核心解释

7.MFQ总结


 1.前言

        多级反馈队列(MFQ)是一种经典的调度算法,广泛用于操作系统任务调度,也可用于强化学习环境的任务调度中。它是一种灵活且高效的调度机制,通过动态调整任务在不同队列中的优先级,实现公平性和响应时间的优化。

编辑


2.核心思想

        多级反馈队列通过使用多个优先级队列,根据任务的运行时间和系统负载动态调整任务的优先级。高优先级队列处理短任务或新到达的任务,低优先级队列处理较长的任务,且允许任务随着时间从一个队列转移到另一个队列。


3.基本原则:

  1. 优先级动态调整: 刚进入系统的任务放入高优先级队列。随着运行时间增加,任务的优先级会降低,从高优先级队列逐渐移动到低优先级队列。
  2. 抢占机制: 高优先级队列任务优先执行,低优先级队列任务只有在高优先级队列为空时才会被执行。
  3. 时间片分配: 每个队列分配不同长度的时间片,通常高优先级队列的时间片较短,而低优先级队列时间片较长。

4.具体实现步骤

  1. 系统为任务设置多个优先级队列,每个队列有不同的时间片长度。
  2. 当新任务到达时,默认放置在最高优先级队列中。
  3. 如果任务在当前队列的时间片内未完成,会被移动到下一级队列。
  4. 高优先级任务抢占低优先级任务的执行资源。
  5. 低优先级队列中任务若长期未被调度,可通过老化(Aging)机制提高优先级,防止“饥饿”问题。T_{\text{end}}

5.公式和关键数学模型

        以下公式和方法经常出现在与强化学习相关的研究中,用于描述任务调度过程:

        优先级动态调整: 动态调整优先级可以建模为一个递减函数:

P_{i+1} = P_i - \Delta t \cdot w

在强化学习中,优先级的调整可以通过奖励信号结合使用。例如,对于具有奖励 R_t的任务:

P_{i+1} = P_i - f(R_t)

其中, f(R_t) :是一个奖励衰减函数,可以是线性或非线性形式。P_i: 当前任务优先级。\Delta t:任务运行时间。w: 权重因子,控制优先级衰减速度。

        任务等待时间: 任务的等待时间 (W) 可定义为:

W_i = T_{\text{end}} - T_{\text{arrival}}

强化学习中,目标通常是最小化总等待时间(或延迟):

\min \sum_{i} W_i

T_{\text{end}}: 任务完成时间。T_{\text{arrival}}: 任务到达时间。

        时间片分配: 高优先级队列时间片通常更短,可以用几何级数表示:

T_i = T_{\text{base}} \cdot 2^i

T_{\text{base}}: 基础时间片长度。i: 当前队列编号。

老化机制: 为防止低优先级任务饥饿,采用老化公式提高优先级:

P_{\text{new}} = P_{\text{old}} + k \cdot \Delta T

P_{\text{new}}: 提高后的优先级。P_{\text{old}}: 当前优先级。k: 老化系数。\Delta T: 任务等待时间。


6.强化学习中的MFQ应用

        在强化学习背景下,MFQ可被用于任务调度问题的建模与求解。例如,在深度强化学习中,可以将多级反馈队列的任务分布作为环境状态,通过设计奖励函数引导智能体优化任务调度策略。

        状态空间: 定义为每个队列中的任务数、优先级等系统状态。

        动作空间: 定义为任务的迁移操作,例如将任务从低优先级队列提升到高优先级队列。

        奖励函数: 设计为任务延迟、完成时间和调度开销的负值,例如:

R_t = -\left( \alpha \cdot W_i + \beta \cdot L_i \right)

W_i: 任务等待时间。L_i: 调度开销。\alpha, \beta: 权重系数。


[Python] MFQ算法的具体实现代码和流程

        以下将通过 Python 实现一个多级反馈队列调度器,并逐步解释代码的核心部分和执行逻辑。

实现步骤

  1. 创建任务类(Task),包括任务的基本信息(到达时间、执行时间等)。
  2. 定义多级反馈队列调度器类(MFQScheduler),包含多个优先级队列。
  3. 实现任务调度逻辑,包括优先级动态调整、时间片分配、任务迁移等。
  4. 实现一个模拟环境以测试调度器的行为。

        🔥若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱📌,以便于及时分享给您(私信难以及时回复)。

1. 定义任务类

        任务类存储每个任务的基本信息,并包含状态更新方法。

"""《 MFQ 算法的具体实现项目》
    时间:2024.11
    作者:不去幼儿园
"""
class Task:
    def __init__(self, task_id, arrival_time, burst_time):
        self.task_id = task_id
        self.arrival_time = arrival_time
        self.burst_time = burst_time
        self.remaining_time = burst_time
        self.priority = 0  # 初始优先级为最高
        self.waiting_time = 0
        self.completion_time = None

    def __str__(self):
        return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"

2. 定义调度器类

        调度器类实现多级反馈队列的核心逻辑。

from collections import deque

class MFQScheduler:
    def __init__(self, num_queues, time_slices):
        self.num_queues = num_queues
        self.time_slices = time_slices  # 每个队列的时间片
        self.queues = [deque() for _ in range(num_queues)]  # 创建多级队列
        self.current_time = 0

    def add_task(self, task):
        self.queues[0].append(task)  # 新任务进入最高优先级队列
        print(f"Task {task.task_id} added to queue 0 at time {self.current_time}")

    def execute(self):
        while any(self.queues):  # 如果队列中还有任务
            for priority, queue in enumerate(self.queues):
                if queue:
                    task = queue.popleft()
                    time_slice = self.time_slices[priority]
                    execution_time = min(task.remaining_time, time_slice)
                    
                    # 模拟任务执行
                    print(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
                    self.current_time += execution_time
                    task.remaining_time -= execution_time

                    # 检查任务状态
                    if task.remaining_time == 0:
                        task.completion_time = self.current_time
                        print(f"Task {task.task_id} completed at time {self.current_time}")
                    else:
                        # 降低优先级(若可能),重新插入队列
                        next_priority = min(priority + 1, self.num_queues - 1)
                        self.queues[next_priority].append(task)
                        print(f"Task {task.task_id} moved to queue {next_priority}")
                    
                    # 任务执行完或降级后,继续调度
                    break

3. 测试调度器逻辑

        模拟添加任务,并执行调度。

# 创建调度器:3级队列,时间片分别为2、4、8
scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8])

# 添加任务
tasks = [
    Task(task_id=1, arrival_time=0, burst_time=5),
    Task(task_id=2, arrival_time=1, burst_time=10),
    Task(task_id=3, arrival_time=2, burst_time=4)
]

for task in tasks:
    scheduler.add_task(task)

# 执行调度
scheduler.execute()

[Notice]  核心解释

优先级队列与时间片

  • 调度器通过 deque 实现多级队列,高优先级队列(下标越小)任务优先处理。
  • 每级队列的时间片长度可自定义(如 2、4、8),用以控制不同任务的执行时间。

任务状态更新

  • 每个任务在执行后,会更新其剩余时间 remaining_time
  • 如果任务未完成,会降低优先级;若已完成,记录其完成时间。

老化机制

        可在调度器中加入老化机制,定期提升低优先级队列中等待时间过长的任务:

def aging(self):
    for priority in range(1, self.num_queues):  # 忽略最高优先级队列
        for task in list(self.queues[priority]):
            task.waiting_time += 1
            if task.waiting_time > 10:  # 阈值
                self.queues[priority].remove(task)
                self.queues[priority - 1].append(task)
                print(f"Task {task.task_id} aged to queue {priority - 1}")

动态负载的调度

        强化学习可通过模拟动态任务负载(任务到达时间不均、任务数量波动)来优化 MFQScheduler 的奖励函数,提升整体效率。

        由于博文主要为了介绍相关算法的原理和应用的方法,缺乏对于实际效果的关注,算法可能在上述环境中的效果不佳或者无法运行,一是算法不适配上述环境,二是算法未调参和优化,三是没有呈现完整的代码,四是等等。上述代码用于了解和学习算法足够了,但若是想直接将上面代码应用于实际项目中,还需要进行修改。

 进一步改进代码,加入老化机制和日志模块

from collections import deque

class Task:
    def __init__(self, task_id, arrival_time, burst_time):
        self.task_id = task_id
        self.arrival_time = arrival_time
        self.burst_time = burst_time
        self.remaining_time = burst_time
        self.priority = 0  # Initial priority is the highest
        self.waiting_time = 0
        self.completion_time = None

    def __str__(self):
        return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"


class MFQScheduler:
    def __init__(self, num_queues, time_slices, aging_threshold=10):
        self.num_queues = num_queues
        self.time_slices = time_slices  # Time slice for each queue
        self.queues = [deque() for _ in range(num_queues)]
        self.current_time = 0
        self.log = []  # To store log messages
        self.aging_threshold = aging_threshold

    def log_event(self, message):
        self.log.append(f"[Time {self.current_time}]: {message}")
        print(self.log[-1])

    def add_task(self, task):
        self.queues[0].append(task)  # New tasks enter the highest priority queue
        self.log_event(f"Task {task.task_id} added to queue 0")

    def execute(self):
        while any(self.queues):  # While there are tasks in any queue
            self.apply_aging()  # Apply aging before each scheduling cycle

            for priority, queue in enumerate(self.queues):
                if queue:
                    task = queue.popleft()
                    time_slice = self.time_slices[priority]
                    execution_time = min(task.remaining_time, time_slice)

                    # Simulate task execution
                    self.log_event(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
                    self.current_time += execution_time
                    task.remaining_time -= execution_time

                    # Check task status
                    if task.remaining_time == 0:
                        task.completion_time = self.current_time
                        self.log_event(f"Task {task.task_id} completed")
                    else:
                        # Demote task to lower priority queue
                        next_priority = min(priority + 1, self.num_queues - 1)
                        self.queues[next_priority].append(task)
                        self.log_event(f"Task {task.task_id} moved to queue {next_priority}")

                    break  # Start a new scheduling cycle after executing one task

    def apply_aging(self):
        for priority in range(1, self.num_queues):  # Skip the highest priority queue
            for task in list(self.queues[priority]):
                task.waiting_time += 1
                if task.waiting_time > self.aging_threshold:
                    self.queues[priority].remove(task)
                    self.queues[priority - 1].append(task)
                    task.waiting_time = 0  # Reset waiting time after aging
                    self.log_event(f"Task {task.task_id} aged to queue {priority - 1}")

    def print_log(self):
        print("\nExecution Log:")
        for entry in self.log:
            print(entry)


# Test the improved scheduler
if __name__ == "__main__":
    # Create scheduler: 3 queues, time slices are 2, 4, 8 units
    scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8], aging_threshold=5)

    # Add tasks
    tasks = [
        Task(task_id=1, arrival_time=0, burst_time=5),
        Task(task_id=2, arrival_time=1, burst_time=10),
        Task(task_id=3, arrival_time=2, burst_time=4)
    ]

    for task in tasks:
        scheduler.add_task(task)

    # Execute scheduler
    scheduler.execute()

    # Print log
    scheduler.print```python
from collections import deque

class Task:
    def __init__(self, task_id, arrival_time, burst_time):
        self.task_id = task_id
        self.arrival_time = arrival_time
        self.burst_time = burst_time
        self.remaining_time = burst_time
        self.priority = 0  # Initial priority is the highest
        self.waiting_time = 0
        self.completion_time = None

    def __str__(self):
        return f"Task(id={self.task_id}, priority={self.priority}, remaining_time={self.remaining_time})"


class MFQScheduler:
    def __init__(self, num_queues, time_slices, aging_threshold=10):
        self.num_queues = num_queues
        self.time_slices = time_slices  # Time slice for each queue
        self.queues = [deque() for _ in range(num_queues)]
        self.current_time = 0
        self.log = []  # To store log messages
        self.aging_threshold = aging_threshold

    def log_event(self, message):
        self.log.append(f"[Time {self.current_time}]: {message}")
        print(self.log[-1])

    def add_task(self, task):
        self.queues[0].append(task)  # New tasks enter the highest priority queue
        self.log_event(f"Task {task.task_id} added to queue 0")

    def execute(self):
        while any(self.queues):  # While there are tasks in any queue
            self.apply_aging()  # Apply aging before each scheduling cycle

            for priority, queue in enumerate(self.queues):
                if queue:
                    task = queue.popleft()
                    time_slice = self.time_slices[priority]
                    execution_time = min(task.remaining_time, time_slice)

                    # Simulate task execution
                    self.log_event(f"Executing Task {task.task_id} from queue {priority} for {execution_time} time units")
                    self.current_time += execution_time
                    task.remaining_time -= execution_time

                    # Check task status
                    if task.remaining_time == 0:
                        task.completion_time = self.current_time
                        self.log_event(f"Task {task.task_id} completed")
                    else:
                        # Demote task to lower priority queue
                        next_priority = min(priority + 1, self.num_queues - 1)
                        self.queues[next_priority].append(task)
                        self.log_event(f"Task {task.task_id} moved to queue {next_priority}")

                    break  # Start a new scheduling cycle after executing one task

    def apply_aging(self):
        for priority in range(1, self.num_queues):  # Skip the highest priority queue
            for task in list(self.queues[priority]):
                task.waiting_time += 1
                if task.waiting_time > self.aging_threshold:
                    self.queues[priority].remove(task)
                    self.queues[priority - 1].append(task)
                    task.waiting_time = 0  # Reset waiting time after aging
                    self.log_event(f"Task {task.task_id} aged to queue {priority - 1}")

    def print_log(self):
        print("\nExecution Log:")
        for entry in self.log:
            print(entry)


# Test the improved scheduler
if __name__ == "__main__":
    # Create scheduler: 3 queues, time slices are 2, 4, 8 units
    scheduler = MFQScheduler(num_queues=3, time_slices=[2, 4, 8], aging_threshold=5)

    # Add tasks
    tasks = [
        Task(task_id=1, arrival_time=0, burst_time=5),
        Task(task_id=2, arrival_time=1, burst_time=10),
        Task(task_id=3, arrival_time=2, burst_time=4)
    ]

    for task in tasks:
        scheduler.add_task(task)

    # Execute scheduler
    scheduler.execute()

    # Print log
    scheduler.print_log()

日志模块:

   log_event 方法记录调度器在每个时间点的行为。

        日志存储在 self.log 中,可以通过 print_log 方法输出完整执行日志。

老化机制:

        在每次调度循环中调用 apply_aging 方法。

        老化逻辑检查每个低优先级队列中任务的等待时间,超过阈值的任务提升到更高优先级队列。

优点:

  • 增强可观测性: 日志模块帮助跟踪每个任务的状态变化。
  • 解决饥饿问题: 老化机制确保长期等待任务能被及时处理。

7.MFQ总结

重要特性

  1. 公平性: 通过动态调整优先级和老化机制,保证所有任务最终都能得到服务。
  2. 灵活性: 能动态适应任务的运行时间分布。
  3. 高响应性: 对于短任务能够迅速调度,保证系统响应时间。

应用案例

  1. 操作系统: MFQ广泛用于进程调度(如Linux内核调度器)。
  2. 强化学习: 用于建模任务分配问题,例如云计算任务分配和边缘计算调度。

        通过上述模型与公式,MFQ算法在强化学习环境中的应用可以进一步扩展为对动态任务负载的自适应调度,实现更高效的资源利用和任务完成时间优化。

更多文章,请查看文章:

【MADRL】多智能体深度强化学习《纲要》


        博客都是给自己看的笔记,如有误导深表抱歉。文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者添加VX:Rainbook_2,联系作者。✨

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。