mod.rs 11 KB


  1. use std::{
  2. collections::BTreeMap,
  3. fmt::Debug,
  4. path::PathBuf,
  5. rc::Rc,
  6. sync::atomic::{AtomicI32, Ordering}, process::exit,
  7. };
  8. use log::{error, info};
  9. use crate::{console::Action, executor::Executor, parser::task::DADKTask};
  10. /// # 调度实体
  11. #[derive(Debug)]
  12. pub struct SchedEntity {
  13. /// 任务ID
  14. id: i32,
  15. file_path: PathBuf,
  16. /// 任务
  17. task: DADKTask,
  18. }
  19. impl SchedEntity {
  20. #[allow(dead_code)]
  21. pub fn id(&self) -> i32 {
  22. self.id
  23. }
  24. #[allow(dead_code)]
  25. pub fn file_path(&self) -> &PathBuf {
  26. &self.file_path
  27. }
  28. #[allow(dead_code)]
  29. pub fn task(&self) -> &DADKTask {
  30. &self.task
  31. }
  32. #[allow(dead_code)]
  33. pub fn task_mut(&mut self) -> &mut DADKTask {
  34. &mut self.task
  35. }
  36. }
  37. /// # 调度实体列表
  38. ///
  39. /// 用于存储所有的调度实体
  40. #[derive(Debug)]
  41. pub struct SchedEntities {
  42. /// 调度实体列表
  43. entities: Vec<Rc<SchedEntity>>,
  44. /// 任务ID到调度实体的映射
  45. id2entity: BTreeMap<i32, Rc<SchedEntity>>,
  46. /// 任务名和版本到调度实体的映射
  47. name_version_2_entity: BTreeMap<(String, String), Rc<SchedEntity>>,
  48. }
  49. impl SchedEntities {
  50. pub fn new() -> Self {
  51. Self {
  52. entities: Vec::new(),
  53. id2entity: BTreeMap::new(),
  54. name_version_2_entity: BTreeMap::new(),
  55. }
  56. }
  57. pub fn add(&mut self, entity: Rc<SchedEntity>) {
  58. self.entities.push(entity.clone());
  59. self.id2entity.insert(entity.id, entity.clone());
  60. self.name_version_2_entity.insert(
  61. (entity.task.name.clone(), entity.task.version.clone()),
  62. entity,
  63. );
  64. }
  65. #[allow(dead_code)]
  66. pub fn get(&self, id: i32) -> Option<Rc<SchedEntity>> {
  67. self.id2entity.get(&id).cloned()
  68. }
  69. pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Rc<SchedEntity>> {
  70. self.name_version_2_entity
  71. .get(&(name.to_string(), version.to_string()))
  72. .cloned()
  73. }
  74. pub fn iter(&self) -> impl Iterator<Item = &Rc<SchedEntity>> {
  75. self.entities.iter()
  76. }
  77. #[allow(dead_code)]
  78. pub fn len(&self) -> usize {
  79. self.entities.len()
  80. }
  81. #[allow(dead_code)]
  82. pub fn is_empty(&self) -> bool {
  83. self.entities.is_empty()
  84. }
  85. #[allow(dead_code)]
  86. pub fn clear(&mut self) {
  87. self.entities.clear();
  88. self.id2entity.clear();
  89. self.name_version_2_entity.clear();
  90. }
  91. pub fn topo_sort(&self) -> Vec<Rc<SchedEntity>> {
  92. let mut result = Vec::new();
  93. let mut visited = BTreeMap::new();
  94. for entity in self.entities.iter() {
  95. if !visited.contains_key(&entity.id) {
  96. let r = self.dfs(entity, &mut visited, &mut result);
  97. if r.is_err() {
  98. let err = r.unwrap_err();
  99. error!("{}", err.display());
  100. println!("Please fix the errors above and try again.");
  101. std::process::exit(1);
  102. }
  103. }
  104. }
  105. return result;
  106. }
  107. fn dfs(
  108. &self,
  109. entity: &Rc<SchedEntity>,
  110. visited: &mut BTreeMap<i32, bool>,
  111. result: &mut Vec<Rc<SchedEntity>>,
  112. ) -> Result<(), DependencyCycleError> {
  113. visited.insert(entity.id, false);
  114. for dep in entity.task.depends.iter() {
  115. if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
  116. if let Some(&false) = visited.get(&dep_entity.id) {
  117. // 输出完整环形依赖
  118. let mut err = DependencyCycleError::new();
  119. err.add(
  120. entity.file_path.clone(),
  121. format!(
  122. "{} ({})",
  123. dep_entity.task.name_version(),
  124. dep_entity.file_path.display()
  125. ),
  126. );
  127. return Err(err);
  128. }
  129. if !visited.contains_key(&dep_entity.id) {
  130. let r = self.dfs(&dep_entity, visited, result);
  131. if r.is_err() {
  132. let mut err = r.unwrap_err();
  133. err.add(
  134. entity.file_path.clone(),
  135. format!(
  136. "{} ({})",
  137. dep_entity.task.name_version(),
  138. dep_entity.file_path.display()
  139. ),
  140. );
  141. return Err(err);
  142. }
  143. }
  144. } else {
  145. error!(
  146. "Dependency not found: {} -> {}",
  147. entity.task.name_version(),
  148. dep.name_version()
  149. );
  150. std::process::exit(1);
  151. }
  152. }
  153. visited.insert(entity.id, true);
  154. result.push(entity.clone());
  155. return Ok(());
  156. }
  157. }
  158. /// # 任务调度器
  159. #[derive(Debug)]
  160. pub struct Scheduler {
  161. /// DragonOS sysroot在主机上的路径
  162. dragonos_dir: PathBuf,
  163. /// 要执行的操作
  164. action: Action,
  165. /// 调度实体列表
  166. target: SchedEntities,
  167. }
  168. pub enum SchedulerError {
  169. TaskError(String),
  170. DependencyNotFound(Rc<SchedEntity>, String),
  171. RunError(String),
  172. }
  173. impl Debug for SchedulerError {
  174. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  175. match self {
  176. Self::TaskError(arg0) => {
  177. write!(f, "TaskError: {}", arg0)
  178. }
  179. SchedulerError::DependencyNotFound(current, msg) => {
  180. write!(
  181. f,
  182. "For task {}, dependency not found: {}. Please check file: {}",
  183. current.task.name_version(),
  184. msg,
  185. current.file_path.display()
  186. )
  187. }
  188. SchedulerError::RunError(msg) => {
  189. write!(f, "RunError: {}", msg)
  190. }
  191. }
  192. }
  193. }
  194. impl Scheduler {
  195. pub fn new(
  196. dragonos_dir: PathBuf,
  197. action: Action,
  198. tasks: Vec<(PathBuf, DADKTask)>,
  199. ) -> Result<Self, SchedulerError> {
  200. let entities = SchedEntities::new();
  201. let mut scheduler = Scheduler {
  202. dragonos_dir,
  203. action,
  204. target: entities,
  205. };
  206. let r = scheduler.add_tasks(tasks);
  207. if r.is_err() {
  208. error!("Error while adding tasks: {:?}", r);
  209. return Err(r.err().unwrap());
  210. }
  211. return Ok(scheduler);
  212. }
  213. /// # 添加多个任务
  214. ///
  215. /// 添加任务到调度器中,如果任务已经存在,则返回错误
  216. pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
  217. for task in tasks {
  218. self.add_task(task.0, task.1)?;
  219. }
  220. return Ok(());
  221. }
  222. /// # 添加一个任务
  223. ///
  224. /// 添加任务到调度器中,如果任务已经存在,则返回错误
  225. pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> {
  226. let id: i32 = self.generate_task_id();
  227. let entity = Rc::new(SchedEntity {
  228. id,
  229. task,
  230. file_path: path.clone(),
  231. });
  232. let name_version = (entity.task.name.clone(), entity.task.version.clone());
  233. if self
  234. .target
  235. .get_by_name_version(&name_version.0, &name_version.1)
  236. .is_some()
  237. {
  238. return Err(SchedulerError::TaskError(format!(
  239. "Task with name [{}] and version [{}] already exists. Config file: {}",
  240. name_version.0,
  241. name_version.1,
  242. path.display()
  243. )));
  244. }
  245. self.target.add(entity.clone());
  246. info!("Task added: {}", entity.task.name_version());
  247. return Ok(());
  248. }
  249. fn generate_task_id(&self) -> i32 {
  250. static TASK_ID: AtomicI32 = AtomicI32::new(0);
  251. return TASK_ID.fetch_add(1, Ordering::SeqCst);
  252. }
  253. /// # 执行调度器中的所有任务
  254. pub fn run(&self) -> Result<(), SchedulerError> {
  255. // 检查是否有不存在的依赖
  256. let r = self.check_not_exists_dependency();
  257. if r.is_err() {
  258. error!("Error while checking tasks: {:?}", r);
  259. return r;
  260. }
  261. // 对调度实体进行拓扑排序
  262. let r: Vec<Rc<SchedEntity>> = self.target.topo_sort();
  263. crate::executor::prepare_env(&self.target)
  264. .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
  265. for entity in r.iter() {
  266. let executor = Executor::new(entity.clone()).map_err(|e| {
  267. error!(
  268. "Error while creating executor for task {} : {:?}",
  269. entity.task().name_version(),
  270. e
  271. );
  272. exit(-1);
  273. }).unwrap();
  274. executor.execute().map_err(|e| {
  275. error!(
  276. "Error while executing task {} : {:?}",
  277. entity.task().name_version(),
  278. e
  279. );
  280. exit(-1);
  281. }).unwrap();
  282. }
  283. return Ok(());
  284. }
  285. /// # 检查是否有不存在的依赖
  286. ///
  287. /// 如果某个任务的dependency中的任务不存在,则返回错误
  288. fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
  289. for entity in self.target.iter() {
  290. for dependency in entity.task.depends.iter() {
  291. let name_version = (dependency.name.clone(), dependency.version.clone());
  292. if !self
  293. .target
  294. .get_by_name_version(&name_version.0, &name_version.1)
  295. .is_some()
  296. {
  297. return Err(SchedulerError::DependencyNotFound(
  298. entity.clone(),
  299. format!("name:{}, version:{}", name_version.0, name_version.1,),
  300. ));
  301. }
  302. }
  303. }
  304. return Ok(());
  305. }
  306. }
  307. /// # 环形依赖错误路径
  308. ///
  309. /// 本结构体用于在回溯过程中记录环形依赖的路径。
  310. ///
  311. /// 例如,假设有如下依赖关系:
  312. ///
  313. /// ```text
  314. /// A -> B -> C -> D -> A
  315. /// ```
  316. ///
  317. /// 则在DFS回溯过程中,会依次记录如下路径:
  318. ///
  319. /// ```text
  320. /// D -> A
  321. /// C -> D
  322. /// B -> C
  323. /// A -> B
  324. pub struct DependencyCycleError {
  325. dependencies: Vec<(PathBuf, String)>,
  326. }
  327. impl DependencyCycleError {
  328. pub fn new() -> Self {
  329. Self {
  330. dependencies: Vec::new(),
  331. }
  332. }
  333. pub fn add(&mut self, path: PathBuf, dependency: String) {
  334. self.dependencies.push((path, dependency));
  335. }
  336. #[allow(dead_code)]
  337. pub fn dependencies(&self) -> &Vec<(PathBuf, String)> {
  338. &self.dependencies
  339. }
  340. pub fn display(&self) -> String {
  341. let mut ret = format!("Dependency cycle detected: \nStart ->\n");
  342. for entity in self.dependencies.iter() {
  343. ret.push_str(&format!(
  344. "->\t{}\t--depends-->\t{}\n",
  345. entity.0.display(),
  346. entity.1
  347. ));
  348. }
  349. ret.push_str("-> End");
  350. return ret;
  351. }
  352. }