Ver Fonte

并行执行多个DADK任务 (#15)

Jomo há 1 ano atrás
pai
commit
d3b91fdfc2
6 ficheiros alterados com 393 adições e 142 exclusões
  1. 4 0
      src/console/mod.rs
  2. 11 11
      src/executor/cache.rs
  3. 8 7
      src/executor/mod.rs
  4. 13 1
      src/main.rs
  5. 261 123
      src/scheduler/mod.rs
  6. 96 0
      src/scheduler/task_deque.rs

+ 4 - 0
src/console/mod.rs

@@ -39,6 +39,10 @@ pub struct CommandLineArgs {
     /// DADK缓存根目录
     #[arg(long, value_parser = parse_check_dir_exists)]
     pub cache_dir: Option<PathBuf>,
+
+    /// DADK任务并行线程数量
+    #[arg(short, long)]
+    pub thread: Option<usize>,
 }
 
 /// @brief 检查目录是否存在

+ 11 - 11
src/executor/cache.rs

@@ -1,4 +1,4 @@
-use std::{path::PathBuf, rc::Rc};
+use std::{path::PathBuf, sync::Arc};
 
 use log::info;
 
@@ -78,7 +78,7 @@ pub fn cache_root_init(path: Option<PathBuf>) -> Result<(), ExecutorError> {
 #[derive(Debug, Clone)]
 pub struct CacheDir {
     #[allow(dead_code)]
-    entity: Rc<SchedEntity>,
+    entity: Arc<SchedEntity>,
     pub path: PathBuf,
     pub cache_type: CacheDirType,
 }
@@ -92,9 +92,9 @@ pub enum CacheDirType {
 impl CacheDir {
     pub const DADK_BUILD_CACHE_DIR_ENV_KEY_PREFIX: &'static str = "DADK_BUILD_CACHE_DIR";
     pub const DADK_SOURCE_CACHE_DIR_ENV_KEY_PREFIX: &'static str = "DADK_SOURCE_CACHE_DIR";
-    pub fn new(entity: Rc<SchedEntity>, cache_type: CacheDirType) -> Result<Self, ExecutorError> {
+    pub fn new(entity: Arc<SchedEntity>, cache_type: CacheDirType) -> Result<Self, ExecutorError> {
         let task = entity.task();
-        let path = Self::get_path(task, cache_type);
+        let path = Self::get_path(&task, cache_type);
 
         let result = Self {
             entity,
@@ -122,15 +122,15 @@ impl CacheDir {
         return PathBuf::from(cache_dir);
     }
 
-    pub fn build_dir(entity: Rc<SchedEntity>) -> Result<PathBuf, ExecutorError> {
-        return Ok(Self::new(entity, CacheDirType::Build)?.path);
+    pub fn build_dir(entity: Arc<SchedEntity>) -> Result<PathBuf, ExecutorError> {
+        return Ok(Self::new(entity.clone(), CacheDirType::Build)?.path);
     }
 
-    pub fn source_dir(entity: Rc<SchedEntity>) -> Result<PathBuf, ExecutorError> {
-        return Ok(Self::new(entity, CacheDirType::Source)?.path);
+    pub fn source_dir(entity: Arc<SchedEntity>) -> Result<PathBuf, ExecutorError> {
+        return Ok(Self::new(entity.clone(), CacheDirType::Source)?.path);
     }
 
-    pub fn build_dir_env_key(entity: &Rc<SchedEntity>) -> Result<String, ExecutorError> {
+    pub fn build_dir_env_key(entity: &Arc<SchedEntity>) -> Result<String, ExecutorError> {
         let name_version_env = entity.task().name_version_env();
         return Ok(format!(
             "{}_{}",
@@ -139,7 +139,7 @@ impl CacheDir {
         ));
     }
 
-    pub fn source_dir_env_key(entity: &Rc<SchedEntity>) -> Result<String, ExecutorError> {
+    pub fn source_dir_env_key(entity: &Arc<SchedEntity>) -> Result<String, ExecutorError> {
         let name_version_env = entity.task().name_version_env();
         return Ok(format!(
             "{}_{}",
@@ -148,7 +148,7 @@ impl CacheDir {
         ));
     }
 
-    pub fn need_source_cache(entity: &Rc<SchedEntity>) -> bool {
+    pub fn need_source_cache(entity: &Arc<SchedEntity>) -> bool {
         let task_type = &entity.task().task_type;
 
         if let TaskType::BuildFromSource(cs) = task_type {

+ 8 - 7
src/executor/mod.rs

@@ -3,8 +3,7 @@ use std::{
     env::Vars,
     path::PathBuf,
     process::{Command, Stdio},
-    rc::Rc,
-    sync::RwLock,
+    sync::{Arc, RwLock},
 };
 
 use log::{debug, error, info, warn};
@@ -30,7 +29,7 @@ lazy_static! {
 
 #[derive(Debug, Clone)]
 pub struct Executor {
-    entity: Rc<SchedEntity>,
+    entity: Arc<SchedEntity>,
     action: Action,
     local_envs: EnvMap,
     /// 任务构建结果输出到的目录
@@ -55,7 +54,7 @@ impl Executor {
     /// * `Ok(Executor)` - 创建成功
     /// * `Err(ExecutorError)` - 创建失败
     pub fn new(
-        entity: Rc<SchedEntity>,
+        entity: Arc<SchedEntity>,
         action: Action,
         dragonos_sysroot: PathBuf,
     ) -> Result<Self, ExecutorError> {
@@ -148,7 +147,8 @@ impl Executor {
 
     /// # 执行安装操作,把构建结果安装到DragonOS
     fn install(&self) -> Result<(), ExecutorError> {
-        let in_dragonos_path = self.entity.task().install.in_dragonos_path.as_ref();
+        let binding = self.entity.task();
+        let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
         // 如果没有指定安装路径,则不执行安装
         if in_dragonos_path.is_none() {
             return Ok(());
@@ -336,7 +336,8 @@ impl Executor {
         // 设置本地环境变量
         self.prepare_target_env()?;
 
-        let task_envs: Option<&Vec<TaskEnv>> = self.entity.task().envs.as_ref();
+        let binding = self.entity.task();
+        let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
         if task_envs.is_none() {
             return Ok(());
         }
@@ -538,7 +539,7 @@ pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError>
     env_list.add_vars(envs);
 
     // 为每个任务创建特定的环境变量
-    for entity in sched_entities.iter() {
+    for entity in sched_entities.entities().iter() {
         // 导出任务的构建目录环境变量
         let build_dir = CacheDir::build_dir(entity.clone())?;
 

+ 13 - 1
src/main.rs

@@ -83,6 +83,7 @@
 //! - 支持自动更新
 //! - 完善clean命令的逻辑
 
+#![feature(extract_if)]
 #![feature(io_error_more)]
 
 #[macro_use]
@@ -103,7 +104,7 @@ use simple_logger::SimpleLogger;
 use crate::{
     console::{interactive::InteractiveConsole, CommandLineArgs},
     executor::cache::cache_root_init,
-    scheduler::Scheduler,
+    scheduler::{task_deque::TASK_DEQUE, Scheduler},
 };
 
 mod console;
@@ -124,6 +125,7 @@ fn main() {
     let dragonos_dir = args.dragonos_dir.clone();
     let config_dir = args.config_dir.clone();
     let action = args.action;
+    let thread = args.thread;
     info!(
         "DragonOS sysroot dir: {}",
         dragonos_dir
@@ -137,6 +139,12 @@ fn main() {
             .map_or_else(|| "None".to_string(), |d| d.display().to_string())
     );
     info!("Action: {:?}", action);
+    info!(
+        "Thread num: {}",
+        thread
+            .as_ref()
+            .map_or_else(|| "None".to_string(), |d| d.to_string())
+    );
 
     match action {
         console::Action::New => {
@@ -150,6 +158,10 @@ fn main() {
         _ => {}
     }
 
+    if let Some(thread) = thread {
+        TASK_DEQUE.lock().unwrap().set_thread(thread);
+    }
+
     // 初始化缓存目录
     let r = cache_root_init(args.cache_dir);
     if r.is_err() {

+ 261 - 123
src/scheduler/mod.rs

@@ -1,10 +1,13 @@
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashMap},
     fmt::Debug,
     path::PathBuf,
     process::exit,
-    rc::Rc,
-    sync::atomic::{AtomicI32, Ordering},
+    sync::{
+        atomic::{AtomicI32, Ordering},
+        Arc, Mutex, RwLock,
+    },
+    thread::ThreadId,
 };
 
 use log::{error, info};
@@ -15,53 +18,103 @@ use crate::{
     parser::task::DADKTask,
 };
 
-/// # 调度实体
+use self::task_deque::TASK_DEQUE;
+
+pub mod task_deque;
+
+lazy_static! {
+    // 线程id与任务实体id映射表
+    pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
+}
+
+/// # 调度实体内部结构
 #[derive(Debug, Clone)]
-pub struct SchedEntity {
+pub struct InnerEntity {
     /// 任务ID
     id: i32,
     file_path: PathBuf,
     /// 任务
     task: DADKTask,
+    /// 入度
+    indegree: usize,
+    /// 子节点
+    children: Vec<Arc<SchedEntity>>,
     /// target管理
     target: Option<Target>,
 }
 
+/// # 调度实体
+#[derive(Debug)]
+pub struct SchedEntity {
+    inner: Mutex<InnerEntity>,
+}
+
 impl PartialEq for SchedEntity {
     fn eq(&self, other: &Self) -> bool {
-        self.id == other.id
+        self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
     }
 }
 
 impl SchedEntity {
     #[allow(dead_code)]
     pub fn id(&self) -> i32 {
-        self.id
+        self.inner.lock().unwrap().id
     }
 
     #[allow(dead_code)]
-    pub fn file_path(&self) -> &PathBuf {
-        &self.file_path
+    pub fn file_path(&self) -> PathBuf {
+        self.inner.lock().unwrap().file_path.clone()
     }
 
     #[allow(dead_code)]
-    pub fn task(&self) -> &DADKTask {
-        &self.task
+    pub fn task(&self) -> DADKTask {
+        self.inner.lock().unwrap().task.clone()
     }
 
-    #[allow(dead_code)]
-    pub fn task_mut(&mut self) -> &mut DADKTask {
-        &mut self.task
+    /// 入度加1
+    pub fn add_indegree(&self) {
+        self.inner.lock().unwrap().indegree += 1;
     }
 
-    #[allow(dead_code)]
-    pub fn target(&self) -> &Option<Target> {
-        &self.target
+    /// 入度减1
+    pub fn sub_indegree(&self) -> usize {
+        self.inner.lock().unwrap().indegree -= 1;
+        return self.inner.lock().unwrap().indegree;
     }
 
-    #[allow(dead_code)]
-    pub fn target_mut(&mut self) -> &mut Option<Target> {
-        &mut self.target
+    /// 增加子节点
+    pub fn add_child(&self, entity: Arc<SchedEntity>) {
+        self.inner.lock().unwrap().children.push(entity);
+    }
+
+    /// 获取入度
+    pub fn indegree(&self) -> usize {
+        self.inner.lock().unwrap().indegree
+    }
+
+    /// 获取target
+    pub fn target(&self) -> Option<Target> {
+        self.inner.lock().unwrap().target.clone()
+    }
+
+    /// 当前任务完成后,所有子节点入度减1
+    ///
+    /// ## 参数
+    ///
+    /// 无
+    ///
+    /// ## 返回值
+    ///
+    /// 所有入度为0的子节点集合
+    pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
+        let mut zero_child = Vec::new();
+        let children = &self.inner.lock().unwrap().children;
+        for child in children.iter() {
+            if child.sub_indegree() == 0 {
+                zero_child.push(child.clone());
+            }
+        }
+        return zero_child;
     }
 }
 
@@ -70,68 +123,72 @@ impl SchedEntity {
 /// 用于存储所有的调度实体
 #[derive(Debug)]
 pub struct SchedEntities {
-    /// 调度实体列表
-    entities: Vec<Rc<SchedEntity>>,
     /// 任务ID到调度实体的映射
-    id2entity: BTreeMap<i32, Rc<SchedEntity>>,
-    /// 任务名和版本到调度实体的映射
-    name_version_2_entity: BTreeMap<String, Rc<SchedEntity>>,
+    id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
 }
 
 impl SchedEntities {
     pub fn new() -> Self {
         Self {
-            entities: Vec::new(),
-            id2entity: BTreeMap::new(),
-            name_version_2_entity: BTreeMap::new(),
+            id2entity: RwLock::new(BTreeMap::new()),
         }
     }
 
-    pub fn add(&mut self, entity: Rc<SchedEntity>) {
-        self.entities.push(entity.clone());
-        self.id2entity.insert(entity.id, entity.clone());
-        self.name_version_2_entity
-            .insert(entity.task.name_version_env(), entity);
+    pub fn add(&mut self, entity: Arc<SchedEntity>) {
+        self.id2entity
+            .write()
+            .unwrap()
+            .insert(entity.id(), entity.clone());
     }
 
     #[allow(dead_code)]
-    pub fn get(&self, id: i32) -> Option<Rc<SchedEntity>> {
-        self.id2entity.get(&id).cloned()
+    pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
+        self.id2entity.read().unwrap().get(&id).cloned()
+    }
+
+    pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
+        for e in self.id2entity.read().unwrap().iter() {
+            if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
+                return Some(e.1.clone());
+            }
+        }
+        return None;
     }
 
-    pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Rc<SchedEntity>> {
-        self.name_version_2_entity
-            .get(&DADKTask::name_version_uppercase(name, version))
-            .cloned()
+    pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
+        let mut v = Vec::new();
+        for e in self.id2entity.read().unwrap().iter() {
+            v.push(e.1.clone());
+        }
+        return v;
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = &Rc<SchedEntity>> {
-        self.entities.iter()
+    pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
+        self.id2entity.read().unwrap().clone()
     }
 
     #[allow(dead_code)]
     pub fn len(&self) -> usize {
-        self.entities.len()
+        self.id2entity.read().unwrap().len()
     }
 
     #[allow(dead_code)]
     pub fn is_empty(&self) -> bool {
-        self.entities.is_empty()
+        self.id2entity.read().unwrap().is_empty()
     }
 
     #[allow(dead_code)]
     pub fn clear(&mut self) {
-        self.entities.clear();
-        self.id2entity.clear();
-        self.name_version_2_entity.clear();
+        self.id2entity.write().unwrap().clear();
     }
 
-    pub fn topo_sort(&self) -> Vec<Rc<SchedEntity>> {
+    pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
         let mut result = Vec::new();
         let mut visited = BTreeMap::new();
-        for entity in self.entities.iter() {
-            if !visited.contains_key(&entity.id) {
-                let r = self.dfs(entity, &mut visited, &mut result);
+        let btree = self.id2entity.write().unwrap().clone();
+        for entity in btree.iter() {
+            if !visited.contains_key(entity.0) {
+                let r = self.dfs(entity.1, &mut visited, &mut result);
                 if r.is_err() {
                     let err = r.unwrap_err();
                     error!("{}", err.display());
@@ -145,21 +202,27 @@ impl SchedEntities {
 
     fn dfs(
         &self,
-        entity: &Rc<SchedEntity>,
+        entity: &Arc<SchedEntity>,
         visited: &mut BTreeMap<i32, bool>,
-        result: &mut Vec<Rc<SchedEntity>>,
+        result: &mut Vec<Arc<SchedEntity>>,
     ) -> Result<(), DependencyCycleError> {
-        visited.insert(entity.id, false);
-        for dep in entity.task.depends.iter() {
+        visited.insert(entity.id(), false);
+        for dep in entity.task().depends.iter() {
             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
-                if let Some(&false) = visited.get(&dep_entity.id) {
+                let guard = self.id2entity.write().unwrap();
+                let e = guard.get(&entity.id()).unwrap();
+                let d = guard.get(&dep_entity.id()).unwrap();
+                e.add_indegree();
+                d.add_child(e.clone());
+                if let Some(&false) = visited.get(&dep_entity.id()) {
                     // 输出完整环形依赖
                     let mut err = DependencyCycleError::new(dep_entity.clone());
 
                     err.add(entity.clone(), dep_entity);
                     return Err(err);
                 }
-                if !visited.contains_key(&dep_entity.id) {
+                if !visited.contains_key(&dep_entity.id()) {
+                    drop(guard);
                     let r = self.dfs(&dep_entity, visited, result);
                     if r.is_err() {
                         let mut err: DependencyCycleError = r.unwrap_err();
@@ -178,13 +241,13 @@ impl SchedEntities {
             } else {
                 error!(
                     "Dependency not found: {} -> {}",
-                    entity.task.name_version(),
+                    entity.task().name_version(),
                     dep.name_version()
                 );
                 std::process::exit(1);
             }
         }
-        visited.insert(entity.id, true);
+        visited.insert(entity.id(), true);
         result.push(entity.clone());
         return Ok(());
     }
@@ -203,7 +266,7 @@ pub struct Scheduler {
 
 pub enum SchedulerError {
     TaskError(String),
-    DependencyNotFound(Rc<SchedEntity>, String),
+    DependencyNotFound(Arc<SchedEntity>, String),
     RunError(String),
 }
 
@@ -217,9 +280,9 @@ impl Debug for SchedulerError {
                 write!(
                     f,
                     "For task {}, dependency not found: {}. Please check file: {}",
-                    current.task.name_version(),
+                    current.task().name_version(),
                     msg,
-                    current.file_path.display()
+                    current.file_path().display()
                 )
             }
             SchedulerError::RunError(msg) => {
@@ -268,14 +331,20 @@ impl Scheduler {
     /// 添加任务到调度器中,如果任务已经存在,则返回错误
     pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> {
         let id: i32 = self.generate_task_id();
+        let indegree: usize = 0;
+        let children = Vec::new();
         let target = self.generate_task_target(&path, &task.rust_target)?;
-        let entity = Rc::new(SchedEntity {
-            id,
-            task,
-            file_path: path.clone(),
-            target,
+        let entity = Arc::new(SchedEntity {
+            inner: Mutex::new(InnerEntity {
+                id,
+                task,
+                file_path: path.clone(),
+                indegree,
+                children,
+                target,
+            }),
         });
-        let name_version = (entity.task.name.clone(), entity.task.version.clone());
+        let name_version = (entity.task().name.clone(), entity.task().version.clone());
 
         if self
             .target
@@ -292,7 +361,7 @@ impl Scheduler {
 
         self.target.add(entity.clone());
 
-        info!("Task added: {}", entity.task.name_version());
+        info!("Task added: {}", entity.task().name_version());
         return Ok(());
     }
 
@@ -364,14 +433,39 @@ impl Scheduler {
         }
 
         // 对调度实体进行拓扑排序
-        let r: Vec<Rc<SchedEntity>> = self.target.topo_sort();
-
-        for entity in r.iter() {
-            let mut executor = Executor::new(
-                entity.clone(),
-                self.action.clone(),
-                self.dragonos_dir.clone(),
-            )
+        let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
+
+        let action = self.action.clone();
+        let dragonos_dir = self.dragonos_dir.clone();
+        let id2entity = self.target.id2entity();
+        let count = r.len();
+
+        // 启动守护线程
+        let handler = std::thread::spawn(move || {
+            Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
+        });
+
+        handler.join().expect("Could not join deamon");
+
+        return Ok(());
+    }
+
+    /// Action不需要按照拓扑序执行
+    fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
+        // 启动守护线程
+        let action = self.action.clone();
+        let dragonos_dir = self.dragonos_dir.clone();
+        let mut r = self.target.entities();
+        let handler = std::thread::spawn(move || {
+            Self::clean_daemon(action, dragonos_dir, &mut r);
+        });
+
+        handler.join().expect("Could not join deamon");
+        return Ok(());
+    }
+
+    pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
+        let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
             .map_err(|e| {
                 error!(
                     "Error while creating executor for task {} : {:?}",
@@ -382,60 +476,104 @@ impl Scheduler {
             })
             .unwrap();
 
-            executor
-                .execute()
-                .map_err(|e| {
-                    error!(
-                        "Error while executing task {} : {:?}",
-                        entity.task().name_version(),
-                        e
-                    );
-                    exit(-1);
-                })
-                .unwrap();
-        }
-        return Ok(());
-    }
-
-    /// Action不需要按照拓扑序执行
-    fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
-        for entity in self.target.iter() {
-            let mut executor = Executor::new(
-                entity.clone(),
-                self.action.clone(),
-                self.dragonos_dir.clone(),
-            )
+        executor
+            .execute()
             .map_err(|e| {
                 error!(
-                    "Error while creating executor for task {} : {:?}",
+                    "Error while executing task {} : {:?}",
                     entity.task().name_version(),
                     e
                 );
                 exit(-1);
             })
             .unwrap();
+    }
 
-            executor
-                .execute()
-                .map_err(|e| {
-                    error!(
-                        "Error while executing task {} : {:?}",
-                        entity.task().name_version(),
-                        e
-                    );
-                    exit(-1);
-                })
-                .unwrap();
+    /// 构建和安装DADK任务的守护线程
+    ///
+    /// ## 参数
+    ///
+    /// - `action` : 要执行的操作
+    /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
+    /// - `id2entity` : DADK任务id与实体映射表
+    /// - `count` : 当前剩余任务数
+    /// - `r` : 总任务实体表
+    ///
+    /// ## 返回值
+    ///
+    /// 无
+    pub fn build_install_daemon(
+        action: Action,
+        dragonos_dir: PathBuf,
+        id2entity: BTreeMap<i32, Arc<SchedEntity>>,
+        mut count: usize,
+        r: &Vec<Arc<SchedEntity>>,
+    ) {
+        log::warn!("daemon");
+        let mut guard = TASK_DEQUE.lock().unwrap();
+        // 初始化0入度的任务实体
+        let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
+        for e in r.iter() {
+            if e.indegree() == 0 {
+                zero_entity.push(e.clone());
+            }
+        }
+
+        while count > 0 {
+            // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
+            while !zero_entity.is_empty()
+                && guard.build_install_task(
+                    action.clone(),
+                    dragonos_dir.clone(),
+                    zero_entity.last().unwrap().clone(),
+                )
+            {
+                zero_entity.pop();
+            }
+
+            let queue = guard.queue_mut();
+            // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
+            queue.retain(|x| {
+                if x.is_finished() {
+                    count -= 1;
+                    let tid = x.thread().id();
+                    let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
+                    let entity = id2entity.get(&eid).unwrap();
+                    let zero = entity.sub_children_indegree();
+                    for e in zero.iter() {
+                        zero_entity.push(e.clone());
+                    }
+                    return false;
+                }
+                return true;
+            })
+        }
+    }
+
+    /// 清理DADK任务的守护线程
+    ///
+    /// ## 参数
+    ///
+    /// - `action` : 要执行的操作
+    /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
+    /// - `r` : 总任务实体表
+    ///
+    /// ## 返回值
+    ///
+    /// 无
+    pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
+        let mut guard = TASK_DEQUE.lock().unwrap();
+        while !guard.queue().is_empty() && !r.is_empty() {
+            guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
         }
-        return Ok(());
     }
 
     /// # 检查是否有不存在的依赖
     ///
     /// 如果某个任务的dependency中的任务不存在,则返回错误
     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
-        for entity in self.target.iter() {
-            for dependency in entity.task.depends.iter() {
+        for entity in self.target.entities().iter() {
+            for dependency in entity.task().depends.iter() {
                 let name_version = (dependency.name.clone(), dependency.version.clone());
                 if !self
                     .target
@@ -474,15 +612,15 @@ impl Scheduler {
 pub struct DependencyCycleError {
     /// # 起始实体
     /// 本错误的起始实体,即环形依赖的起点
-    head_entity: Rc<SchedEntity>,
+    head_entity: Arc<SchedEntity>,
     /// 是否停止传播
     stop_propagation: bool,
     /// 依赖关系
-    dependencies: Vec<(Rc<SchedEntity>, Rc<SchedEntity>)>,
+    dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
 }
 
 impl DependencyCycleError {
-    pub fn new(head_entity: Rc<SchedEntity>) -> Self {
+    pub fn new(head_entity: Arc<SchedEntity>) -> Self {
         Self {
             head_entity,
             stop_propagation: false,
@@ -490,7 +628,7 @@ impl DependencyCycleError {
         }
     }
 
-    pub fn add(&mut self, current: Rc<SchedEntity>, dependency: Rc<SchedEntity>) {
+    pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
         self.dependencies.push((current, dependency));
     }
 
@@ -499,7 +637,7 @@ impl DependencyCycleError {
     }
 
     #[allow(dead_code)]
-    pub fn dependencies(&self) -> &Vec<(Rc<SchedEntity>, Rc<SchedEntity>)> {
+    pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
         &self.dependencies
     }
 
@@ -511,10 +649,10 @@ impl DependencyCycleError {
         for (current, dep) in tmp.iter() {
             ret.push_str(&format!(
                 "->\t{} ({})\t--depends-->\t{} ({})\n",
-                current.task.name_version(),
-                current.file_path.display(),
-                dep.task.name_version(),
-                dep.file_path.display()
+                current.task().name_version(),
+                current.file_path().display(),
+                dep.task().name_version(),
+                dep.file_path().display()
             ));
         }
         ret.push_str("-> End");

+ 96 - 0
src/scheduler/task_deque.rs

@@ -0,0 +1,96 @@
+use std::{
+    path::PathBuf,
+    sync::{Arc, Mutex},
+    thread::JoinHandle,
+};
+
+use crate::{console::Action, scheduler::TID_EID};
+
+use super::{SchedEntity, Scheduler};
+
+// 最大线程数
+pub const MAX_THREAD_NUM: usize = 32;
+// 默认线程数
+pub const DEFAULT_THREAD_NUM: usize = 2;
+
+lazy_static! {
+    // 全局任务队列
+    pub static ref TASK_DEQUE: Mutex<TaskDeque> = Mutex::new(TaskDeque {
+        max_num: DEFAULT_THREAD_NUM,
+        queue: Vec::new(),
+    });
+}
+
+/// # 任务队列
+pub struct TaskDeque {
+    max_num: usize,
+    queue: Vec<JoinHandle<()>>,
+}
+
+impl TaskDeque {
+    /// 将构建或安装DADK任务添加到任务队列中
+    ///
+    /// ## 参数
+    ///
+    /// - `action` : 要执行的操作
+    /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
+    /// - `entity` : 任务实体
+    ///
+    /// ## 返回值
+    ///
+    /// true 任务添加成功
+    /// false 任务添加失败
+    pub fn build_install_task(
+        &mut self,
+        action: Action,
+        dragonos_dir: PathBuf,
+        entity: Arc<SchedEntity>,
+    ) -> bool {
+        // log::warn!("push stack: task:{} {entity:?}", entity.id());
+        if self.queue.len() < self.max_num {
+            let id = entity.id();
+            let handler = std::thread::spawn(move || {
+                Scheduler::execute(action, dragonos_dir.clone(), entity)
+            });
+            TID_EID.lock().unwrap().insert(handler.thread().id(), id);
+            self.queue.push(handler);
+            return true;
+        }
+        return false;
+    }
+
+    /// 将清理DADK任务添加到任务队列中
+    ///
+    /// ## 参数
+    ///
+    /// - `action` : 要执行的操作
+    /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
+    /// - `entity` : 任务实体
+    ///
+    /// ## 返回值
+    ///
+    /// 无
+    pub fn clean_task(&mut self, action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
+        while self.queue.len() >= self.max_num {
+            self.queue.retain(|x| !x.is_finished());
+        }
+        let handler =
+            std::thread::spawn(move || Scheduler::execute(action, dragonos_dir.clone(), entity));
+        self.queue.push(handler);
+    }
+
+    pub fn queue(&self) -> &Vec<JoinHandle<()>> {
+        return &self.queue;
+    }
+
+    pub fn queue_mut(&mut self) -> &mut Vec<JoinHandle<()>> {
+        return &mut self.queue;
+    }
+
+    pub fn set_thread(&mut self, mut thread: usize) {
+        if thread > MAX_THREAD_NUM {
+            thread = MAX_THREAD_NUM;
+        }
+        self.max_num = thread;
+    }
+}