xref: /DADK/src/executor/mod.rs (revision 60f366c9cf663fab2c851abaf55f1b1f011b3bb9)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7 };
8 
9 use log::{debug, error, info, warn};
10 
11 use crate::{
12     console::{clean::CleanLevel, Action},
13     executor::cache::CacheDir,
14     parser::{
15         task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
16         task_log::{BuildStatus, InstallStatus, TaskLog},
17     },
18     scheduler::{SchedEntities, SchedEntity},
19     utils::file::FileUtils,
20 };
21 
22 use self::cache::{CacheDirType, TaskDataDir};
23 
24 pub mod cache;
25 pub mod source;
26 pub mod target;
27 
28 lazy_static! {
29     // 全局环境变量的列表
30     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
31 }
32 
33 #[derive(Debug, Clone)]
34 pub struct Executor {
35     entity: Arc<SchedEntity>,
36     action: Action,
37     local_envs: EnvMap,
38     /// 任务构建结果输出到的目录
39     build_dir: CacheDir,
40     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
41     source_dir: Option<CacheDir>,
42     /// 任务数据目录
43     task_data_dir: TaskDataDir,
44     /// DragonOS sysroot的路径
45     dragonos_sysroot: PathBuf,
46 }
47 
48 impl Executor {
49     /// # 创建执行器
50     ///
51     /// 用于执行一个任务
52     ///
53     /// ## 参数
54     ///
55     /// * `entity` - 任务调度实体
56     ///
57     /// ## 返回值
58     ///
59     /// * `Ok(Executor)` - 创建成功
60     /// * `Err(ExecutorError)` - 创建失败
61     pub fn new(
62         entity: Arc<SchedEntity>,
63         action: Action,
64         dragonos_sysroot: PathBuf,
65     ) -> Result<Self, ExecutorError> {
66         let local_envs = EnvMap::new();
67         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
68         let task_data_dir = TaskDataDir::new(entity.clone())?;
69 
70         let source_dir = if CacheDir::need_source_cache(&entity) {
71             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
72         } else {
73             None
74         };
75 
76         let result: Executor = Self {
77             action,
78             entity,
79             local_envs,
80             build_dir,
81             source_dir,
82             task_data_dir,
83             dragonos_sysroot,
84         };
85 
86         return Ok(result);
87     }
88 
89     /// # 执行任务
90     ///
91     /// 创建执行器后,调用此方法执行任务。
92     /// 该方法会执行以下步骤:
93     ///
94     /// 1. 创建工作线程
95     /// 2. 准备环境变量
96     /// 3. 拉取数据(可选)
97     /// 4. 执行构建
98     pub fn execute(&mut self) -> Result<(), ExecutorError> {
99         info!("Execute task: {}", self.entity.task().name_version());
100 
101         let r = self.do_execute();
102         self.save_task_data(r);
103         info!("Task {} finished", self.entity.task().name_version());
104         return Ok(());
105     }
106 
107     /// # 保存任务数据
108     fn save_task_data(&self, r: Result<(), ExecutorError>) {
109         let mut task_log = self.task_data_dir.task_log();
110         match self.action {
111             Action::Build => {
112                 if r.is_ok() {
113                     task_log.set_build_status(BuildStatus::Success);
114                 } else {
115                     task_log.set_build_status(BuildStatus::Failed);
116                 }
117 
118                 task_log.set_build_time_now();
119             }
120 
121             Action::Install => {
122                 if r.is_ok() {
123                     task_log.set_install_status(InstallStatus::Success);
124                 } else {
125                     task_log.set_install_status(InstallStatus::Failed);
126                 }
127             }
128 
129             Action::Clean(_) => {
130                 task_log.clean_build_status();
131                 task_log.clean_install_status();
132             }
133 
134             _ => {}
135         }
136 
137         self.task_data_dir
138             .save_task_log(&task_log)
139             .expect("Failed to save task log");
140     }
141 
142     fn do_execute(&mut self) -> Result<(), ExecutorError> {
143         // 准备本地环境变量
144         self.prepare_local_env()?;
145 
146         match self.action {
147             Action::Build => {
148                 // 构建任务
149                 self.build()?;
150             }
151             Action::Install => {
152                 // 把构建结果安装到DragonOS
153                 self.install()?;
154             }
155             Action::Clean(_) => {
156                 // 清理构建结果
157                 let r = self.clean();
158                 if let Err(e) = r {
159                     error!(
160                         "Failed to clean task {}: {:?}",
161                         self.entity.task().name_version(),
162                         e
163                     );
164                 }
165             }
166             _ => {
167                 error!("Unsupported action: {:?}", self.action);
168             }
169         }
170 
171         return Ok(());
172     }
173 
174     /// # 执行build操作
175     fn build(&mut self) -> Result<(), ExecutorError> {
176         if let Some(status) = self.task_log().build_status() {
177             if *status == BuildStatus::Success && self.entity.task().build_once {
178                 info!(
179                     "Task {} has been built successfully, skip build.",
180                     self.entity.task().name_version()
181                 );
182                 return Ok(());
183             }
184         }
185 
186         self.mv_target_to_tmp()?;
187 
188         // 确认源文件就绪
189         self.prepare_input()?;
190 
191         let command: Option<Command> = self.create_command()?;
192         if let Some(cmd) = command {
193             self.run_command(cmd)?;
194         }
195 
196         // 检查构建结果,如果为空,则抛出警告
197         if self.build_dir.is_empty()? {
198             warn!(
199                 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
200                 self.entity.task().name_version(),
201             );
202         }
203         return Ok(());
204     }
205 
206     /// # 执行安装操作,把构建结果安装到DragonOS
207     fn install(&self) -> Result<(), ExecutorError> {
208         if let Some(status) = self.task_log().install_status() {
209             if *status == InstallStatus::Success && self.entity.task().install_once {
210                 info!(
211                     "Task {} has been installed successfully, skip install.",
212                     self.entity.task().name_version()
213                 );
214                 return Ok(());
215             }
216         }
217 
218         let binding = self.entity.task();
219         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
220         // 如果没有指定安装路径,则不执行安装
221         if in_dragonos_path.is_none() {
222             return Ok(());
223         }
224         info!("Installing task: {}", self.entity.task().name_version());
225         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
226 
227         debug!("in_dragonos_path: {}", in_dragonos_path);
228         // 去除开头的斜杠
229         {
230             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
231             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
232         }
233         // 拼接最终的安装路径
234         let install_path = self.dragonos_sysroot.join(in_dragonos_path);
235         debug!("install_path: {:?}", install_path);
236         // 创建安装路径
237         std::fs::create_dir_all(&install_path).map_err(|e| {
238             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
239         })?;
240 
241         // 拷贝构建结果到安装路径
242         let build_dir: PathBuf = self.build_dir.path.clone();
243         FileUtils::copy_dir_all(&build_dir, &install_path)
244             .map_err(|e| ExecutorError::InstallError(e))?;
245         info!("Task {} installed.", self.entity.task().name_version());
246 
247         // 安装完后,删除临时target文件
248         if let Some(target) = self.entity.target() {
249             target.clean_tmpdadk()?;
250         }
251 
252         return Ok(());
253     }
254 
255     fn clean(&self) -> Result<(), ExecutorError> {
256         let level = if let Action::Clean(l) = self.action {
257             l.level
258         } else {
259             panic!(
260                 "BUG: clean() called with non-clean action. executor details: {:?}",
261                 self
262             );
263         };
264         info!(
265             "Cleaning task: {}, level={level}",
266             self.entity.task().name_version()
267         );
268 
269         let r: Result<(), ExecutorError> = match level {
270             CleanLevel::All => self.clean_all(),
271             CleanLevel::Src => self.clean_src(),
272             CleanLevel::Target => self.clean_target(),
273             CleanLevel::Cache => self.clean_cache(),
274         };
275 
276         if let Err(e) = r {
277             error!(
278                 "Failed to clean task: {}, error message: {:?}",
279                 self.entity.task().name_version(),
280                 e
281             );
282             return Err(e);
283         }
284 
285         return Ok(());
286     }
287 
288     fn clean_all(&self) -> Result<(), ExecutorError> {
289         // 在源文件目录执行清理
290         self.clean_src()?;
291         // 清理构建结果
292         self.clean_target()?;
293         // 清理缓存
294         self.clean_cache()?;
295         return Ok(());
296     }
297 
298     /// 在源文件目录执行清理
299     fn clean_src(&self) -> Result<(), ExecutorError> {
300         let cmd: Option<Command> = self.create_command()?;
301         if cmd.is_none() {
302             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
303             return Ok(());
304         }
305         info!(
306             "{}: Cleaning in source directory: {:?}",
307             self.entity.task().name_version(),
308             self.src_work_dir()
309         );
310 
311         let cmd = cmd.unwrap();
312         self.run_command(cmd)?;
313         return Ok(());
314     }
315 
316     /// 清理构建输出目录
317     fn clean_target(&self) -> Result<(), ExecutorError> {
318         info!(
319             "{}: Cleaning build target directory: {:?}",
320             self.entity.task().name_version(),
321             self.build_dir.path
322         );
323 
324         return self.build_dir.remove_self_recursive();
325     }
326 
327     /// 清理下载缓存
328     fn clean_cache(&self) -> Result<(), ExecutorError> {
329         let cache_dir = self.source_dir.as_ref();
330         if cache_dir.is_none() {
331             // 如果没有缓存目录,则认为用户不需要清理缓存
332             return Ok(());
333         }
334         info!(
335             "{}: Cleaning cache directory: {}",
336             self.entity.task().name_version(),
337             self.src_work_dir().display()
338         );
339         return cache_dir.unwrap().remove_self_recursive();
340     }
341 
342     /// 获取源文件的工作目录
343     fn src_work_dir(&self) -> PathBuf {
344         if let Some(local_path) = self.entity.task().source_path() {
345             return local_path;
346         }
347         return self.source_dir.as_ref().unwrap().path.clone();
348     }
349 
350     fn task_log(&self) -> TaskLog {
351         return self.task_data_dir.task_log();
352     }
353 
354     /// 为任务创建命令
355     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
356         // 获取命令
357         let raw_cmd = match self.entity.task().task_type {
358             TaskType::BuildFromSource(_) => match self.action {
359                 Action::Build => self.entity.task().build.build_command.clone(),
360                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
361                 _ => unimplemented!(
362                     "create_command: Action {:?} not supported yet.",
363                     self.action
364                 ),
365             },
366 
367             TaskType::InstallFromPrebuilt(_) => match self.action {
368                 Action::Build => self.entity.task().build.build_command.clone(),
369                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
370                 _ => unimplemented!(
371                     "create_command: Action {:?} not supported yet.",
372                     self.action
373                 ),
374             },
375         };
376 
377         if raw_cmd.is_none() {
378             return Ok(None);
379         }
380 
381         let raw_cmd = raw_cmd.unwrap();
382 
383         let mut command = Command::new("bash");
384         command.current_dir(self.src_work_dir());
385 
386         // 设置参数
387         command.arg("-c");
388         command.arg(raw_cmd);
389 
390         // 设置环境变量
391         let env_list = ENV_LIST.read().unwrap();
392         for (key, value) in env_list.envs.iter() {
393             // if key.starts_with("DADK") {
394             //     debug!("DADK env found: {}={}", key, value.value);
395             // }
396             command.env(key, value.value.clone());
397         }
398         drop(env_list);
399         for (key, value) in self.local_envs.envs.iter() {
400             debug!("Local env found: {}={}", key, value.value);
401             command.env(key, value.value.clone());
402         }
403 
404         return Ok(Some(command));
405     }
406 
407     /// # 准备工作线程本地环境变量
408     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
409         // 设置本地环境变量
410         self.prepare_target_env()?;
411 
412         let binding = self.entity.task();
413         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
414 
415         if let Some(task_envs) = task_envs {
416             for tv in task_envs.iter() {
417                 self.local_envs
418                     .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
419             }
420         }
421 
422         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
423         self.local_envs.add(EnvVar::new(
424             "DADK_CURRENT_BUILD_DIR".to_string(),
425             self.build_dir.path.to_str().unwrap().to_string(),
426         ));
427 
428         return Ok(());
429     }
430 
431     fn prepare_input(&self) -> Result<(), ExecutorError> {
432         // 拉取源文件
433         let task = self.entity.task();
434         match &task.task_type {
435             TaskType::BuildFromSource(cs) => {
436                 if self.source_dir.is_none() {
437                     return Ok(());
438                 }
439                 let source_dir = self.source_dir.as_ref().unwrap();
440                 match cs {
441                     CodeSource::Git(git) => {
442                         git.prepare(source_dir)
443                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
444                     }
445                     // 本地源文件,不需要拉取
446                     CodeSource::Local(_) => return Ok(()),
447                     // 在线压缩包,需要下载
448                     CodeSource::Archive(archive) => {
449                         archive
450                             .download_unzip(source_dir)
451                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
452                     }
453                 }
454             }
455             TaskType::InstallFromPrebuilt(pb) => {
456                 match pb {
457                     // 本地源文件,不需要拉取
458                     PrebuiltSource::Local(local_source) => {
459                         let local_path = local_source.path();
460                         let target_path = &self.build_dir.path;
461                         FileUtils::copy_dir_all(&local_path, &target_path)
462                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
463                         return Ok(());
464                     }
465                     // 在线压缩包,需要下载
466                     PrebuiltSource::Archive(archive) => {
467                         archive
468                             .download_unzip(&self.build_dir)
469                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
470                     }
471                 }
472             }
473         }
474 
475         return Ok(());
476     }
477 
478     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
479         let mut child = command
480             .stdin(Stdio::inherit())
481             .spawn()
482             .map_err(|e| ExecutorError::IoError(e))?;
483 
484         // 等待子进程结束
485         let r = child.wait().map_err(|e| ExecutorError::IoError(e));
486         if r.is_ok() {
487             let r = r.unwrap();
488             if r.success() {
489                 return Ok(());
490             } else {
491                 // 执行失败,获取最后100行stderr输出
492                 let errmsg = format!(
493                     "Task {} failed, exit code = {}",
494                     self.entity.task().name_version(),
495                     r.code().unwrap()
496                 );
497                 error!("{errmsg}");
498                 let command_opt = command.output();
499                 if command_opt.is_err() {
500                     return Err(ExecutorError::TaskFailed(
501                         "Failed to get command output".to_string(),
502                     ));
503                 }
504                 let command_opt = command_opt.unwrap();
505                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
506                 let mut last_100_outputs = command_output
507                     .lines()
508                     .rev()
509                     .take(100)
510                     .collect::<Vec<&str>>();
511                 last_100_outputs.reverse();
512                 error!("Last 100 lines msg of stderr:");
513                 for line in last_100_outputs {
514                     error!("{}", line);
515                 }
516                 return Err(ExecutorError::TaskFailed(errmsg));
517             }
518         } else {
519             let errmsg = format!(
520                 "Task {} failed, msg = {:?}",
521                 self.entity.task().name_version(),
522                 r.err().unwrap()
523             );
524             error!("{errmsg}");
525             return Err(ExecutorError::TaskFailed(errmsg));
526         }
527     }
528 
529     pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
530         if let Some(rust_target) = self.entity.task().rust_target.clone() {
531             // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
532             self.entity
533                 .target()
534                 .as_ref()
535                 .unwrap()
536                 .cp_to_tmp(&rust_target)?;
537         }
538         return Ok(());
539     }
540 
541     pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
542         if self.entity.task().rust_target.is_some() {
543             // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
544             self.entity
545                 .target()
546                 .as_ref()
547                 .unwrap()
548                 .prepare_env(&mut self.local_envs);
549         }
550         return Ok(());
551     }
552 }
553 
554 #[derive(Debug, Clone)]
555 pub struct EnvMap {
556     pub envs: BTreeMap<String, EnvVar>,
557 }
558 
559 impl EnvMap {
560     pub fn new() -> Self {
561         Self {
562             envs: BTreeMap::new(),
563         }
564     }
565 
566     pub fn add(&mut self, env: EnvVar) {
567         self.envs.insert(env.key.clone(), env);
568     }
569 
570     #[allow(dead_code)]
571     pub fn get(&self, key: &str) -> Option<&EnvVar> {
572         self.envs.get(key)
573     }
574 
575     pub fn add_vars(&mut self, vars: Vars) {
576         for (key, value) in vars {
577             self.add(EnvVar::new(key, value));
578         }
579     }
580 }
581 
582 /// # 环境变量
583 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
584 pub struct EnvVar {
585     pub key: String,
586     pub value: String,
587 }
588 
589 impl EnvVar {
590     pub fn new(key: String, value: String) -> Self {
591         Self { key, value }
592     }
593 }
594 
595 /// # 任务执行器错误枚举
596 #[allow(dead_code)]
597 #[derive(Debug)]
598 pub enum ExecutorError {
599     /// 准备执行环境错误
600     PrepareEnvError(String),
601     IoError(std::io::Error),
602     /// 构建执行错误
603     TaskFailed(String),
604     /// 安装错误
605     InstallError(String),
606     /// 清理错误
607     CleanError(String),
608 }
609 
610 /// # 准备全局环境变量
611 pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> {
612     info!("Preparing environment variables...");
613     // 获取当前全局环境变量列表
614     let mut env_list = ENV_LIST.write().unwrap();
615     let envs: Vars = std::env::vars();
616     env_list.add_vars(envs);
617 
618     // 为每个任务创建特定的环境变量
619     for entity in sched_entities.entities().iter() {
620         // 导出任务的构建目录环境变量
621         let build_dir = CacheDir::build_dir(entity.clone())?;
622 
623         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
624         env_list.add(EnvVar::new(
625             build_dir_key,
626             build_dir.to_str().unwrap().to_string(),
627         ));
628 
629         // 如果需要源码缓存目录,则导出
630         if CacheDir::need_source_cache(entity) {
631             let source_dir = CacheDir::source_dir(entity.clone())?;
632             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
633             env_list.add(EnvVar::new(
634                 source_dir_key,
635                 source_dir.to_str().unwrap().to_string(),
636             ));
637         }
638     }
639 
640     // 查看环境变量列表
641     // debug!("Environment variables:");
642 
643     // for (key, value) in env_list.envs.iter() {
644     //     debug!("{}: {}", key, value.value);
645     // }
646 
647     return Ok(());
648 }
649