
Multi-core load balancing (#193)

* feat(sched): initial implementation of CPU load detection

* fix(smp): adjust the apic header declarations in smp

* fix(smp): implement a simple load-balancing algorithm

* fix(sched): extract the load-balancing logic into its own method

* fix(sched): fix the run-queue bug in the rt scheduler and adjust the load-balancing logic

* fix(process): remove unused test code

* reformat code
kong · 2 years ago
commit 1d48996375

+ 2 - 1
kernel/src/include/bindings/wrapper.h

@@ -35,4 +35,5 @@
 #include <process/process.h>
 #include <sched/sched.h>
 #include <time/sleep.h>
-#include <mm/mm-types.h>
+#include <mm/mm-types.h>
+#include <smp/smp.h>

+ 1 - 2
kernel/src/libs/mod.rs

@@ -7,7 +7,6 @@ pub mod atomic;
 pub mod list;
 pub mod lockref;
 pub mod mutex;
+pub mod rwlock;
 pub mod semaphore;
 pub mod wait_queue;
-pub mod rwlock;
-

+ 4 - 5
kernel/src/libs/rwlock.rs

@@ -113,7 +113,7 @@ impl<T> RwLock<T> {
         let reader_value = self.current_reader();
         // Get the post-increment reader_value, which includes processes trying to acquire a READER guard
         let value;
-        
+
         if reader_value.is_err() {
             return None; // acquisition failed
         } else {
@@ -165,9 +165,10 @@ impl<T> RwLock<T> {
     #[inline]
     /// @brief Try to acquire the WRITER guard
     pub fn try_write(&self) -> Option<RwLockWriteGuard<T>> {
-        let res:bool = self
+        let res: bool = self
             .lock
-            .compare_exchange(0, WRITER, Ordering::Acquire, Ordering::Relaxed).is_ok();
+            .compare_exchange(0, WRITER, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok();
         // The writer guard can only be acquired when the lock word is 0
         if res {
             return Some(RwLockWriteGuard {
@@ -443,5 +444,3 @@ impl<'rwlock, T> Drop for RwLockWriteGuard<'rwlock, T> {
             .fetch_and(!(WRITER | UPGRADED), Ordering::Release);
     }
 }
-
-
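
A note on the `try_write` path reformatted above: the whole acquisition is a single atomic compare-exchange that succeeds only while the lock word is 0, i.e. no readers, no writer, no upgrader. A minimal standalone sketch of that step (assuming a bare `AtomicU32` and a hypothetical `WRITER` bit; the real lock layout lives in rwlock.rs) looks like this:

    use core::sync::atomic::{AtomicU32, Ordering};

    // Hypothetical writer bit; the real constant is defined in rwlock.rs.
    const WRITER: u32 = 1;

    /// Try to become the exclusive writer. Succeeds only if the lock word
    /// is 0, meaning no reader, writer, or upgrader currently holds it.
    fn try_acquire_writer(lock: &AtomicU32) -> bool {
        lock.compare_exchange(0, WRITER, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    /// Release: clear the writer bit with Release ordering so the next
    /// acquirer observes all writes made inside the critical section.
    fn release_writer(lock: &AtomicU32) {
        lock.fetch_and(!WRITER, Ordering::Release);
    }

The failure ordering is `Relaxed` because a failed attempt publishes nothing; only a successful acquisition needs `Acquire` semantics.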

+ 8 - 0
kernel/src/sched/cfs.rs

@@ -103,6 +103,10 @@ impl CFSQueue {
             return None;
         }
     }
+    /// Get the length of the run queue
+    pub fn get_cfs_queue_size(&mut self) -> usize {
+        return self.queue.len();
+    }
 }
 
 /// @brief CFS scheduler class
@@ -172,6 +176,10 @@ impl SchedulerCFS {
         // kdebug!("set cpu idle: id={}", cpu_id);
         self.cpu_queue[cpu_id].idle_pcb = pcb;
     }
+    /// Get the number of processes in the run queue of the given cpu
+    pub fn get_cfs_queue_len(&mut self, cpu_id: u32) -> usize {
+        return self.cpu_queue[cpu_id as usize].get_cfs_queue_size();
+    }
 }
 
 impl Scheduler for SchedulerCFS {

+ 49 - 3
kernel/src/sched/core.rs

@@ -6,6 +6,7 @@ use crate::{
         context::switch_process,
         interrupt::{cli, sti},
     },
+    include::bindings::bindings::smp_get_total_cpu,
     include::bindings::bindings::{
         process_control_block, pt_regs, EINVAL, EPERM, MAX_CPU_NUM, PF_NEED_MIGRATE, PROC_RUNNING,
         SCHED_FIFO, SCHED_NORMAL, SCHED_RR,
@@ -27,7 +28,43 @@ pub fn cpu_executing(cpu_id: u32) -> &'static mut process_control_block {
         todo!()
     }
 }
+// Get the load of the given cpu; returns the current load. cpu_id is the id of the cpu to query.
+// TODO: change the load metric to the number of processes run over a recent time window
+pub fn get_cpu_loads(cpu_id: u32) -> u32 {
+    let cfs_scheduler = __get_cfs_scheduler();
+    let rt_scheduler = __get_rt_scheduler();
+    let len_cfs = cfs_scheduler.get_cfs_queue_len(cpu_id);
+    let len_rt = rt_scheduler.get_rt_queue_len(cpu_id);
+    // let load_rt = rt_scheduler.get_load_list_len(cpu_id);
+    // kdebug!("this cpu_id {} is load rt {}", cpu_id, load_rt);
 
+    return (len_rt + len_cfs) as u32;
+}
+// Load balancing
+pub fn loads_balance(pcb: &mut process_control_block) {
+    // Decide whether this pcb should be migrated
+    // Get the total number of CPUs
+    let cpu_num = unsafe { smp_get_total_cpu() };
+    // Find the id of the CPU with the smallest current load
+    let mut min_loads_cpu_id = pcb.cpu_id;
+    let mut min_loads = get_cpu_loads(pcb.cpu_id);
+    for cpu_id in 0..cpu_num {
+        let tmp_cpu_loads = get_cpu_loads(cpu_id);
+        if tmp_cpu_loads < min_loads {
+            min_loads_cpu_id = cpu_id;
+            min_loads = tmp_cpu_loads;
+        }
+    }
+
+    // Migrate the current pcb to the least-loaded CPU
+    // If PF_NEED_MIGRATE is already set on this pcb, skip the migration
+    if (min_loads_cpu_id != pcb.cpu_id) && (pcb.flags & (PF_NEED_MIGRATE as u64)) == 0 {
+        // sched_migrate_process(pcb, min_loads_cpu_id as usize);
+        pcb.flags |= PF_NEED_MIGRATE as u64;
+        pcb.migrate_to = min_loads_cpu_id;
+        // kdebug!("set migrating, pcb:{:?}", pcb);
+    }
+}
 /// @brief The trait that concrete schedulers must implement
 pub trait Scheduler {
     /// @brief The function to call when initiating scheduling through this scheduler
@@ -44,12 +81,13 @@ fn __sched() -> Option<&'static mut process_control_block> {
     compiler_fence(core::sync::atomic::Ordering::SeqCst);
 
     let next: &'static mut process_control_block;
-    match rt_scheduler.pick_next_task_rt() {
+    match rt_scheduler.pick_next_task_rt(current_pcb().cpu_id) {
         Some(p) => {
             next = p;
             // kdebug!("next pcb is {}",next.pid);
-            // rt_scheduler.enqueue_task_rt(next.priority as usize, next);
-            sched_enqueue(next, false);
+            // Put the picked process back where it came from
+            rt_scheduler.enqueue_front(next);
+
             return rt_scheduler.sched();
         }
         None => {
@@ -65,12 +103,20 @@ fn __sched() -> Option<&'static mut process_control_block> {
 #[allow(dead_code)]
 #[no_mangle]
 pub extern "C" fn sched_enqueue(pcb: &'static mut process_control_block, mut reset_time: bool) {
+    compiler_fence(core::sync::atomic::Ordering::SeqCst);
+
     // The scheduler does not handle processes whose running bit is 0
     if pcb.state & (PROC_RUNNING as u64) == 0 {
         return;
     }
     let cfs_scheduler = __get_cfs_scheduler();
     let rt_scheduler = __get_rt_scheduler();
+    // TODO: the first few processes are not migrated; this check needs to be revised. For now it exists to debug rt processes that have finished initializing
+    // if pcb.pid > 4 && pcb.policy!=0{
+    if pcb.pid > 4 {
+        loads_balance(pcb);
+    }
+    compiler_fence(core::sync::atomic::Ordering::SeqCst);
 
     compiler_fence(core::sync::atomic::Ordering::SeqCst);
     if (pcb.flags & (PF_NEED_MIGRATE as u64)) != 0 {
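
One subtlety in `loads_balance` above: because `get_cpu_loads` returns an unsigned `u32`, the minimum search compares values directly (`tmp_cpu_loads < min_loads`); subtracting and testing `> 0` would underflow whenever the candidate load is larger. A self-contained sketch of the selection loop, with the load query stubbed out as a closure (hypothetical names, not the kernel API), shows the shape:

    /// Return the id of the least-loaded CPU. `loads` stands in for
    /// get_cpu_loads(), which sums the cfs and rt run-queue lengths.
    fn least_loaded_cpu(start_cpu: u32, cpu_num: u32, loads: impl Fn(u32) -> u32) -> u32 {
        let mut min_id = start_cpu;
        let mut min_load = loads(start_cpu);
        for cpu_id in 0..cpu_num {
            let load = loads(cpu_id);
            // Direct comparison: no unsigned subtraction, no underflow.
            if load < min_load {
                min_id = cpu_id;
                min_load = load;
            }
        }
        min_id
    }

    fn main() {
        let fake_loads = [3u32, 1, 4, 1]; // hypothetical per-CPU queue lengths
        assert_eq!(least_loaded_cpu(0, 4, |id| fake_loads[id as usize]), 1);
    }

Note that a winning CPU does not trigger an immediate move: `loads_balance` only sets `PF_NEED_MIGRATE` and `migrate_to`, and `sched_enqueue` performs the actual migration when it observes the flag.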

+ 58 - 10
kernel/src/sched/rt.rs

@@ -4,7 +4,9 @@ use alloc::{boxed::Box, collections::LinkedList, vec::Vec};
 
 use crate::{
     arch::asm::current::current_pcb,
-    include::bindings::bindings::{process_control_block, PF_NEED_SCHED, SCHED_FIFO, SCHED_RR},
+    include::bindings::bindings::{
+        process_control_block, MAX_CPU_NUM, PF_NEED_SCHED, SCHED_FIFO, SCHED_RR,
+    },
     kBUG, kdebug,
     libs::spinlock::RawSpinlock,
 };
@@ -23,7 +25,7 @@ pub fn __get_rt_scheduler() -> &'static mut SchedulerRT {
 
 /// @brief Initialize the rt scheduler
 pub unsafe fn sched_rt_init() {
-    kdebug!("test rt init");
+    kdebug!("rt scheduler init");
     if RT_SCHEDULER_PTR.is_null() {
         RT_SCHEDULER_PTR = Box::leak(Box::new(SchedulerRT::new()));
     } else {
@@ -86,11 +88,15 @@ impl RTQueue {
         self.queue.push_front(pcb);
         self.lock.unlock();
     }
+    pub fn get_rt_queue_size(&mut self) -> usize {
+        return self.queue.len();
+    }
 }
 
 /// @brief RT scheduler class
 pub struct SchedulerRT {
-    cpu_queue: Vec<&'static mut RTQueue>,
+    cpu_queue: Vec<Vec<&'static mut RTQueue>>,
+    load_list: Vec<&'static mut LinkedList<u64>>,
 }
 
 impl SchedulerRT {
@@ -102,20 +108,32 @@ impl SchedulerRT {
         // todo: get the number of cores from the cpu module
         let mut result = SchedulerRT {
             cpu_queue: Default::default(),
+            load_list: Default::default(),
         };
 
         // Create queues for each cpu core
-        for _ in 0..SchedulerRT::MAX_RT_PRIO {
-            result.cpu_queue.push(Box::leak(Box::new(RTQueue::new())));
+        for cpu_id in 0..MAX_CPU_NUM {
+            result.cpu_queue.push(Vec::new());
+            // Each CPU gets MAX_RT_PRIO priority queues
+            for _ in 0..SchedulerRT::MAX_RT_PRIO {
+                result.cpu_queue[cpu_id as usize].push(Box::leak(Box::new(RTQueue::new())));
+            }
+        }
+        // Create a load-statistics list for each cpu core
+        for _ in 0..MAX_CPU_NUM {
+            result
+                .load_list
+                .push(Box::leak(Box::new(LinkedList::new())));
         }
         return result;
     }
+
     /// @brief Pick the next runnable rt process
-    pub fn pick_next_task_rt(&mut self) -> Option<&'static mut process_control_block> {
+    pub fn pick_next_task_rt(&mut self, cpu_id: u32) -> Option<&'static mut process_control_block> {
         // Search the priority queues in ascending order until a runnable process is found
         for i in 0..SchedulerRT::MAX_RT_PRIO {
-            let cpu_queue_i: &mut RTQueue = self.cpu_queue[i as usize];
+            let cpu_queue_i: &mut RTQueue = self.cpu_queue[cpu_id as usize][i as usize];
             let proc: Option<&'static mut process_control_block> = cpu_queue_i.dequeue();
             if proc.is_some() {
                 return proc;
@@ -124,6 +142,22 @@ impl SchedulerRT {
         // no runnable rt process on this cpu; return None
         None
     }
+
+    pub fn get_rt_queue_len(&mut self, cpu_id: u32) -> usize {
+        let mut sum = 0;
+        for prio in 0..SchedulerRT::MAX_RT_PRIO {
+            sum += self.cpu_queue[cpu_id as usize][prio as usize].get_rt_queue_size();
+        }
+        return sum;
+    }
+
+    pub fn get_load_list_len(&mut self, cpu_id: u32) -> usize {
+        return self.load_list[cpu_id as usize].len();
+    }
+
+    pub fn enqueue_front(&mut self, pcb: &'static mut process_control_block) {
+        self.cpu_queue[pcb.cpu_id as usize][pcb.priority as usize].enqueue_front(pcb);
+    }
 }
 
 impl Scheduler for SchedulerRT {
@@ -132,8 +166,9 @@ impl Scheduler for SchedulerRT {
     fn sched(&mut self) -> Option<&'static mut process_control_block> {
         current_pcb().flags &= !(PF_NEED_SCHED as u64);
         // Under normal flow a next pcb is always picked here; a None result is an error
+        let cpu_id = current_pcb().cpu_id;
         let proc: &'static mut process_control_block =
-            self.pick_next_task_rt().expect("No RT process found");
+            self.pick_next_task_rt(cpu_id).expect("No RT process found");
 
         // Under the fifo policy the process keeps the cpu until a higher-priority task becomes ready (equal priority is not enough) or it gives the cpu up voluntarily (e.g. to wait for a resource)
         if proc.policy == SCHED_FIFO {
@@ -167,14 +202,27 @@ impl Scheduler for SchedulerRT {
             }
             // curr has higher priority, so it must be a real-time process; put the picked process back at the front of its queue
             else {
-                self.cpu_queue[proc.cpu_id as usize].enqueue_front(proc);
+                self.cpu_queue[cpu_id as usize][proc.priority as usize].enqueue_front(proc);
             }
         }
         return None;
     }
 
     fn enqueue(&mut self, pcb: &'static mut process_control_block) {
+        let cpu_id = pcb.cpu_id;
         let cpu_queue = &mut self.cpu_queue[pcb.cpu_id as usize];
-        cpu_queue.enqueue(pcb);
+        cpu_queue[pcb.priority as usize].enqueue(pcb);
+        // // Get the current time
+        // let time = unsafe { _rdtsc() };
+        // let freq = unsafe { Cpu_tsc_freq };
+        // // kdebug!("this is timeeeeeeer {},freq is {}, {}", time, freq, cpu_id);
+        // // Append the current time to the load-record list
+        // self.load_list[cpu_id as usize].push_back(time);
+        // // If the front element is older than the configured window, remove it
+        // while self.load_list[cpu_id as usize].len() > 1
+        //     && (time - *self.load_list[cpu_id as usize].front().unwrap() > 10000000000)
+        // {
+        //     self.load_list[cpu_id as usize].pop_front();
+        // }
     }
 }
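
The commented-out block in `enqueue` and the TODO above `get_cpu_loads` point in the same direction: measuring load as the number of processes enqueued over a recent time window rather than as an instantaneous queue length. A userspace sketch of that idea (assuming a nanosecond timestamp in place of `_rdtsc()` and a hypothetical window constant) could look like:

    use std::collections::VecDeque;

    // Hypothetical window width; the commented-out kernel code compares
    // raw TSC ticks against a hard-coded threshold instead.
    const WINDOW_NS: u64 = 1_000_000_000;

    struct LoadWindow {
        events: VecDeque<u64>, // timestamps of recent enqueue events
    }

    impl LoadWindow {
        fn new() -> Self {
            Self { events: VecDeque::new() }
        }

        /// Record one enqueue event, then evict events older than the window.
        fn record(&mut self, now_ns: u64) {
            self.events.push_back(now_ns);
            while let Some(&front) = self.events.front() {
                if now_ns - front > WINDOW_NS {
                    self.events.pop_front();
                } else {
                    break;
                }
            }
        }

        /// Load estimate: number of enqueue events inside the window.
        fn load(&self) -> usize {
            self.events.len()
        }
    }

    fn main() {
        let mut w = LoadWindow::new();
        w.record(0);
        w.record(10);
        w.record(WINDOW_NS + 20); // evicts the first two events
        assert_eq!(w.load(), 1);
    }

`get_load_list_len` in the diff above already exposes the length of such a per-CPU list, so only the recording side remains to be wired up.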

+ 2 - 1
kernel/src/smp/smp.c

@@ -9,7 +9,8 @@
 
 #include <process/preempt.h>
 #include <sched/sched.h>
-
+#include <driver/acpi/acpi.h>
+#include <driver/interrupt/apic/apic.h>
 #include "ipi.h"
 
 static void __smp_kick_cpu_handler(uint64_t irq_num, uint64_t param, struct pt_regs *regs);

+ 0 - 2
kernel/src/smp/smp.h

@@ -2,8 +2,6 @@
 #include <common/glib.h>
 
 #include <common/asm.h>
-#include <driver/acpi/acpi.h>
-#include <driver/interrupt/apic/apic.h>
 
 #define MAX_SUPPORTED_PROCESSOR_NUM 1024