mod.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. use std::{
  2. collections::BTreeMap,
  3. env::Vars,
  4. path::PathBuf,
  5. process::{Command, Stdio},
  6. rc::Rc,
  7. sync::RwLock,
  8. };
  9. use log::{error, info, warn, debug};
  10. use crate::{
  11. console::{clean::CleanLevel, Action},
  12. executor::cache::CacheDir,
  13. parser::task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
  14. scheduler::{SchedEntities, SchedEntity},
  15. utils::stdio::StdioUtils,
  16. };
  17. use self::cache::CacheDirType;
  18. pub mod cache;
  19. pub mod source;
  20. lazy_static! {
  21. // 全局环境变量的列表
  22. pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
  23. }
  24. #[derive(Debug, Clone)]
  25. pub struct Executor {
  26. entity: Rc<SchedEntity>,
  27. action: Action,
  28. local_envs: EnvMap,
  29. /// 任务构建结果输出到的目录
  30. build_dir: CacheDir,
  31. /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
  32. source_dir: Option<CacheDir>,
  33. /// DragonOS sysroot的路径
  34. dragonos_sysroot: PathBuf,
  35. }
  36. impl Executor {
  37. /// # 创建执行器
  38. ///
  39. /// 用于执行一个任务
  40. ///
  41. /// ## 参数
  42. ///
  43. /// * `entity` - 任务调度实体
  44. ///
  45. /// ## 返回值
  46. ///
  47. /// * `Ok(Executor)` - 创建成功
  48. /// * `Err(ExecutorError)` - 创建失败
  49. pub fn new(
  50. entity: Rc<SchedEntity>,
  51. action: Action,
  52. dragonos_sysroot: PathBuf,
  53. ) -> Result<Self, ExecutorError> {
  54. let local_envs = EnvMap::new();
  55. let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
  56. let source_dir = if CacheDir::need_source_cache(&entity) {
  57. Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
  58. } else {
  59. None
  60. };
  61. let result: Executor = Self {
  62. action,
  63. entity,
  64. local_envs,
  65. build_dir,
  66. source_dir,
  67. dragonos_sysroot,
  68. };
  69. return Ok(result);
  70. }
  71. /// # 执行任务
  72. ///
  73. /// 创建执行器后,调用此方法执行任务。
  74. /// 该方法会执行以下步骤:
  75. ///
  76. /// 1. 创建工作线程
  77. /// 2. 准备环境变量
  78. /// 3. 拉取数据(可选)
  79. /// 4. 执行构建
  80. pub fn execute(&mut self) -> Result<(), ExecutorError> {
  81. info!("Execute task: {}", self.entity.task().name_version());
  82. // 准备本地环境变量
  83. self.prepare_local_env()?;
  84. match self.action {
  85. Action::Build => {
  86. // 构建任务
  87. self.build()?;
  88. }
  89. Action::Install => {
  90. // 把构建结果安装到DragonOS
  91. self.install()?;
  92. }
  93. Action::Clean(_) => {
  94. // 清理构建结果
  95. let r = self.clean();
  96. if let Err(e) = r {
  97. error!("Failed to clean task {}: {:?}", self.entity.task().name_version(), e);
  98. }
  99. }
  100. _ => {
  101. error!("Unsupported action: {:?}", self.action);
  102. }
  103. }
  104. info!("Task {} finished", self.entity.task().name_version());
  105. return Ok(());
  106. }
  107. /// # 执行build操作
  108. fn build(&mut self) -> Result<(), ExecutorError> {
  109. // 确认源文件就绪
  110. self.prepare_input()?;
  111. let command: Option<Command> = self.create_command()?;
  112. if let Some(cmd) = command {
  113. self.run_command(cmd)?;
  114. }
  115. // 检查构建结果,如果为空,则抛出警告
  116. if self.build_dir.is_empty()? {
  117. warn!(
  118. "Task {}: build result is empty, do you forget to copy the result to [${}]?",
  119. self.entity.task().name_version(),
  120. CacheDir::build_dir_env_key(&self.entity)?
  121. );
  122. }
  123. return Ok(());
  124. }
  125. /// # 执行安装操作,把构建结果安装到DragonOS
  126. fn install(&self) -> Result<(), ExecutorError> {
  127. let in_dragonos_path = self.entity.task().install.in_dragonos_path.as_ref();
  128. // 如果没有指定安装路径,则不执行安装
  129. if in_dragonos_path.is_none() {
  130. return Ok(());
  131. }
  132. info!("Installing task: {}", self.entity.task().name_version());
  133. let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
  134. debug!("in_dragonos_path: {}", in_dragonos_path);
  135. // 去除开头的斜杠
  136. {
  137. let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
  138. in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
  139. }
  140. // 拼接最终的安装路径
  141. let install_path = self.dragonos_sysroot.join(in_dragonos_path);
  142. debug!("install_path: {:?}", install_path);
  143. // 创建安装路径
  144. std::fs::create_dir_all(&install_path).map_err(|e| {
  145. ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
  146. })?;
  147. // 拷贝构建结果到安装路径
  148. let build_dir: PathBuf = self.build_dir.path.clone();
  149. let cmd = Command::new("cp")
  150. .arg("-r")
  151. .arg(build_dir.to_string_lossy().to_string() + "/.")
  152. .arg(install_path)
  153. .stdout(Stdio::null())
  154. .stderr(Stdio::piped())
  155. .spawn()
  156. .map_err(|e| {
  157. ExecutorError::InstallError(format!(
  158. "Failed to install, error message: {}",
  159. e.to_string()
  160. ))
  161. })?;
  162. let output = cmd.wait_with_output().map_err(|e| {
  163. ExecutorError::InstallError(format!(
  164. "Failed to install, error message: {}",
  165. e.to_string()
  166. ))
  167. })?;
  168. if !output.status.success() {
  169. let err_msg = StdioUtils::tail_n_str(StdioUtils::stderr_to_lines(&output.stderr), 10);
  170. return Err(ExecutorError::InstallError(format!(
  171. "Failed to install, error message: {}",
  172. err_msg
  173. )));
  174. }
  175. info!("Task {} installed.", self.entity.task().name_version());
  176. return Ok(());
  177. }
  178. fn clean(&self) -> Result<(), ExecutorError> {
  179. let level = if let Action::Clean(l) = self.action {
  180. l.level
  181. } else {
  182. panic!(
  183. "BUG: clean() called with non-clean action. executor details: {:?}",
  184. self
  185. );
  186. };
  187. info!(
  188. "Cleaning task: {}, level={level}",
  189. self.entity.task().name_version()
  190. );
  191. let r: Result<(), ExecutorError> = match level {
  192. CleanLevel::All => self.clean_all(),
  193. CleanLevel::Src => self.clean_src(),
  194. CleanLevel::Target => self.clean_target(),
  195. CleanLevel::Cache => self.clean_cache(),
  196. };
  197. if let Err(e) = r {
  198. error!(
  199. "Failed to clean task: {}, error message: {:?}",
  200. self.entity.task().name_version(),
  201. e
  202. );
  203. return Err(e);
  204. }
  205. return Ok(());
  206. }
  207. fn clean_all(&self) -> Result<(), ExecutorError> {
  208. // 在源文件目录执行清理
  209. self.clean_src()?;
  210. // 清理构建结果
  211. self.clean_target()?;
  212. // 清理缓存
  213. self.clean_cache()?;
  214. return Ok(());
  215. }
  216. /// 在源文件目录执行清理
  217. fn clean_src(&self) -> Result<(), ExecutorError> {
  218. let cmd: Option<Command> = self.create_command()?;
  219. if cmd.is_none() {
  220. // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
  221. return Ok(());
  222. }
  223. info!(
  224. "{}: Cleaning in source directory: {:?}",
  225. self.entity.task().name_version(),
  226. self.src_work_dir()
  227. );
  228. let cmd = cmd.unwrap();
  229. self.run_command(cmd)?;
  230. return Ok(());
  231. }
  232. /// 清理构建输出目录
  233. fn clean_target(&self) -> Result<(), ExecutorError> {
  234. info!(
  235. "{}: Cleaning build target directory: {:?}",
  236. self.entity.task().name_version(),
  237. self.build_dir.path
  238. );
  239. return self.build_dir.remove_self_recursive();
  240. }
  241. /// 清理下载缓存
  242. fn clean_cache(&self) -> Result<(), ExecutorError> {
  243. let cache_dir = self.source_dir.as_ref();
  244. if cache_dir.is_none() {
  245. // 如果没有缓存目录,则认为用户不需要清理缓存
  246. return Ok(());
  247. }
  248. info!(
  249. "{}: Cleaning cache directory: {}",
  250. self.entity.task().name_version(),
  251. self.src_work_dir().display()
  252. );
  253. return cache_dir.unwrap().remove_self_recursive();
  254. }
  255. /// 获取源文件的工作目录
  256. fn src_work_dir(&self) -> PathBuf {
  257. if let Some(local_path) = self.entity.task().source_path() {
  258. return local_path;
  259. }
  260. return self.source_dir.as_ref().unwrap().path.clone();
  261. }
  262. /// 为任务创建命令
  263. fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
  264. // 获取命令
  265. let raw_cmd = match self.entity.task().task_type {
  266. TaskType::BuildFromSource(_) => match self.action {
  267. Action::Build => self.entity.task().build.build_command.clone(),
  268. Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
  269. _ => unimplemented!(
  270. "create_command: Action {:?} not supported yet.",
  271. self.action
  272. ),
  273. },
  274. _ => None,
  275. };
  276. if raw_cmd.is_none() {
  277. return Ok(None);
  278. }
  279. let raw_cmd = raw_cmd.unwrap();
  280. let mut command = Command::new("bash");
  281. command.current_dir(self.src_work_dir());
  282. // 设置参数
  283. command.arg("-c");
  284. command.arg(raw_cmd);
  285. // 设置环境变量
  286. let env_list = ENV_LIST.read().unwrap();
  287. for (key, value) in env_list.envs.iter() {
  288. // if key.starts_with("DADK") {
  289. // debug!("DADK env found: {}={}", key, value.value);
  290. // }
  291. command.env(key, value.value.clone());
  292. }
  293. drop(env_list);
  294. for (key, value) in self.local_envs.envs.iter() {
  295. command.env(key, value.value.clone());
  296. }
  297. return Ok(Some(command));
  298. }
  299. /// # 准备工作线程本地环境变量
  300. fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
  301. // 设置本地环境变量
  302. let task_envs: Option<&Vec<TaskEnv>> = self.entity.task().envs.as_ref();
  303. if task_envs.is_none() {
  304. return Ok(());
  305. }
  306. let task_envs = task_envs.unwrap();
  307. for tv in task_envs.iter() {
  308. self.local_envs
  309. .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
  310. }
  311. return Ok(());
  312. }
  313. fn prepare_input(&self) -> Result<(), ExecutorError> {
  314. // 拉取源文件
  315. if self.source_dir.is_none() {
  316. return Ok(());
  317. }
  318. let task = self.entity.task();
  319. let source_dir = self.source_dir.as_ref().unwrap();
  320. match &task.task_type {
  321. TaskType::BuildFromSource(cs) => {
  322. match cs {
  323. CodeSource::Git(git) => {
  324. git.prepare(source_dir)
  325. .map_err(|e| ExecutorError::PrepareEnvError(e))?;
  326. }
  327. // 本地源文件,不需要拉取
  328. CodeSource::Local(_) => return Ok(()),
  329. // 在线压缩包,需要下载
  330. CodeSource::Archive(_) => todo!(),
  331. }
  332. }
  333. TaskType::InstallFromPrebuilt(pb) => {
  334. match pb {
  335. // 本地源文件,不需要拉取
  336. PrebuiltSource::Local(_) => return Ok(()),
  337. // 在线压缩包,需要下载
  338. PrebuiltSource::Archive(_) => todo!(),
  339. }
  340. }
  341. }
  342. return Ok(());
  343. }
  344. fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
  345. let mut child = command
  346. .stdin(Stdio::inherit())
  347. .spawn()
  348. .map_err(|e| ExecutorError::IoError(e))?;
  349. // 等待子进程结束
  350. let r = child.wait().map_err(|e| ExecutorError::IoError(e));
  351. if r.is_ok() {
  352. let r = r.unwrap();
  353. if r.success() {
  354. return Ok(());
  355. } else {
  356. // 执行失败,获取最后100行stderr输出
  357. let errmsg = format!(
  358. "Task {} failed, exit code = {}",
  359. self.entity.task().name_version(),
  360. r.code().unwrap()
  361. );
  362. error!("{errmsg}");
  363. let command_opt = command.output();
  364. if command_opt.is_err() {
  365. return Err(ExecutorError::TaskFailed(
  366. "Failed to get command output".to_string(),
  367. ));
  368. }
  369. let command_opt = command_opt.unwrap();
  370. let command_output = String::from_utf8_lossy(&command_opt.stderr);
  371. let mut last_100_outputs = command_output
  372. .lines()
  373. .rev()
  374. .take(100)
  375. .collect::<Vec<&str>>();
  376. last_100_outputs.reverse();
  377. error!("Last 100 lines msg of stderr:");
  378. for line in last_100_outputs {
  379. error!("{}", line);
  380. }
  381. return Err(ExecutorError::TaskFailed(errmsg));
  382. }
  383. } else {
  384. let errmsg = format!(
  385. "Task {} failed, msg = {:?}",
  386. self.entity.task().name_version(),
  387. r.err().unwrap()
  388. );
  389. error!("{errmsg}");
  390. return Err(ExecutorError::TaskFailed(errmsg));
  391. }
  392. }
  393. }
  394. #[derive(Debug, Clone)]
  395. pub struct EnvMap {
  396. pub envs: BTreeMap<String, EnvVar>,
  397. }
  398. impl EnvMap {
  399. pub fn new() -> Self {
  400. Self {
  401. envs: BTreeMap::new(),
  402. }
  403. }
  404. pub fn add(&mut self, env: EnvVar) {
  405. self.envs.insert(env.key.clone(), env);
  406. }
  407. #[allow(dead_code)]
  408. pub fn get(&self, key: &str) -> Option<&EnvVar> {
  409. self.envs.get(key)
  410. }
  411. pub fn add_vars(&mut self, vars: Vars) {
  412. for (key, value) in vars {
  413. self.add(EnvVar::new(key, value));
  414. }
  415. }
  416. }
  417. /// # 环境变量
  418. #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
  419. pub struct EnvVar {
  420. pub key: String,
  421. pub value: String,
  422. }
  423. impl EnvVar {
  424. pub fn new(key: String, value: String) -> Self {
  425. Self { key, value }
  426. }
  427. }
  428. /// # 任务执行器错误枚举
  429. #[allow(dead_code)]
  430. #[derive(Debug)]
  431. pub enum ExecutorError {
  432. /// 准备执行环境错误
  433. PrepareEnvError(String),
  434. IoError(std::io::Error),
  435. /// 构建执行错误
  436. TaskFailed(String),
  437. /// 安装错误
  438. InstallError(String),
  439. /// 清理错误
  440. CleanError(String),
  441. }
  442. /// # 准备全局环境变量
  443. pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> {
  444. info!("Preparing environment variables...");
  445. // 获取当前全局环境变量列表
  446. let mut env_list = ENV_LIST.write().unwrap();
  447. let envs: Vars = std::env::vars();
  448. env_list.add_vars(envs);
  449. // 为每个任务创建特定的环境变量
  450. for entity in sched_entities.iter() {
  451. // 导出任务的构建目录环境变量
  452. let build_dir = CacheDir::build_dir(entity.clone())?;
  453. let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
  454. env_list.add(EnvVar::new(
  455. build_dir_key,
  456. build_dir.to_str().unwrap().to_string(),
  457. ));
  458. // 如果需要源码缓存目录,则导出
  459. if CacheDir::need_source_cache(entity) {
  460. let source_dir = CacheDir::source_dir(entity.clone())?;
  461. let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
  462. env_list.add(EnvVar::new(
  463. source_dir_key,
  464. source_dir.to_str().unwrap().to_string(),
  465. ));
  466. }
  467. }
  468. // 查看环境变量列表
  469. // debug!("Environment variables:");
  470. // for (key, value) in env_list.envs.iter() {
  471. // debug!("{}: {}", key, value.value);
  472. // }
  473. return Ok(());
  474. }