mod.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. use std::{
  2. collections::BTreeMap,
  3. env::Vars,
  4. path::PathBuf,
  5. process::{Command, Stdio},
  6. sync::{Arc, RwLock},
  7. };
  8. use log::{debug, error, info, warn};
  9. use crate::{
  10. console::{clean::CleanLevel, Action},
  11. executor::cache::CacheDir,
  12. parser::task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
  13. scheduler::{SchedEntities, SchedEntity},
  14. utils::file::FileUtils,
  15. };
  16. use self::cache::CacheDirType;
  17. pub mod cache;
  18. pub mod source;
  19. pub mod target;
  20. lazy_static! {
  21. // 全局环境变量的列表
  22. pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
  23. }
  24. #[derive(Debug, Clone)]
  25. pub struct Executor {
  26. entity: Arc<SchedEntity>,
  27. action: Action,
  28. local_envs: EnvMap,
  29. /// 任务构建结果输出到的目录
  30. build_dir: CacheDir,
  31. /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
  32. source_dir: Option<CacheDir>,
  33. /// DragonOS sysroot的路径
  34. dragonos_sysroot: PathBuf,
  35. }
  36. impl Executor {
  37. /// # 创建执行器
  38. ///
  39. /// 用于执行一个任务
  40. ///
  41. /// ## 参数
  42. ///
  43. /// * `entity` - 任务调度实体
  44. ///
  45. /// ## 返回值
  46. ///
  47. /// * `Ok(Executor)` - 创建成功
  48. /// * `Err(ExecutorError)` - 创建失败
  49. pub fn new(
  50. entity: Arc<SchedEntity>,
  51. action: Action,
  52. dragonos_sysroot: PathBuf,
  53. ) -> Result<Self, ExecutorError> {
  54. let local_envs = EnvMap::new();
  55. let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
  56. let source_dir = if CacheDir::need_source_cache(&entity) {
  57. Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
  58. } else {
  59. None
  60. };
  61. let result: Executor = Self {
  62. action,
  63. entity,
  64. local_envs,
  65. build_dir,
  66. source_dir,
  67. dragonos_sysroot,
  68. };
  69. return Ok(result);
  70. }
  71. /// # 执行任务
  72. ///
  73. /// 创建执行器后,调用此方法执行任务。
  74. /// 该方法会执行以下步骤:
  75. ///
  76. /// 1. 创建工作线程
  77. /// 2. 准备环境变量
  78. /// 3. 拉取数据(可选)
  79. /// 4. 执行构建
  80. pub fn execute(&mut self) -> Result<(), ExecutorError> {
  81. info!("Execute task: {}", self.entity.task().name_version());
  82. // 准备本地环境变量
  83. self.prepare_local_env()?;
  84. match self.action {
  85. Action::Build => {
  86. // 构建任务
  87. self.build()?;
  88. }
  89. Action::Install => {
  90. // 把构建结果安装到DragonOS
  91. self.install()?;
  92. }
  93. Action::Clean(_) => {
  94. // 清理构建结果
  95. let r = self.clean();
  96. if let Err(e) = r {
  97. error!(
  98. "Failed to clean task {}: {:?}",
  99. self.entity.task().name_version(),
  100. e
  101. );
  102. }
  103. }
  104. _ => {
  105. error!("Unsupported action: {:?}", self.action);
  106. }
  107. }
  108. info!("Task {} finished", self.entity.task().name_version());
  109. return Ok(());
  110. }
  111. /// # 执行build操作
  112. fn build(&mut self) -> Result<(), ExecutorError> {
  113. self.mv_target_to_tmp()?;
  114. // 确认源文件就绪
  115. self.prepare_input()?;
  116. let command: Option<Command> = self.create_command()?;
  117. if let Some(cmd) = command {
  118. self.run_command(cmd)?;
  119. }
  120. // 检查构建结果,如果为空,则抛出警告
  121. if self.build_dir.is_empty()? {
  122. warn!(
  123. "Task {}: build result is empty, do you forget to copy the result to [${}]?",
  124. self.entity.task().name_version(),
  125. CacheDir::build_dir_env_key(&self.entity)?
  126. );
  127. }
  128. return Ok(());
  129. }
  130. /// # 执行安装操作,把构建结果安装到DragonOS
  131. fn install(&self) -> Result<(), ExecutorError> {
  132. let binding = self.entity.task();
  133. let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
  134. // 如果没有指定安装路径,则不执行安装
  135. if in_dragonos_path.is_none() {
  136. return Ok(());
  137. }
  138. info!("Installing task: {}", self.entity.task().name_version());
  139. let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
  140. debug!("in_dragonos_path: {}", in_dragonos_path);
  141. // 去除开头的斜杠
  142. {
  143. let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
  144. in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
  145. }
  146. // 拼接最终的安装路径
  147. let install_path = self.dragonos_sysroot.join(in_dragonos_path);
  148. debug!("install_path: {:?}", install_path);
  149. // 创建安装路径
  150. std::fs::create_dir_all(&install_path).map_err(|e| {
  151. ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
  152. })?;
  153. // 拷贝构建结果到安装路径
  154. let build_dir: PathBuf = self.build_dir.path.clone();
  155. FileUtils::copy_dir_all(&build_dir, &install_path)
  156. .map_err(|e| ExecutorError::InstallError(e))?;
  157. info!("Task {} installed.", self.entity.task().name_version());
  158. // 安装完后,删除临时target文件
  159. if let Some(target) = self.entity.target() {
  160. target.clean_tmpdadk()?;
  161. }
  162. return Ok(());
  163. }
  164. fn clean(&self) -> Result<(), ExecutorError> {
  165. let level = if let Action::Clean(l) = self.action {
  166. l.level
  167. } else {
  168. panic!(
  169. "BUG: clean() called with non-clean action. executor details: {:?}",
  170. self
  171. );
  172. };
  173. info!(
  174. "Cleaning task: {}, level={level}",
  175. self.entity.task().name_version()
  176. );
  177. let r: Result<(), ExecutorError> = match level {
  178. CleanLevel::All => self.clean_all(),
  179. CleanLevel::Src => self.clean_src(),
  180. CleanLevel::Target => self.clean_target(),
  181. CleanLevel::Cache => self.clean_cache(),
  182. };
  183. if let Err(e) = r {
  184. error!(
  185. "Failed to clean task: {}, error message: {:?}",
  186. self.entity.task().name_version(),
  187. e
  188. );
  189. return Err(e);
  190. }
  191. return Ok(());
  192. }
  193. fn clean_all(&self) -> Result<(), ExecutorError> {
  194. // 在源文件目录执行清理
  195. self.clean_src()?;
  196. // 清理构建结果
  197. self.clean_target()?;
  198. // 清理缓存
  199. self.clean_cache()?;
  200. return Ok(());
  201. }
  202. /// 在源文件目录执行清理
  203. fn clean_src(&self) -> Result<(), ExecutorError> {
  204. let cmd: Option<Command> = self.create_command()?;
  205. if cmd.is_none() {
  206. // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
  207. return Ok(());
  208. }
  209. info!(
  210. "{}: Cleaning in source directory: {:?}",
  211. self.entity.task().name_version(),
  212. self.src_work_dir()
  213. );
  214. let cmd = cmd.unwrap();
  215. self.run_command(cmd)?;
  216. return Ok(());
  217. }
  218. /// 清理构建输出目录
  219. fn clean_target(&self) -> Result<(), ExecutorError> {
  220. info!(
  221. "{}: Cleaning build target directory: {:?}",
  222. self.entity.task().name_version(),
  223. self.build_dir.path
  224. );
  225. return self.build_dir.remove_self_recursive();
  226. }
  227. /// 清理下载缓存
  228. fn clean_cache(&self) -> Result<(), ExecutorError> {
  229. let cache_dir = self.source_dir.as_ref();
  230. if cache_dir.is_none() {
  231. // 如果没有缓存目录,则认为用户不需要清理缓存
  232. return Ok(());
  233. }
  234. info!(
  235. "{}: Cleaning cache directory: {}",
  236. self.entity.task().name_version(),
  237. self.src_work_dir().display()
  238. );
  239. return cache_dir.unwrap().remove_self_recursive();
  240. }
  241. /// 获取源文件的工作目录
  242. fn src_work_dir(&self) -> PathBuf {
  243. if let Some(local_path) = self.entity.task().source_path() {
  244. return local_path;
  245. }
  246. return self.source_dir.as_ref().unwrap().path.clone();
  247. }
  248. /// 为任务创建命令
  249. fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
  250. // 获取命令
  251. let raw_cmd = match self.entity.task().task_type {
  252. TaskType::BuildFromSource(_) => match self.action {
  253. Action::Build => self.entity.task().build.build_command.clone(),
  254. Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
  255. _ => unimplemented!(
  256. "create_command: Action {:?} not supported yet.",
  257. self.action
  258. ),
  259. },
  260. TaskType::InstallFromPrebuilt(_) => match self.action {
  261. Action::Build => self.entity.task().build.build_command.clone(),
  262. Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
  263. _ => unimplemented!(
  264. "create_command: Action {:?} not supported yet.",
  265. self.action
  266. ),
  267. },
  268. };
  269. if raw_cmd.is_none() {
  270. return Ok(None);
  271. }
  272. let raw_cmd = raw_cmd.unwrap();
  273. let mut command = Command::new("bash");
  274. command.current_dir(self.src_work_dir());
  275. // 设置参数
  276. command.arg("-c");
  277. command.arg(raw_cmd);
  278. // 设置环境变量
  279. let env_list = ENV_LIST.read().unwrap();
  280. for (key, value) in env_list.envs.iter() {
  281. // if key.starts_with("DADK") {
  282. // debug!("DADK env found: {}={}", key, value.value);
  283. // }
  284. command.env(key, value.value.clone());
  285. }
  286. drop(env_list);
  287. for (key, value) in self.local_envs.envs.iter() {
  288. command.env(key, value.value.clone());
  289. }
  290. return Ok(Some(command));
  291. }
  292. /// # 准备工作线程本地环境变量
  293. fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
  294. // 设置本地环境变量
  295. self.prepare_target_env()?;
  296. let binding = self.entity.task();
  297. let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
  298. if task_envs.is_none() {
  299. return Ok(());
  300. }
  301. let task_envs = task_envs.unwrap();
  302. for tv in task_envs.iter() {
  303. self.local_envs
  304. .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
  305. }
  306. return Ok(());
  307. }
  308. fn prepare_input(&self) -> Result<(), ExecutorError> {
  309. // 拉取源文件
  310. let task = self.entity.task();
  311. match &task.task_type {
  312. TaskType::BuildFromSource(cs) => {
  313. if self.source_dir.is_none() {
  314. return Ok(());
  315. }
  316. let source_dir = self.source_dir.as_ref().unwrap();
  317. match cs {
  318. CodeSource::Git(git) => {
  319. git.prepare(source_dir)
  320. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  321. }
  322. // 本地源文件,不需要拉取
  323. CodeSource::Local(_) => return Ok(()),
  324. // 在线压缩包,需要下载
  325. CodeSource::Archive(archive) => {
  326. archive
  327. .download_unzip(source_dir)
  328. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  329. }
  330. }
  331. }
  332. TaskType::InstallFromPrebuilt(pb) => {
  333. match pb {
  334. // 本地源文件,不需要拉取
  335. PrebuiltSource::Local(local_source) => {
  336. let local_path = local_source.path();
  337. let target_path = &self.build_dir.path;
  338. FileUtils::copy_dir_all(&local_path, &target_path)
  339. .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
  340. return Ok(());
  341. }
  342. // 在线压缩包,需要下载
  343. PrebuiltSource::Archive(archive) => {
  344. archive
  345. .download_unzip(&self.build_dir)
  346. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  347. }
  348. }
  349. }
  350. }
  351. return Ok(());
  352. }
  353. fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
  354. let mut child = command
  355. .stdin(Stdio::inherit())
  356. .spawn()
  357. .map_err(|e| ExecutorError::IoError(e))?;
  358. // 等待子进程结束
  359. let r = child.wait().map_err(|e| ExecutorError::IoError(e));
  360. if r.is_ok() {
  361. let r = r.unwrap();
  362. if r.success() {
  363. return Ok(());
  364. } else {
  365. // 执行失败,获取最后100行stderr输出
  366. let errmsg = format!(
  367. "Task {} failed, exit code = {}",
  368. self.entity.task().name_version(),
  369. r.code().unwrap()
  370. );
  371. error!("{errmsg}");
  372. let command_opt = command.output();
  373. if command_opt.is_err() {
  374. return Err(ExecutorError::TaskFailed(
  375. "Failed to get command output".to_string(),
  376. ));
  377. }
  378. let command_opt = command_opt.unwrap();
  379. let command_output = String::from_utf8_lossy(&command_opt.stderr);
  380. let mut last_100_outputs = command_output
  381. .lines()
  382. .rev()
  383. .take(100)
  384. .collect::<Vec<&str>>();
  385. last_100_outputs.reverse();
  386. error!("Last 100 lines msg of stderr:");
  387. for line in last_100_outputs {
  388. error!("{}", line);
  389. }
  390. return Err(ExecutorError::TaskFailed(errmsg));
  391. }
  392. } else {
  393. let errmsg = format!(
  394. "Task {} failed, msg = {:?}",
  395. self.entity.task().name_version(),
  396. r.err().unwrap()
  397. );
  398. error!("{errmsg}");
  399. return Err(ExecutorError::TaskFailed(errmsg));
  400. }
  401. }
  402. pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
  403. if let Some(rust_target) = self.entity.task().rust_target.clone() {
  404. // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
  405. self.entity
  406. .target()
  407. .as_ref()
  408. .unwrap()
  409. .cp_to_tmp(&rust_target)?;
  410. }
  411. return Ok(());
  412. }
  413. pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
  414. if self.entity.task().rust_target.is_some() {
  415. // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
  416. self.entity
  417. .target()
  418. .as_ref()
  419. .unwrap()
  420. .prepare_env(&mut self.local_envs);
  421. }
  422. return Ok(());
  423. }
  424. }
  425. #[derive(Debug, Clone)]
  426. pub struct EnvMap {
  427. pub envs: BTreeMap<String, EnvVar>,
  428. }
  429. impl EnvMap {
  430. pub fn new() -> Self {
  431. Self {
  432. envs: BTreeMap::new(),
  433. }
  434. }
  435. pub fn add(&mut self, env: EnvVar) {
  436. self.envs.insert(env.key.clone(), env);
  437. }
  438. #[allow(dead_code)]
  439. pub fn get(&self, key: &str) -> Option<&EnvVar> {
  440. self.envs.get(key)
  441. }
  442. pub fn add_vars(&mut self, vars: Vars) {
  443. for (key, value) in vars {
  444. self.add(EnvVar::new(key, value));
  445. }
  446. }
  447. }
  448. /// # 环境变量
  449. #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
  450. pub struct EnvVar {
  451. pub key: String,
  452. pub value: String,
  453. }
  454. impl EnvVar {
  455. pub fn new(key: String, value: String) -> Self {
  456. Self { key, value }
  457. }
  458. }
  459. /// # 任务执行器错误枚举
  460. #[allow(dead_code)]
  461. #[derive(Debug)]
  462. pub enum ExecutorError {
  463. /// 准备执行环境错误
  464. PrepareEnvError(String),
  465. IoError(std::io::Error),
  466. /// 构建执行错误
  467. TaskFailed(String),
  468. /// 安装错误
  469. InstallError(String),
  470. /// 清理错误
  471. CleanError(String),
  472. }
  473. /// # 准备全局环境变量
  474. pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> {
  475. info!("Preparing environment variables...");
  476. // 获取当前全局环境变量列表
  477. let mut env_list = ENV_LIST.write().unwrap();
  478. let envs: Vars = std::env::vars();
  479. env_list.add_vars(envs);
  480. // 为每个任务创建特定的环境变量
  481. for entity in sched_entities.entities().iter() {
  482. // 导出任务的构建目录环境变量
  483. let build_dir = CacheDir::build_dir(entity.clone())?;
  484. let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
  485. env_list.add(EnvVar::new(
  486. build_dir_key,
  487. build_dir.to_str().unwrap().to_string(),
  488. ));
  489. // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
  490. env_list.add(EnvVar::new(
  491. "DADK_CURRENT_BUILD_DIR".to_string(),
  492. build_dir.to_str().unwrap().to_string(),
  493. ));
  494. // 如果需要源码缓存目录,则导出
  495. if CacheDir::need_source_cache(entity) {
  496. let source_dir = CacheDir::source_dir(entity.clone())?;
  497. let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
  498. env_list.add(EnvVar::new(
  499. source_dir_key,
  500. source_dir.to_str().unwrap().to_string(),
  501. ));
  502. }
  503. }
  504. // 查看环境变量列表
  505. // debug!("Environment variables:");
  506. // for (key, value) in env_list.envs.iter() {
  507. // debug!("{}: {}", key, value.value);
  508. // }
  509. return Ok(());
  510. }