1 use std::{ 2 collections::BTreeMap, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 }; 8 9 use log::{debug, error, info, warn}; 10 11 use crate::{ 12 console::{clean::CleanLevel, Action}, 13 executor::cache::CacheDir, 14 parser::{ 15 task::{CodeSource, PrebuiltSource, TaskEnv, TaskType}, 16 task_log::{BuildStatus, InstallStatus, TaskLog}, 17 }, 18 scheduler::{SchedEntities, SchedEntity}, 19 utils::file::FileUtils, 20 }; 21 22 use self::cache::{CacheDirType, TaskDataDir}; 23 24 pub mod cache; 25 pub mod source; 26 pub mod target; 27 28 lazy_static! { 29 // 全局环境变量的列表 30 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 31 } 32 33 #[derive(Debug, Clone)] 34 pub struct Executor { 35 entity: Arc<SchedEntity>, 36 action: Action, 37 local_envs: EnvMap, 38 /// 任务构建结果输出到的目录 39 build_dir: CacheDir, 40 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 41 source_dir: Option<CacheDir>, 42 /// 任务数据目录 43 task_data_dir: TaskDataDir, 44 /// DragonOS sysroot的路径 45 dragonos_sysroot: PathBuf, 46 } 47 48 impl Executor { 49 /// # 创建执行器 50 /// 51 /// 用于执行一个任务 52 /// 53 /// ## 参数 54 /// 55 /// * `entity` - 任务调度实体 56 /// 57 /// ## 返回值 58 /// 59 /// * `Ok(Executor)` - 创建成功 60 /// * `Err(ExecutorError)` - 创建失败 61 pub fn new( 62 entity: Arc<SchedEntity>, 63 action: Action, 64 dragonos_sysroot: PathBuf, 65 ) -> Result<Self, ExecutorError> { 66 let local_envs = EnvMap::new(); 67 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 68 let task_data_dir = TaskDataDir::new(entity.clone())?; 69 70 let source_dir = if CacheDir::need_source_cache(&entity) { 71 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 72 } else { 73 None 74 }; 75 76 let result: Executor = Self { 77 action, 78 entity, 79 local_envs, 80 build_dir, 81 source_dir, 82 task_data_dir, 83 dragonos_sysroot, 84 }; 85 86 return Ok(result); 87 } 88 89 /// # 执行任务 90 /// 91 /// 创建执行器后,调用此方法执行任务。 92 /// 该方法会执行以下步骤: 93 /// 94 /// 1. 创建工作线程 95 /// 2. 准备环境变量 96 /// 3. 拉取数据(可选) 97 /// 4. 执行构建 98 pub fn execute(&mut self) -> Result<(), ExecutorError> { 99 info!("Execute task: {}", self.entity.task().name_version()); 100 101 let r = self.do_execute(); 102 self.save_task_data(r); 103 info!("Task {} finished", self.entity.task().name_version()); 104 return Ok(()); 105 } 106 107 /// # 保存任务数据 108 fn save_task_data(&self, r: Result<(), ExecutorError>) { 109 let mut task_log = self.task_data_dir.task_log(); 110 match self.action { 111 Action::Build => { 112 if r.is_ok() { 113 task_log.set_build_status(BuildStatus::Success); 114 } else { 115 task_log.set_build_status(BuildStatus::Failed); 116 } 117 118 task_log.set_build_time_now(); 119 } 120 121 Action::Install => { 122 if r.is_ok() { 123 task_log.set_install_status(InstallStatus::Success); 124 } else { 125 task_log.set_install_status(InstallStatus::Failed); 126 } 127 } 128 129 Action::Clean(_) => { 130 task_log.clean_build_status(); 131 task_log.clean_install_status(); 132 } 133 134 _ => {} 135 } 136 137 self.task_data_dir 138 .save_task_log(&task_log) 139 .expect("Failed to save task log"); 140 } 141 142 fn do_execute(&mut self) -> Result<(), ExecutorError> { 143 // 准备本地环境变量 144 self.prepare_local_env()?; 145 146 match self.action { 147 Action::Build => { 148 // 构建任务 149 self.build()?; 150 } 151 Action::Install => { 152 // 把构建结果安装到DragonOS 153 self.install()?; 154 } 155 Action::Clean(_) => { 156 // 清理构建结果 157 let r = self.clean(); 158 if let Err(e) = r { 159 error!( 160 "Failed to clean task {}: {:?}", 161 self.entity.task().name_version(), 162 e 163 ); 164 } 165 } 166 _ => { 167 error!("Unsupported action: {:?}", self.action); 168 } 169 } 170 171 return Ok(()); 172 } 173 174 /// # 执行build操作 175 fn build(&mut self) -> Result<(), ExecutorError> { 176 if let Some(status) = self.task_log().build_status() { 177 if *status == BuildStatus::Success && self.entity.task().build_once { 178 info!( 179 "Task {} has been built successfully, skip build.", 180 self.entity.task().name_version() 181 ); 182 return Ok(()); 183 } 184 } 185 186 self.mv_target_to_tmp()?; 187 188 // 确认源文件就绪 189 self.prepare_input()?; 190 191 let command: Option<Command> = self.create_command()?; 192 if let Some(cmd) = command { 193 self.run_command(cmd)?; 194 } 195 196 // 检查构建结果,如果为空,则抛出警告 197 if self.build_dir.is_empty()? { 198 warn!( 199 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 200 self.entity.task().name_version(), 201 ); 202 } 203 return Ok(()); 204 } 205 206 /// # 执行安装操作,把构建结果安装到DragonOS 207 fn install(&self) -> Result<(), ExecutorError> { 208 if let Some(status) = self.task_log().install_status() { 209 if *status == InstallStatus::Success && self.entity.task().install_once { 210 info!( 211 "Task {} has been installed successfully, skip install.", 212 self.entity.task().name_version() 213 ); 214 return Ok(()); 215 } 216 } 217 218 let binding = self.entity.task(); 219 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 220 // 如果没有指定安装路径,则不执行安装 221 if in_dragonos_path.is_none() { 222 return Ok(()); 223 } 224 info!("Installing task: {}", self.entity.task().name_version()); 225 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 226 227 debug!("in_dragonos_path: {}", in_dragonos_path); 228 // 去除开头的斜杠 229 { 230 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 231 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 232 } 233 // 拼接最终的安装路径 234 let install_path = self.dragonos_sysroot.join(in_dragonos_path); 235 debug!("install_path: {:?}", install_path); 236 // 创建安装路径 237 std::fs::create_dir_all(&install_path).map_err(|e| { 238 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 239 })?; 240 241 // 拷贝构建结果到安装路径 242 let build_dir: PathBuf = self.build_dir.path.clone(); 243 FileUtils::copy_dir_all(&build_dir, &install_path) 244 .map_err(|e| ExecutorError::InstallError(e))?; 245 info!("Task {} installed.", self.entity.task().name_version()); 246 247 // 安装完后,删除临时target文件 248 if let Some(target) = self.entity.target() { 249 target.clean_tmpdadk()?; 250 } 251 252 return Ok(()); 253 } 254 255 fn clean(&self) -> Result<(), ExecutorError> { 256 let level = if let Action::Clean(l) = self.action { 257 l.level 258 } else { 259 panic!( 260 "BUG: clean() called with non-clean action. executor details: {:?}", 261 self 262 ); 263 }; 264 info!( 265 "Cleaning task: {}, level={level}", 266 self.entity.task().name_version() 267 ); 268 269 let r: Result<(), ExecutorError> = match level { 270 CleanLevel::All => self.clean_all(), 271 CleanLevel::Src => self.clean_src(), 272 CleanLevel::Target => self.clean_target(), 273 CleanLevel::Cache => self.clean_cache(), 274 }; 275 276 if let Err(e) = r { 277 error!( 278 "Failed to clean task: {}, error message: {:?}", 279 self.entity.task().name_version(), 280 e 281 ); 282 return Err(e); 283 } 284 285 return Ok(()); 286 } 287 288 fn clean_all(&self) -> Result<(), ExecutorError> { 289 // 在源文件目录执行清理 290 self.clean_src()?; 291 // 清理构建结果 292 self.clean_target()?; 293 // 清理缓存 294 self.clean_cache()?; 295 return Ok(()); 296 } 297 298 /// 在源文件目录执行清理 299 fn clean_src(&self) -> Result<(), ExecutorError> { 300 let cmd: Option<Command> = self.create_command()?; 301 if cmd.is_none() { 302 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 303 return Ok(()); 304 } 305 info!( 306 "{}: Cleaning in source directory: {:?}", 307 self.entity.task().name_version(), 308 self.src_work_dir() 309 ); 310 311 let cmd = cmd.unwrap(); 312 self.run_command(cmd)?; 313 return Ok(()); 314 } 315 316 /// 清理构建输出目录 317 fn clean_target(&self) -> Result<(), ExecutorError> { 318 info!( 319 "{}: Cleaning build target directory: {:?}", 320 self.entity.task().name_version(), 321 self.build_dir.path 322 ); 323 324 return self.build_dir.remove_self_recursive(); 325 } 326 327 /// 清理下载缓存 328 fn clean_cache(&self) -> Result<(), ExecutorError> { 329 let cache_dir = self.source_dir.as_ref(); 330 if cache_dir.is_none() { 331 // 如果没有缓存目录,则认为用户不需要清理缓存 332 return Ok(()); 333 } 334 info!( 335 "{}: Cleaning cache directory: {}", 336 self.entity.task().name_version(), 337 self.src_work_dir().display() 338 ); 339 return cache_dir.unwrap().remove_self_recursive(); 340 } 341 342 /// 获取源文件的工作目录 343 fn src_work_dir(&self) -> PathBuf { 344 if let Some(local_path) = self.entity.task().source_path() { 345 return local_path; 346 } 347 return self.source_dir.as_ref().unwrap().path.clone(); 348 } 349 350 fn task_log(&self) -> TaskLog { 351 return self.task_data_dir.task_log(); 352 } 353 354 /// 为任务创建命令 355 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 356 // 获取命令 357 let raw_cmd = match self.entity.task().task_type { 358 TaskType::BuildFromSource(_) => match self.action { 359 Action::Build => self.entity.task().build.build_command.clone(), 360 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 361 _ => unimplemented!( 362 "create_command: Action {:?} not supported yet.", 363 self.action 364 ), 365 }, 366 367 TaskType::InstallFromPrebuilt(_) => match self.action { 368 Action::Build => self.entity.task().build.build_command.clone(), 369 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 370 _ => unimplemented!( 371 "create_command: Action {:?} not supported yet.", 372 self.action 373 ), 374 }, 375 }; 376 377 if raw_cmd.is_none() { 378 return Ok(None); 379 } 380 381 let raw_cmd = raw_cmd.unwrap(); 382 383 let mut command = Command::new("bash"); 384 command.current_dir(self.src_work_dir()); 385 386 // 设置参数 387 command.arg("-c"); 388 command.arg(raw_cmd); 389 390 // 设置环境变量 391 let env_list = ENV_LIST.read().unwrap(); 392 for (key, value) in env_list.envs.iter() { 393 // if key.starts_with("DADK") { 394 // debug!("DADK env found: {}={}", key, value.value); 395 // } 396 command.env(key, value.value.clone()); 397 } 398 drop(env_list); 399 for (key, value) in self.local_envs.envs.iter() { 400 debug!("Local env found: {}={}", key, value.value); 401 command.env(key, value.value.clone()); 402 } 403 404 return Ok(Some(command)); 405 } 406 407 /// # 准备工作线程本地环境变量 408 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 409 // 设置本地环境变量 410 self.prepare_target_env()?; 411 412 let binding = self.entity.task(); 413 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 414 415 if let Some(task_envs) = task_envs { 416 for tv in task_envs.iter() { 417 self.local_envs 418 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 419 } 420 } 421 422 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 423 self.local_envs.add(EnvVar::new( 424 "DADK_CURRENT_BUILD_DIR".to_string(), 425 self.build_dir.path.to_str().unwrap().to_string(), 426 )); 427 428 return Ok(()); 429 } 430 431 fn prepare_input(&self) -> Result<(), ExecutorError> { 432 // 拉取源文件 433 let task = self.entity.task(); 434 match &task.task_type { 435 TaskType::BuildFromSource(cs) => { 436 if self.source_dir.is_none() { 437 return Ok(()); 438 } 439 let source_dir = self.source_dir.as_ref().unwrap(); 440 match cs { 441 CodeSource::Git(git) => { 442 git.prepare(source_dir) 443 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 444 } 445 // 本地源文件,不需要拉取 446 CodeSource::Local(_) => return Ok(()), 447 // 在线压缩包,需要下载 448 CodeSource::Archive(archive) => { 449 archive 450 .download_unzip(source_dir) 451 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 452 } 453 } 454 } 455 TaskType::InstallFromPrebuilt(pb) => { 456 match pb { 457 // 本地源文件,不需要拉取 458 PrebuiltSource::Local(local_source) => { 459 let local_path = local_source.path(); 460 let target_path = &self.build_dir.path; 461 FileUtils::copy_dir_all(&local_path, &target_path) 462 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 463 return Ok(()); 464 } 465 // 在线压缩包,需要下载 466 PrebuiltSource::Archive(archive) => { 467 archive 468 .download_unzip(&self.build_dir) 469 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 470 } 471 } 472 } 473 } 474 475 return Ok(()); 476 } 477 478 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 479 let mut child = command 480 .stdin(Stdio::inherit()) 481 .spawn() 482 .map_err(|e| ExecutorError::IoError(e))?; 483 484 // 等待子进程结束 485 let r = child.wait().map_err(|e| ExecutorError::IoError(e)); 486 if r.is_ok() { 487 let r = r.unwrap(); 488 if r.success() { 489 return Ok(()); 490 } else { 491 // 执行失败,获取最后100行stderr输出 492 let errmsg = format!( 493 "Task {} failed, exit code = {}", 494 self.entity.task().name_version(), 495 r.code().unwrap() 496 ); 497 error!("{errmsg}"); 498 let command_opt = command.output(); 499 if command_opt.is_err() { 500 return Err(ExecutorError::TaskFailed( 501 "Failed to get command output".to_string(), 502 )); 503 } 504 let command_opt = command_opt.unwrap(); 505 let command_output = String::from_utf8_lossy(&command_opt.stderr); 506 let mut last_100_outputs = command_output 507 .lines() 508 .rev() 509 .take(100) 510 .collect::<Vec<&str>>(); 511 last_100_outputs.reverse(); 512 error!("Last 100 lines msg of stderr:"); 513 for line in last_100_outputs { 514 error!("{}", line); 515 } 516 return Err(ExecutorError::TaskFailed(errmsg)); 517 } 518 } else { 519 let errmsg = format!( 520 "Task {} failed, msg = {:?}", 521 self.entity.task().name_version(), 522 r.err().unwrap() 523 ); 524 error!("{errmsg}"); 525 return Err(ExecutorError::TaskFailed(errmsg)); 526 } 527 } 528 529 pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> { 530 if let Some(rust_target) = self.entity.task().rust_target.clone() { 531 // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中 532 self.entity 533 .target() 534 .as_ref() 535 .unwrap() 536 .cp_to_tmp(&rust_target)?; 537 } 538 return Ok(()); 539 } 540 541 pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> { 542 if self.entity.task().rust_target.is_some() { 543 // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径 544 self.entity 545 .target() 546 .as_ref() 547 .unwrap() 548 .prepare_env(&mut self.local_envs); 549 } 550 return Ok(()); 551 } 552 } 553 554 #[derive(Debug, Clone)] 555 pub struct EnvMap { 556 pub envs: BTreeMap<String, EnvVar>, 557 } 558 559 impl EnvMap { 560 pub fn new() -> Self { 561 Self { 562 envs: BTreeMap::new(), 563 } 564 } 565 566 pub fn add(&mut self, env: EnvVar) { 567 self.envs.insert(env.key.clone(), env); 568 } 569 570 #[allow(dead_code)] 571 pub fn get(&self, key: &str) -> Option<&EnvVar> { 572 self.envs.get(key) 573 } 574 575 pub fn add_vars(&mut self, vars: Vars) { 576 for (key, value) in vars { 577 self.add(EnvVar::new(key, value)); 578 } 579 } 580 } 581 582 /// # 环境变量 583 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 584 pub struct EnvVar { 585 pub key: String, 586 pub value: String, 587 } 588 589 impl EnvVar { 590 pub fn new(key: String, value: String) -> Self { 591 Self { key, value } 592 } 593 } 594 595 /// # 任务执行器错误枚举 596 #[allow(dead_code)] 597 #[derive(Debug)] 598 pub enum ExecutorError { 599 /// 准备执行环境错误 600 PrepareEnvError(String), 601 IoError(std::io::Error), 602 /// 构建执行错误 603 TaskFailed(String), 604 /// 安装错误 605 InstallError(String), 606 /// 清理错误 607 CleanError(String), 608 } 609 610 /// # 准备全局环境变量 611 pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> { 612 info!("Preparing environment variables..."); 613 // 获取当前全局环境变量列表 614 let mut env_list = ENV_LIST.write().unwrap(); 615 let envs: Vars = std::env::vars(); 616 env_list.add_vars(envs); 617 618 // 为每个任务创建特定的环境变量 619 for entity in sched_entities.entities().iter() { 620 // 导出任务的构建目录环境变量 621 let build_dir = CacheDir::build_dir(entity.clone())?; 622 623 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 624 env_list.add(EnvVar::new( 625 build_dir_key, 626 build_dir.to_str().unwrap().to_string(), 627 )); 628 629 // 如果需要源码缓存目录,则导出 630 if CacheDir::need_source_cache(entity) { 631 let source_dir = CacheDir::source_dir(entity.clone())?; 632 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 633 env_list.add(EnvVar::new( 634 source_dir_key, 635 source_dir.to_str().unwrap().to_string(), 636 )); 637 } 638 } 639 640 // 查看环境变量列表 641 // debug!("Environment variables:"); 642 643 // for (key, value) in env_list.envs.iter() { 644 // debug!("{}: {}", key, value.value); 645 // } 646 647 return Ok(()); 648 } 649