1 use std::{ 2 collections::BTreeMap, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 }; 8 9 use log::{debug, error, info, warn}; 10 11 use crate::{ 12 console::{clean::CleanLevel, Action}, 13 executor::cache::CacheDir, 14 parser::task::{CodeSource, PrebuiltSource, TaskEnv, TaskType}, 15 scheduler::{SchedEntities, SchedEntity}, 16 utils::file::FileUtils, 17 }; 18 19 use self::cache::CacheDirType; 20 21 pub mod cache; 22 pub mod source; 23 pub mod target; 24 25 lazy_static! { 26 // 全局环境变量的列表 27 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 28 } 29 30 #[derive(Debug, Clone)] 31 pub struct Executor { 32 entity: Arc<SchedEntity>, 33 action: Action, 34 local_envs: EnvMap, 35 /// 任务构建结果输出到的目录 36 build_dir: CacheDir, 37 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 38 source_dir: Option<CacheDir>, 39 /// DragonOS sysroot的路径 40 dragonos_sysroot: PathBuf, 41 } 42 43 impl Executor { 44 /// # 创建执行器 45 /// 46 /// 用于执行一个任务 47 /// 48 /// ## 参数 49 /// 50 /// * `entity` - 任务调度实体 51 /// 52 /// ## 返回值 53 /// 54 /// * `Ok(Executor)` - 创建成功 55 /// * `Err(ExecutorError)` - 创建失败 56 pub fn new( 57 entity: Arc<SchedEntity>, 58 action: Action, 59 dragonos_sysroot: PathBuf, 60 ) -> Result<Self, ExecutorError> { 61 let local_envs = EnvMap::new(); 62 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 63 64 let source_dir = if CacheDir::need_source_cache(&entity) { 65 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 66 } else { 67 None 68 }; 69 70 let result: Executor = Self { 71 action, 72 entity, 73 local_envs, 74 build_dir, 75 source_dir, 76 dragonos_sysroot, 77 }; 78 79 return Ok(result); 80 } 81 82 /// # 执行任务 83 /// 84 /// 创建执行器后,调用此方法执行任务。 85 /// 该方法会执行以下步骤: 86 /// 87 /// 1. 创建工作线程 88 /// 2. 准备环境变量 89 /// 3. 拉取数据(可选) 90 /// 4. 执行构建 91 pub fn execute(&mut self) -> Result<(), ExecutorError> { 92 info!("Execute task: {}", self.entity.task().name_version()); 93 94 // 准备本地环境变量 95 self.prepare_local_env()?; 96 97 match self.action { 98 Action::Build => { 99 // 构建任务 100 self.build()?; 101 } 102 Action::Install => { 103 // 把构建结果安装到DragonOS 104 self.install()?; 105 } 106 Action::Clean(_) => { 107 // 清理构建结果 108 let r = self.clean(); 109 if let Err(e) = r { 110 error!( 111 "Failed to clean task {}: {:?}", 112 self.entity.task().name_version(), 113 e 114 ); 115 } 116 } 117 _ => { 118 error!("Unsupported action: {:?}", self.action); 119 } 120 } 121 info!("Task {} finished", self.entity.task().name_version()); 122 return Ok(()); 123 } 124 125 /// # 执行build操作 126 fn build(&mut self) -> Result<(), ExecutorError> { 127 self.mv_target_to_tmp()?; 128 129 // 确认源文件就绪 130 self.prepare_input()?; 131 132 let command: Option<Command> = self.create_command()?; 133 if let Some(cmd) = command { 134 self.run_command(cmd)?; 135 } 136 137 // 检查构建结果,如果为空,则抛出警告 138 if self.build_dir.is_empty()? { 139 warn!( 140 "Task {}: build result is empty, do you forget to copy the result to [${}]?", 141 self.entity.task().name_version(), 142 CacheDir::build_dir_env_key(&self.entity)? 143 ); 144 } 145 return Ok(()); 146 } 147 148 /// # 执行安装操作,把构建结果安装到DragonOS 149 fn install(&self) -> Result<(), ExecutorError> { 150 let binding = self.entity.task(); 151 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 152 // 如果没有指定安装路径,则不执行安装 153 if in_dragonos_path.is_none() { 154 return Ok(()); 155 } 156 info!("Installing task: {}", self.entity.task().name_version()); 157 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 158 159 debug!("in_dragonos_path: {}", in_dragonos_path); 160 // 去除开头的斜杠 161 { 162 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 163 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 164 } 165 // 拼接最终的安装路径 166 let install_path = self.dragonos_sysroot.join(in_dragonos_path); 167 debug!("install_path: {:?}", install_path); 168 // 创建安装路径 169 std::fs::create_dir_all(&install_path).map_err(|e| { 170 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 171 })?; 172 173 // 拷贝构建结果到安装路径 174 let build_dir: PathBuf = self.build_dir.path.clone(); 175 FileUtils::copy_dir_all(&build_dir, &install_path) 176 .map_err(|e| ExecutorError::InstallError(e))?; 177 info!("Task {} installed.", self.entity.task().name_version()); 178 179 // 安装完后,删除临时target文件 180 if let Some(target) = self.entity.target() { 181 target.clean_tmpdadk()?; 182 } 183 184 return Ok(()); 185 } 186 187 fn clean(&self) -> Result<(), ExecutorError> { 188 let level = if let Action::Clean(l) = self.action { 189 l.level 190 } else { 191 panic!( 192 "BUG: clean() called with non-clean action. executor details: {:?}", 193 self 194 ); 195 }; 196 info!( 197 "Cleaning task: {}, level={level}", 198 self.entity.task().name_version() 199 ); 200 201 let r: Result<(), ExecutorError> = match level { 202 CleanLevel::All => self.clean_all(), 203 CleanLevel::Src => self.clean_src(), 204 CleanLevel::Target => self.clean_target(), 205 CleanLevel::Cache => self.clean_cache(), 206 }; 207 208 if let Err(e) = r { 209 error!( 210 "Failed to clean task: {}, error message: {:?}", 211 self.entity.task().name_version(), 212 e 213 ); 214 return Err(e); 215 } 216 217 return Ok(()); 218 } 219 220 fn clean_all(&self) -> Result<(), ExecutorError> { 221 // 在源文件目录执行清理 222 self.clean_src()?; 223 // 清理构建结果 224 self.clean_target()?; 225 // 清理缓存 226 self.clean_cache()?; 227 return Ok(()); 228 } 229 230 /// 在源文件目录执行清理 231 fn clean_src(&self) -> Result<(), ExecutorError> { 232 let cmd: Option<Command> = self.create_command()?; 233 if cmd.is_none() { 234 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 235 return Ok(()); 236 } 237 info!( 238 "{}: Cleaning in source directory: {:?}", 239 self.entity.task().name_version(), 240 self.src_work_dir() 241 ); 242 243 let cmd = cmd.unwrap(); 244 self.run_command(cmd)?; 245 return Ok(()); 246 } 247 248 /// 清理构建输出目录 249 fn clean_target(&self) -> Result<(), ExecutorError> { 250 info!( 251 "{}: Cleaning build target directory: {:?}", 252 self.entity.task().name_version(), 253 self.build_dir.path 254 ); 255 256 return self.build_dir.remove_self_recursive(); 257 } 258 259 /// 清理下载缓存 260 fn clean_cache(&self) -> Result<(), ExecutorError> { 261 let cache_dir = self.source_dir.as_ref(); 262 if cache_dir.is_none() { 263 // 如果没有缓存目录,则认为用户不需要清理缓存 264 return Ok(()); 265 } 266 info!( 267 "{}: Cleaning cache directory: {}", 268 self.entity.task().name_version(), 269 self.src_work_dir().display() 270 ); 271 return cache_dir.unwrap().remove_self_recursive(); 272 } 273 274 /// 获取源文件的工作目录 275 fn src_work_dir(&self) -> PathBuf { 276 if let Some(local_path) = self.entity.task().source_path() { 277 return local_path; 278 } 279 return self.source_dir.as_ref().unwrap().path.clone(); 280 } 281 282 /// 为任务创建命令 283 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 284 // 获取命令 285 let raw_cmd = match self.entity.task().task_type { 286 TaskType::BuildFromSource(_) => match self.action { 287 Action::Build => self.entity.task().build.build_command.clone(), 288 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 289 _ => unimplemented!( 290 "create_command: Action {:?} not supported yet.", 291 self.action 292 ), 293 }, 294 295 TaskType::InstallFromPrebuilt(_) => match self.action { 296 Action::Build => self.entity.task().build.build_command.clone(), 297 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 298 _ => unimplemented!( 299 "create_command: Action {:?} not supported yet.", 300 self.action 301 ), 302 }, 303 }; 304 305 if raw_cmd.is_none() { 306 return Ok(None); 307 } 308 309 let raw_cmd = raw_cmd.unwrap(); 310 311 let mut command = Command::new("bash"); 312 command.current_dir(self.src_work_dir()); 313 314 // 设置参数 315 command.arg("-c"); 316 command.arg(raw_cmd); 317 318 // 设置环境变量 319 let env_list = ENV_LIST.read().unwrap(); 320 for (key, value) in env_list.envs.iter() { 321 // if key.starts_with("DADK") { 322 // debug!("DADK env found: {}={}", key, value.value); 323 // } 324 command.env(key, value.value.clone()); 325 } 326 drop(env_list); 327 for (key, value) in self.local_envs.envs.iter() { 328 command.env(key, value.value.clone()); 329 } 330 331 return Ok(Some(command)); 332 } 333 334 /// # 准备工作线程本地环境变量 335 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 336 // 设置本地环境变量 337 self.prepare_target_env()?; 338 339 let binding = self.entity.task(); 340 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 341 if task_envs.is_none() { 342 return Ok(()); 343 } 344 345 let task_envs = task_envs.unwrap(); 346 for tv in task_envs.iter() { 347 self.local_envs 348 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 349 } 350 351 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 352 self.local_envs.add(EnvVar::new( 353 "DADK_CURRENT_BUILD_DIR".to_string(), 354 self.build_dir.path.to_str().unwrap().to_string(), 355 )); 356 357 return Ok(()); 358 } 359 360 fn prepare_input(&self) -> Result<(), ExecutorError> { 361 // 拉取源文件 362 let task = self.entity.task(); 363 match &task.task_type { 364 TaskType::BuildFromSource(cs) => { 365 if self.source_dir.is_none() { 366 return Ok(()); 367 } 368 let source_dir = self.source_dir.as_ref().unwrap(); 369 match cs { 370 CodeSource::Git(git) => { 371 git.prepare(source_dir) 372 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 373 } 374 // 本地源文件,不需要拉取 375 CodeSource::Local(_) => return Ok(()), 376 // 在线压缩包,需要下载 377 CodeSource::Archive(archive) => { 378 archive 379 .download_unzip(source_dir) 380 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 381 } 382 } 383 } 384 TaskType::InstallFromPrebuilt(pb) => { 385 match pb { 386 // 本地源文件,不需要拉取 387 PrebuiltSource::Local(local_source) => { 388 let local_path = local_source.path(); 389 let target_path = &self.build_dir.path; 390 FileUtils::copy_dir_all(&local_path, &target_path) 391 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 392 return Ok(()); 393 } 394 // 在线压缩包,需要下载 395 PrebuiltSource::Archive(archive) => { 396 archive 397 .download_unzip(&self.build_dir) 398 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 399 } 400 } 401 } 402 } 403 404 return Ok(()); 405 } 406 407 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 408 let mut child = command 409 .stdin(Stdio::inherit()) 410 .spawn() 411 .map_err(|e| ExecutorError::IoError(e))?; 412 413 // 等待子进程结束 414 let r = child.wait().map_err(|e| ExecutorError::IoError(e)); 415 if r.is_ok() { 416 let r = r.unwrap(); 417 if r.success() { 418 return Ok(()); 419 } else { 420 // 执行失败,获取最后100行stderr输出 421 let errmsg = format!( 422 "Task {} failed, exit code = {}", 423 self.entity.task().name_version(), 424 r.code().unwrap() 425 ); 426 error!("{errmsg}"); 427 let command_opt = command.output(); 428 if command_opt.is_err() { 429 return Err(ExecutorError::TaskFailed( 430 "Failed to get command output".to_string(), 431 )); 432 } 433 let command_opt = command_opt.unwrap(); 434 let command_output = String::from_utf8_lossy(&command_opt.stderr); 435 let mut last_100_outputs = command_output 436 .lines() 437 .rev() 438 .take(100) 439 .collect::<Vec<&str>>(); 440 last_100_outputs.reverse(); 441 error!("Last 100 lines msg of stderr:"); 442 for line in last_100_outputs { 443 error!("{}", line); 444 } 445 return Err(ExecutorError::TaskFailed(errmsg)); 446 } 447 } else { 448 let errmsg = format!( 449 "Task {} failed, msg = {:?}", 450 self.entity.task().name_version(), 451 r.err().unwrap() 452 ); 453 error!("{errmsg}"); 454 return Err(ExecutorError::TaskFailed(errmsg)); 455 } 456 } 457 458 pub fn mv_target_to_tmp(&mut self) -> Result<(), ExecutorError> { 459 if let Some(rust_target) = self.entity.task().rust_target.clone() { 460 // 将target文件拷贝至 /tmp 下对应的dadk文件的临时target文件中 461 self.entity 462 .target() 463 .as_ref() 464 .unwrap() 465 .cp_to_tmp(&rust_target)?; 466 } 467 return Ok(()); 468 } 469 470 pub fn prepare_target_env(&mut self) -> Result<(), ExecutorError> { 471 if self.entity.task().rust_target.is_some() { 472 // 如果有dadk任务有rust_target字段,需要设置DADK_RUST_TARGET_FILE环境变量,值为临时target文件路径 473 self.entity 474 .target() 475 .as_ref() 476 .unwrap() 477 .prepare_env(&mut self.local_envs); 478 } 479 return Ok(()); 480 } 481 } 482 483 #[derive(Debug, Clone)] 484 pub struct EnvMap { 485 pub envs: BTreeMap<String, EnvVar>, 486 } 487 488 impl EnvMap { 489 pub fn new() -> Self { 490 Self { 491 envs: BTreeMap::new(), 492 } 493 } 494 495 pub fn add(&mut self, env: EnvVar) { 496 self.envs.insert(env.key.clone(), env); 497 } 498 499 #[allow(dead_code)] 500 pub fn get(&self, key: &str) -> Option<&EnvVar> { 501 self.envs.get(key) 502 } 503 504 pub fn add_vars(&mut self, vars: Vars) { 505 for (key, value) in vars { 506 self.add(EnvVar::new(key, value)); 507 } 508 } 509 } 510 511 /// # 环境变量 512 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 513 pub struct EnvVar { 514 pub key: String, 515 pub value: String, 516 } 517 518 impl EnvVar { 519 pub fn new(key: String, value: String) -> Self { 520 Self { key, value } 521 } 522 } 523 524 /// # 任务执行器错误枚举 525 #[allow(dead_code)] 526 #[derive(Debug)] 527 pub enum ExecutorError { 528 /// 准备执行环境错误 529 PrepareEnvError(String), 530 IoError(std::io::Error), 531 /// 构建执行错误 532 TaskFailed(String), 533 /// 安装错误 534 InstallError(String), 535 /// 清理错误 536 CleanError(String), 537 } 538 539 /// # 准备全局环境变量 540 pub fn prepare_env(sched_entities: &SchedEntities) -> Result<(), ExecutorError> { 541 info!("Preparing environment variables..."); 542 // 获取当前全局环境变量列表 543 let mut env_list = ENV_LIST.write().unwrap(); 544 let envs: Vars = std::env::vars(); 545 env_list.add_vars(envs); 546 547 // 为每个任务创建特定的环境变量 548 for entity in sched_entities.entities().iter() { 549 // 导出任务的构建目录环境变量 550 let build_dir = CacheDir::build_dir(entity.clone())?; 551 552 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 553 env_list.add(EnvVar::new( 554 build_dir_key, 555 build_dir.to_str().unwrap().to_string(), 556 )); 557 558 // 如果需要源码缓存目录,则导出 559 if CacheDir::need_source_cache(entity) { 560 let source_dir = CacheDir::source_dir(entity.clone())?; 561 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 562 env_list.add(EnvVar::new( 563 source_dir_key, 564 source_dir.to_str().unwrap().to_string(), 565 )); 566 } 567 } 568 569 // 查看环境变量列表 570 // debug!("Environment variables:"); 571 572 // for (key, value) in env_list.envs.iter() { 573 // debug!("{}: {}", key, value.value); 574 // } 575 576 return Ok(()); 577 } 578