xref: /DADK/src/executor/mod.rs (revision bc00399460bdb00511befa808b0423cd809fe2a5)
1 use std::{
2     collections::BTreeMap,
3     env::Vars,
4     path::PathBuf,
5     process::{Command, Stdio},
6     sync::{Arc, RwLock},
7 };
8 
9 use log::{debug, error, info, warn};
10 
11 use crate::{
12     console::{clean::CleanLevel, Action},
13     executor::cache::CacheDir,
14     parser::task::{CodeSource, PrebuiltSource, TaskEnv, TaskType},
15     scheduler::{SchedEntities, SchedEntity},
16     utils::file::FileUtils,
17 };
18 
19 use self::cache::CacheDirType;
20 
21 pub mod cache;
22 pub mod source;
23 pub mod target;
24 
25 lazy_static! {
26     // 全局环境变量的列表
27     pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
28 }
29 
30 #[derive(Debug, Clone)]
31 pub struct Executor {
32     entity: Arc<SchedEntity>,
33     action: Action,
34     local_envs: EnvMap,
35     /// 任务构建结果输出到的目录
36     build_dir: CacheDir,
37     /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
38     source_dir: Option<CacheDir>,
39     /// DragonOS sysroot的路径
40     dragonos_sysroot: PathBuf,
41 }
42 
43 impl Executor {
44     /// # 创建执行器
45     ///
46     /// 用于执行一个任务
47     ///
48     /// ## 参数
49     ///
50     /// * `entity` - 任务调度实体
51     ///
52     /// ## 返回值
53     ///
54     /// * `Ok(Executor)` - 创建成功
55     /// * `Err(ExecutorError)` - 创建失败
56     pub fn new(
57         entity: Arc<SchedEntity>,
58         action: Action,
59         dragonos_sysroot: PathBuf,
60     ) -> Result<Self, ExecutorError> {
61         let local_envs = EnvMap::new();
62         let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
63 
64         let source_dir = if CacheDir::need_source_cache(&entity) {
65             Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
66         } else {
67             None
68         };
69 
70         let result: Executor = Self {
71             action,
72             entity,
73             local_envs,
74             build_dir,
75             source_dir,
76             dragonos_sysroot,
77         };
78 
79         return Ok(result);
80     }
81 
82     /// # 执行任务
83     ///
84     /// 创建执行器后,调用此方法执行任务。
85     /// 该方法会执行以下步骤:
86     ///
87     /// 1. 创建工作线程
88     /// 2. 准备环境变量
89     /// 3. 拉取数据(可选)
90     /// 4. 执行构建
91     pub fn execute(&mut self) -> Result<(), ExecutorError> {
92         info!("Execute task: {}", self.entity.task().name_version());
93 
94         // 准备本地环境变量
95         self.prepare_local_env()?;
96 
97         match self.action {
98             Action::Build => {
99                 // 构建任务
100                 self.build()?;
101             }
102             Action::Install => {
103                 // 把构建结果安装到DragonOS
104                 self.install()?;
105             }
106             Action::Clean(_) => {
107                 // 清理构建结果
108                 let r = self.clean();
109                 if let Err(e) = r {
110                     error!(
111                         "Failed to clean task {}: {:?}",
112                         self.entity.task().name_version(),
113                         e
114                     );
115                 }
116             }
117             _ => {
118                 error!("Unsupported action: {:?}", self.action);
119             }
120         }
121         info!("Task {} finished", self.entity.task().name_version());
122         return Ok(());
123     }
124 
125     /// # 执行build操作
126     fn build(&mut self) -> Result<(), ExecutorError> {
127         self.mv_target_to_tmp()?;
128 
129         // 确认源文件就绪
130         self.prepare_input()?;
131 
132         let command: Option<Command> = self.create_command()?;
133         if let Some(cmd) = command {
134             self.run_command(cmd)?;
135         }
136 
137         // 检查构建结果,如果为空,则抛出警告
138         if self.build_dir.is_empty()? {
139             warn!(
140                 "Task {}: build result is empty, do you forget to copy the result to [${}]?",
141                 self.entity.task().name_version(),
142                 CacheDir::build_dir_env_key(&self.entity)?
143             );
144         }
145         return Ok(());
146     }
147 
148     /// # 执行安装操作,把构建结果安装到DragonOS
149     fn install(&self) -> Result<(), ExecutorError> {
150         let binding = self.entity.task();
151         let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
152         // 如果没有指定安装路径,则不执行安装
153         if in_dragonos_path.is_none() {
154             return Ok(());
155         }
156         info!("Installing task: {}", self.entity.task().name_version());
157         let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
158 
159         debug!("in_dragonos_path: {}", in_dragonos_path);
160         // 去除开头的斜杠
161         {
162             let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
163             in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
164         }
165         // 拼接最终的安装路径
166         let install_path = self.dragonos_sysroot.join(in_dragonos_path);
167         debug!("install_path: {:?}", install_path);
168         // 创建安装路径
169         std::fs::create_dir_all(&install_path).map_err(|e| {
170             ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
171         })?;
172 
173         // 拷贝构建结果到安装路径
174         let build_dir: PathBuf = self.build_dir.path.clone();
175         FileUtils::copy_dir_all(&build_dir, &install_path)
176             .map_err(|e| ExecutorError::InstallError(e))?;
177         info!("Task {} installed.", self.entity.task().name_version());
178 
179         // 安装完后,删除临时target文件
180         if let Some(target) = self.entity.target() {
181             target.clean_tmpdadk()?;
182         }
183 
184         return Ok(());
185     }
186 
187     fn clean(&self) -> Result<(), ExecutorError> {
188         let level = if let Action::Clean(l) = self.action {
189             l.level
190         } else {
191             panic!(
192                 "BUG: clean() called with non-clean action. executor details: {:?}",
193                 self
194             );
195         };
196         info!(
197             "Cleaning task: {}, level={level}",
198             self.entity.task().name_version()
199         );
200 
201         let r: Result<(), ExecutorError> = match level {
202             CleanLevel::All => self.clean_all(),
203             CleanLevel::Src => self.clean_src(),
204             CleanLevel::Target => self.clean_target(),
205             CleanLevel::Cache => self.clean_cache(),
206         };
207 
208         if let Err(e) = r {
209             error!(
210                 "Failed to clean task: {}, error message: {:?}",
211                 self.entity.task().name_version(),
212                 e
213             );
214             return Err(e);
215         }
216 
217         return Ok(());
218     }
219 
220     fn clean_all(&self) -> Result<(), ExecutorError> {
221         // 在源文件目录执行清理
222         self.clean_src()?;
223         // 清理构建结果
224         self.clean_target()?;
225         // 清理缓存
226         self.clean_cache()?;
227         return Ok(());
228     }
229 
230     /// 在源文件目录执行清理
231     fn clean_src(&self) -> Result<(), ExecutorError> {
232         let cmd: Option<Command> = self.create_command()?;
233         if cmd.is_none() {
234             // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
235             return Ok(());
236         }
237         info!(
238             "{}: Cleaning in source directory: {:?}",
239             self.entity.task().name_version(),
240             self.src_work_dir()
241         );
242 
243         let cmd = cmd.unwrap();
244         self.run_command(cmd)?;
245         return Ok(());
246     }
247 
248     /// 清理构建输出目录
249     fn clean_target(&self) -> Result<(), ExecutorError> {
250         info!(
251             "{}: Cleaning build target directory: {:?}",
252             self.entity.task().name_version(),
253             self.build_dir.path
254         );
255 
256         return self.build_dir.remove_self_recursive();
257     }
258 
259     /// 清理下载缓存
260     fn clean_cache(&self) -> Result<(), ExecutorError> {
261         let cache_dir = self.source_dir.as_ref();
262         if cache_dir.is_none() {
263             // 如果没有缓存目录,则认为用户不需要清理缓存
264             return Ok(());
265         }
266         info!(
267             "{}: Cleaning cache directory: {}",
268             self.entity.task().name_version(),
269             self.src_work_dir().display()
270         );
271         return cache_dir.unwrap().remove_self_recursive();
272     }
273 
274     /// 获取源文件的工作目录
275     fn src_work_dir(&self) -> PathBuf {
276         if let Some(local_path) = self.entity.task().source_path() {
277             return local_path;
278         }
279         return self.source_dir.as_ref().unwrap().path.clone();
280     }
281 
282     /// 为任务创建命令
283     fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
284         // 获取命令
285         let raw_cmd = match self.entity.task().task_type {
286             TaskType::BuildFromSource(_) => match self.action {
287                 Action::Build => self.entity.task().build.build_command.clone(),
288                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
289                 _ => unimplemented!(
290                     "create_command: Action {:?} not supported yet.",
291                     self.action
292                 ),
293             },
294 
295             TaskType::InstallFromPrebuilt(_) => match self.action {
296                 Action::Build => self.entity.task().build.build_command.clone(),
297                 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
298                 _ => unimplemented!(
299                     "create_command: Action {:?} not supported yet.",
300                     self.action
301                 ),
302             },
303         };
304 
305         if raw_cmd.is_none() {
306             return Ok(None);
307         }
308 
309         let raw_cmd = raw_cmd.unwrap();
310 
311         let mut command = Command::new("bash");
312         command.current_dir(self.src_work_dir());
313 
314         // 设置参数
315         command.arg("-c");
316         command.arg(raw_cmd);
317 
318         // 设置环境变量
319         let env_list = ENV_LIST.read().unwrap();
320         for (key, value) in env_list.envs.iter() {
321             // if key.starts_with("DADK") {
322             //     debug!("DADK env found: {}={}", key, value.value);
323             // }
324             command.env(key, value.value.clone());
325         }
326         drop(env_list);
327         for (key, value) in self.local_envs.envs.iter() {
328             command.env(key, value.value.clone());
329         }
330 
331         return Ok(Some(command));
332     }
333 
334     /// # 准备工作线程本地环境变量
335     fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
336         // 设置本地环境变量
337         self.prepare_target_env()?;
338 
339         let binding = self.entity.task();
340         let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
341         if task_envs.is_none() {
342             return Ok(());
343         }
344 
345         let task_envs = task_envs.unwrap();
346         for tv in task_envs.iter() {
347             self.local_envs
348                 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
349         }
350 
351         // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
352         self.local_envs.add(EnvVar::new(
353             "DADK_CURRENT_BUILD_DIR".to_string(),
354             self.build_dir.path.to_str().unwrap().to_string(),
355         ));
356 
357         return Ok(());
358     }
359 
360     fn prepare_input(&self) -> Result<(), ExecutorError> {
361         // 拉取源文件
362         let task = self.entity.task();
363         match &task.task_type {
364             TaskType::BuildFromSource(cs) => {
365                 if self.source_dir.is_none() {
366                     return Ok(());
367                 }
368                 let source_dir = self.source_dir.as_ref().unwrap();
369                 match cs {
370                     CodeSource::Git(git) => {
371                         git.prepare(source_dir)
372                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
373                     }
374                     // 本地源文件,不需要拉取
375                     CodeSource::Local(_) => return Ok(()),
376                     // 在线压缩包,需要下载
377                     CodeSource::Archive(archive) => {
378                         archive
379                             .download_unzip(source_dir)
380                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
381                     }
382                 }
383             }
384             TaskType::InstallFromPrebuilt(pb) => {
385                 match pb {
386                     // 本地源文件,不需要拉取
387                     PrebuiltSource::Local(local_source) => {
388                         let local_path = local_source.path();
389                         let target_path = &self.build_dir.path;
390                         FileUtils::copy_dir_all(&local_path, &target_path)
391                             .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
392                         return Ok(());
393                     }
394                     // 在线压缩包,需要下载
395                     PrebuiltSource::Archive(archive) => {
396                         archive
397                             .download_unzip(&self.build_dir)
398                             .map_err(|e| ExecutorError::PrepareEnvError(e))?;
399                     }
400                 }
401             }
402         }
403 
404         return Ok(());
405     }
406 
407     fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
408         let mut child = command
409             .stdin(Stdio::inherit())
410             .spawn()
411             .map_err(|e| ExecutorError::IoError(e))?;
412 
413         // 等待子进程结束
414         let r = child.wait().map_err(|e| ExecutorError::IoError(e));
415         if r.is_ok() {
416             let r = r.unwrap();
417             if r.success() {
418                 return Ok(());
419             } else {
420                 // 执行失败,获取最后100行stderr输出
421                 let errmsg = format!(
422                     "Task {} failed, exit code = {}",
423                     self.entity.task().name_version(),
424                     r.code().unwrap()
425                 );
426                 error!("{errmsg}");
427                 let command_opt = command.output();
428                 if command_opt.is_err() {
429                     return Err(ExecutorError::TaskFailed(
430                         "Failed to get command output".to_string(),
431                     ));
432                 }
433                 let command_opt = command_opt.unwrap();
434                 let command_output = String::from_utf8_lossy(&command_opt.stderr);
435                 let mut last_100_outputs = command_output
436                     .lines()
437                     .rev()
438                     .take(100)
439                     .collect::<Vec<&str>>();
440                 last_100_outputs.reverse();
441                 error!("Last 100 lines msg of stderr:");
442                 for line in last_100_outputs {
443                     error!("{}", line);
444                 }
445                 return Err(ExecutorError::TaskFailed(errmsg));
446             }
447         } else {
448             let errmsg = format!(
449                 "Task {} failed, msg = {:?}",
450                 self.entity.task().name_version(),
451                 r.err().unwrap()
452             );
453             error!("{errmsg}");
454             return Err(ExecutorError::TaskFailed(errmsg));
455         }
456     }
457 
458     pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> {
459         if let Some(rust_target) = self.entity.task().rust_target.clone() {
460             // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中
461             self.entity
462                 .target()
463                 .as_ref()
464                 .unwrap()
465                 .cp_to_tmp(&rust_target)?;
466         }
467         return Ok(());
468     }
469 
470     pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> {
471         if self.entity.task().rust_target.is_some() {
472             // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径
473             self.entity
474                 .target()
475                 .as_ref()
476                 .unwrap()
477                 .prepare_env(&mut self.local_envs);
478         }
479         return Ok(());
480     }
481 }
482 
483 #[derive(Debug, Clone)]
484 pub struct EnvMap {
485     pub envs: BTreeMap<String, EnvVar>,
486 }
487 
488 impl EnvMap {
489     pub fn new() -> Self {
490         Self {
491             envs: BTreeMap::new(),
492         }
493     }
494 
495     pub fn add(&mut self, env: EnvVar) {
496         self.envs.insert(env.key.clone(), env);
497     }
498 
499     #[allow(dead_code)]
500     pub fn get(&self, key: &str) -> Option<&EnvVar> {
501         self.envs.get(key)
502     }
503 
504     pub fn add_vars(&mut self, vars: Vars) {
505         for (key, value) in vars {
506             self.add(EnvVar::new(key, value));
507         }
508     }
509 }
510 
511 /// # 环境变量
512 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
513 pub struct EnvVar {
514     pub key: String,
515     pub value: String,
516 }
517 
518 impl EnvVar {
519     pub fn new(key: String, value: String) -> Self {
520         Self { key, value }
521     }
522 }
523 
524 /// # 任务执行器错误枚举
525 #[allow(dead_code)]
526 #[derive(Debug)]
527 pub enum ExecutorError {
528     /// 准备执行环境错误
529     PrepareEnvError(String),
530     IoError(std::io::Error),
531     /// 构建执行错误
532     TaskFailed(String),
533     /// 安装错误
534     InstallError(String),
535     /// 清理错误
536     CleanError(String),
537 }
538 
539 /// # 准备全局环境变量
540 pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> {
541     info!("Preparing environment variables...");
542     // 获取当前全局环境变量列表
543     let mut env_list = ENV_LIST.write().unwrap();
544     let envs: Vars = std::env::vars();
545     env_list.add_vars(envs);
546 
547     // 为每个任务创建特定的环境变量
548     for entity in sched_entities.entities().iter() {
549         // 导出任务的构建目录环境变量
550         let build_dir = CacheDir::build_dir(entity.clone())?;
551 
552         let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
553         env_list.add(EnvVar::new(
554             build_dir_key,
555             build_dir.to_str().unwrap().to_string(),
556         ));
557 
558         // 如果需要源码缓存目录,则导出
559         if CacheDir::need_source_cache(entity) {
560             let source_dir = CacheDir::source_dir(entity.clone())?;
561             let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
562             env_list.add(EnvVar::new(
563                 source_dir_key,
564                 source_dir.to_str().unwrap().to_string(),
565             ));
566         }
567     }
568 
569     // 查看环境变量列表
570     // debug!("Environment variables:");
571 
572     // for (key, value) in env_list.envs.iter() {
573     //     debug!("{}: {}", key, value.value);
574     // }
575 
576     return Ok(());
577 }
578