mod.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. use std::{
  2. collections::BTreeMap,
  3. env::Vars,
  4. path::PathBuf,
  5. process::{Command, Stdio},
  6. rc::Rc,
  7. sync::RwLock,
  8. };
  9. use log::{debug, error, info, warn};
  10. use crate::{
  11. console::{clean::CleanLevel, Action},
  12. executor::cache::CacheDir,
  13. parser::task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
  14. scheduler::{SchedEntities, SchedEntity},
  15. utils::file::FileUtils,
  16. };
  17. use self::cache::CacheDirType;
  18. pub mod cache;
  19. pub mod source;
  20. pub mod target;
  21. lazy_static! {
  22. // 全局环境变量的列表
  23. pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
  24. }
  25. #[derive(Debug, Clone)]
  26. pub struct Executor {
  27. entity: Rc<SchedEntity>,
  28. action: Action,
  29. local_envs: EnvMap,
  30. /// 任务构建结果输出到的目录
  31. build_dir: CacheDir,
  32. /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
  33. source_dir: Option<CacheDir>,
  34. /// DragonOS sysroot的路径
  35. dragonos_sysroot: PathBuf,
  36. }
  37. impl Executor {
  38. /// # 创建执行器
  39. ///
  40. /// 用于执行一个任务
  41. ///
  42. /// ## 参数
  43. ///
  44. /// * `entity` - 任务调度实体
  45. ///
  46. /// ## 返回值
  47. ///
  48. /// * `Ok(Executor)` - 创建成功
  49. /// * `Err(ExecutorError)` - 创建失败
  50. pub fn new(
  51. entity: Rc<SchedEntity>,
  52. action: Action,
  53. dragonos_sysroot: PathBuf,
  54. ) -> Result<Self, ExecutorError> {
  55. let local_envs = EnvMap::new();
  56. let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
  57. let source_dir = if CacheDir::need_source_cache(&entity) {
  58. Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
  59. } else {
  60. None
  61. };
  62. let result: Executor = Self {
  63. action,
  64. entity,
  65. local_envs,
  66. build_dir,
  67. source_dir,
  68. dragonos_sysroot,
  69. };
  70. return Ok(result);
  71. }
  72. /// # 执行任务
  73. ///
  74. /// 创建执行器后,调用此方法执行任务。
  75. /// 该方法会执行以下步骤:
  76. ///
  77. /// 1. 创建工作线程
  78. /// 2. 准备环境变量
  79. /// 3. 拉取数据(可选)
  80. /// 4. 执行构建
  81. pub fn execute(&mut self) -> Result<(), ExecutorError> {
  82. info!("Execute task: {}", self.entity.task().name_version());
  83. // 准备本地环境变量
  84. self.prepare_local_env()?;
  85. match self.action {
  86. Action::Build => {
  87. // 构建任务
  88. self.build()?;
  89. }
  90. Action::Install => {
  91. // 把构建结果安装到DragonOS
  92. self.install()?;
  93. }
  94. Action::Clean(_) => {
  95. // 清理构建结果
  96. let r = self.clean();
  97. if let Err(e) = r {
  98. error!(
  99. "Failed to clean task {}: {:?}",
  100. self.entity.task().name_version(),
  101. e
  102. );
  103. }
  104. }
  105. _ => {
  106. error!("Unsupported action: {:?}", self.action);
  107. }
  108. }
  109. info!("Task {} finished", self.entity.task().name_version());
  110. return Ok(());
  111. }
  112. /// # 执行build操作
  113. fn build(&mut self) -> Result<(), ExecutorError> {
  114. self.mv_target_to_tmp()?;
  115. // 确认源文件就绪
  116. self.prepare_input()?;
  117. let command: Option<Command> = self.create_command()?;
  118. if let Some(cmd) = command {
  119. self.run_command(cmd)?;
  120. }
  121. // 检查构建结果,如果为空,则抛出警告
  122. if self.build_dir.is_empty()? {
  123. warn!(
  124. "Task {}: build result is empty, do you forget to copy the result to [${}]?",
  125. self.entity.task().name_version(),
  126. CacheDir::build_dir_env_key(&self.entity)?
  127. );
  128. }
  129. return Ok(());
  130. }
  131. /// # 执行安装操作,把构建结果安装到DragonOS
  132. fn install(&self) -> Result<(), ExecutorError> {
  133. let in_dragonos_path = self.entity.task().install.in_dragonos_path.as_ref();
  134. // 如果没有指定安装路径,则不执行安装
  135. if in_dragonos_path.is_none() {
  136. return Ok(());
  137. }
  138. info!("Installing task: {}", self.entity.task().name_version());
  139. let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
  140. debug!("in_dragonos_path: {}", in_dragonos_path);
  141. // 去除开头的斜杠
  142. {
  143. let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
  144. in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
  145. }
  146. // 拼接最终的安装路径
  147. let install_path = self.dragonos_sysroot.join(in_dragonos_path);
  148. debug!("install_path: {:?}", install_path);
  149. // 创建安装路径
  150. std::fs::create_dir_all(&install_path).map_err(|e| {
  151. ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
  152. })?;
  153. // 拷贝构建结果到安装路径
  154. let build_dir: PathBuf = self.build_dir.path.clone();
  155. FileUtils::copy_dir_all(&build_dir, &install_path)
  156. .map_err(|e| ExecutorError::InstallError(e))?;
  157. info!("Task {} installed.", self.entity.task().name_version());
  158. // 安装完后,删除临时target文件
  159. if let Some(target) = self.entity.target() {
  160. target.clean_tmpdadk()?;
  161. }
  162. return Ok(());
  163. }
  164. fn clean(&self) -> Result<(), ExecutorError> {
  165. let level = if let Action::Clean(l) = self.action {
  166. l.level
  167. } else {
  168. panic!(
  169. "BUG: clean() called with non-clean action. executor details: {:?}",
  170. self
  171. );
  172. };
  173. info!(
  174. "Cleaning task: {}, level={level}",
  175. self.entity.task().name_version()
  176. );
  177. let r: Result<(), ExecutorError> = match level {
  178. CleanLevel::All => self.clean_all(),
  179. CleanLevel::Src => self.clean_src(),
  180. CleanLevel::Target => self.clean_target(),
  181. CleanLevel::Cache => self.clean_cache(),
  182. };
  183. if let Err(e) = r {
  184. error!(
  185. "Failed to clean task: {}, error message: {:?}",
  186. self.entity.task().name_version(),
  187. e
  188. );
  189. return Err(e);
  190. }
  191. return Ok(());
  192. }
  193. fn clean_all(&self) -> Result<(), ExecutorError> {
  194. // 在源文件目录执行清理
  195. self.clean_src()?;
  196. // 清理构建结果
  197. self.clean_target()?;
  198. // 清理缓存
  199. self.clean_cache()?;
  200. return Ok(());
  201. }
  202. /// 在源文件目录执行清理
  203. fn clean_src(&self) -> Result<(), ExecutorError> {
  204. let cmd: Option<Command> = self.create_command()?;
  205. if cmd.is_none() {
  206. // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
  207. return Ok(());
  208. }
  209. info!(
  210. "{}: Cleaning in source directory: {:?}",
  211. self.entity.task().name_version(),
  212. self.src_work_dir()
  213. );
  214. let cmd = cmd.unwrap();
  215. self.run_command(cmd)?;
  216. return Ok(());
  217. }
  218. /// 清理构建输出目录
  219. fn clean_target(&self) -> Result<(), ExecutorError> {
  220. info!(
  221. "{}: Cleaning build target directory: {:?}",
  222. self.entity.task().name_version(),
  223. self.build_dir.path
  224. );
  225. return self.build_dir.remove_self_recursive();
  226. }
  227. /// 清理下载缓存
  228. fn clean_cache(&self) -> Result<(), ExecutorError> {
  229. let cache_dir = self.source_dir.as_ref();
  230. if cache_dir.is_none() {
  231. // 如果没有缓存目录,则认为用户不需要清理缓存
  232. return Ok(());
  233. }
  234. info!(
  235. "{}: Cleaning cache directory: {}",
  236. self.entity.task().name_version(),
  237. self.src_work_dir().display()
  238. );
  239. return cache_dir.unwrap().remove_self_recursive();
  240. }
  241. /// 获取源文件的工作目录
  242. fn src_work_dir(&self) -> PathBuf {
  243. if let Some(local_path) = self.entity.task().source_path() {
  244. return local_path;
  245. }
  246. return self.source_dir.as_ref().unwrap().path.clone();
  247. }
  248. /// 为任务创建命令
  249. fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
  250. // 获取命令
  251. let raw_cmd = match self.entity.task().task_type {
  252. TaskType::BuildFromSource(_) => match self.action {
  253. Action::Build => self.entity.task().build.build_command.clone(),
  254. Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
  255. _ => unimplemented!(
  256. "create_command: Action {:?} not supported yet.",
  257. self.action
  258. ),
  259. },
  260. TaskType::InstallFromPrebuilt(_) => match self.action {
  261. Action::Build => self.entity.task().build.build_command.clone(),
  262. Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
  263. _ => unimplemented!(
  264. "create_command: Action {:?} not supported yet.",
  265. self.action
  266. ),
  267. },
  268. };
  269. if raw_cmd.is_none() {
  270. return Ok(None);
  271. }
  272. let raw_cmd = raw_cmd.unwrap();
  273. let mut command = Command::new("bash");
  274. command.current_dir(self.src_work_dir());
  275. // 设置参数
  276. command.arg("-c");
  277. command.arg(raw_cmd);
  278. // 设置环境变量
  279. let env_list = ENV_LIST.read().unwrap();
  280. for (key, value) in env_list.envs.iter() {
  281. // if key.starts_with("DADK") {
  282. // debug!("DADK env found: {}={}", key, value.value);
  283. // }
  284. command.env(key, value.value.clone());
  285. }
  286. drop(env_list);
  287. for (key, value) in self.local_envs.envs.iter() {
  288. command.env(key, value.value.clone());
  289. }
  290. return Ok(Some(command));
  291. }
  292. /// # 准备工作线程本地环境变量
  293. fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
  294. // 设置本地环境变量
  295. self.prepare_target_env()?;
  296. let task_envs: Option<&Vec<TaskEnv>> = self.entity.task().envs.as_ref();
  297. if task_envs.is_none() {
  298. return Ok(());
  299. }
  300. let task_envs = task_envs.unwrap();
  301. for tv in task_envs.iter() {
  302. self.local_envs
  303. .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
  304. }
  305. return Ok(());
  306. }
  307. fn prepare_input(&self) -> Result<(), ExecutorError> {
  308. // 拉取源文件
  309. let task = self.entity.task();
  310. match &task.task_type {
  311. TaskType::BuildFromSource(cs) => {
  312. if self.source_dir.is_none() {
  313. return Ok(());
  314. }
  315. let source_dir = self.source_dir.as_ref().unwrap();
  316. match cs {
  317. CodeSource::Git(git) => {
  318. git.prepare(source_dir)
  319. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  320. }
  321. // 本地源文件,不需要拉取
  322. CodeSource::Local(_) => return Ok(()),
  323. // 在线压缩包,需要下载
  324. CodeSource::Archive(archive) => {
  325. archive
  326. .download_unzip(source_dir)
  327. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  328. }
  329. }
  330. }
  331. TaskType::InstallFromPrebuilt(pb) => {
  332. match pb {
  333. // 本地源文件,不需要拉取
  334. PrebuiltSource::Local(local_source) => {
  335. let local_path = local_source.path();
  336. let target_path = &self.build_dir.path;
  337. FileUtils::copy_dir_all(&local_path, &target_path)
  338. .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
  339. return Ok(());
  340. }
  341. // 在线压缩包,需要下载
  342. PrebuiltSource::Archive(archive) => {
  343. archive
  344. .download_unzip(&self.build_dir)
  345. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  346. }
  347. }
  348. }
  349. }
  350. return Ok(());
  351. }
  352. fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
  353. let mut child = command
  354. .stdin(Stdio::inherit())
  355. .spawn()
  356. .map_err(|e| ExecutorError::IoError(e))?;
  357. // 等待子进程结束
  358. let r = child.wait().map_err(|e| ExecutorError::IoError(e));
  359. if r.is_ok() {
  360. let r = r.unwrap();
  361. if r.success() {
  362. return Ok(());
  363. } else {
  364. // 执行失败,获取最后100行stderr输出
  365. let errmsg = format!(
  366. "Task {} failed, exit code = {}",
  367. self.entity.task().name_version(),
  368. r.code().unwrap()
  369. );
  370. error!("{errmsg}");
  371. let command_opt = command.output();
  372. if command_opt.is_err() {
  373. return Err(ExecutorError::TaskFailed(
  374. "Failed to get command output".to_string(),
  375. ));
  376. }
  377. let command_opt = command_opt.unwrap();
  378. let command_output = String::from_utf8_lossy(&command_opt.stderr);
  379. let mut last_100_outputs = command_output
  380. .lines()
  381. .rev()
  382. .take(100)
  383. .collect::<Vec<&str>>();
  384. last_100_outputs.reverse();
  385. error!("Last 100 lines msg of stderr:");
  386. for line in last_100_outputs {
  387. error!("{}", line);
  388. }
  389. return Err(ExecutorError::TaskFailed(errmsg));
  390. }
  391. } else {
  392. let errmsg = format!(
  393. "Task {} failed, msg = {:?}",
  394. self.entity.task().name_version(),
  395. r.err().unwrap()
  396. );
  397. error!("{errmsg}");
  398. return Err(ExecutorError::TaskFailed(errmsg));
  399. }
  400. }
  401. pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
  402. if let Some(rust_target) = self.entity.task().rust_target.clone() {
  403. // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
  404. self.entity
  405. .target()
  406. .as_ref()
  407. .unwrap()
  408. .cp_to_tmp(&rust_target)?;
  409. }
  410. return Ok(());
  411. }
  412. pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
  413. if self.entity.task().rust_target.is_some() {
  414. // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
  415. self.entity
  416. .target()
  417. .as_ref()
  418. .unwrap()
  419. .prepare_env(&mut self.local_envs);
  420. }
  421. return Ok(());
  422. }
  423. }
  424. #[derive(Debug, Clone)]
  425. pub struct EnvMap {
  426. pub envs: BTreeMap<String, EnvVar>,
  427. }
  428. impl EnvMap {
  429. pub fn new() -> Self {
  430. Self {
  431. envs: BTreeMap::new(),
  432. }
  433. }
  434. pub fn add(&mut self, env: EnvVar) {
  435. self.envs.insert(env.key.clone(), env);
  436. }
  437. #[allow(dead_code)]
  438. pub fn get(&self, key: &str) -> Option<&EnvVar> {
  439. self.envs.get(key)
  440. }
  441. pub fn add_vars(&mut self, vars: Vars) {
  442. for (key, value) in vars {
  443. self.add(EnvVar::new(key, value));
  444. }
  445. }
  446. }
  447. /// # 环境变量
  448. #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
  449. pub struct EnvVar {
  450. pub key: String,
  451. pub value: String,
  452. }
  453. impl EnvVar {
  454. pub fn new(key: String, value: String) -> Self {
  455. Self { key, value }
  456. }
  457. }
  458. /// # 任务执行器错误枚举
  459. #[allow(dead_code)]
  460. #[derive(Debug)]
  461. pub enum ExecutorError {
  462. /// 准备执行环境错误
  463. PrepareEnvError(String),
  464. IoError(std::io::Error),
  465. /// 构建执行错误
  466. TaskFailed(String),
  467. /// 安装错误
  468. InstallError(String),
  469. /// 清理错误
  470. CleanError(String),
  471. }
  472. /// # 准备全局环境变量
  473. pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> {
  474. info!("Preparing environment variables...");
  475. // 获取当前全局环境变量列表
  476. let mut env_list = ENV_LIST.write().unwrap();
  477. let envs: Vars = std::env::vars();
  478. env_list.add_vars(envs);
  479. // 为每个任务创建特定的环境变量
  480. for entity in sched_entities.iter() {
  481. // 导出任务的构建目录环境变量
  482. let build_dir = CacheDir::build_dir(entity.clone())?;
  483. let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
  484. env_list.add(EnvVar::new(
  485. build_dir_key,
  486. build_dir.to_str().unwrap().to_string(),
  487. ));
  488. // 如果需要源码缓存目录,则导出
  489. if CacheDir::need_source_cache(entity) {
  490. let source_dir = CacheDir::source_dir(entity.clone())?;
  491. let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
  492. env_list.add(EnvVar::new(
  493. source_dir_key,
  494. source_dir.to_str().unwrap().to_string(),
  495. ));
  496. }
  497. }
  498. // 查看环境变量列表
  499. // debug!("Environment variables:");
  500. // for (key, value) in env_list.envs.iter() {
  501. // debug!("{}: {}", key, value.value);
  502. // }
  503. return Ok(());
  504. }