1 use std::{ 2 collections::{BTreeMap, HashMap}, 3 fmt::Debug, 4 path::PathBuf, 5 process::exit, 6 sync::{ 7 atomic::{AtomicI32, Ordering}, 8 Arc, Mutex, RwLock, 9 }, 10 thread::ThreadId, 11 }; 12 13 use log::{error, info}; 14 15 use crate::{ 16 console::Action, 17 executor::{target::Target, Executor}, 18 parser::task::DADKTask, 19 }; 20 21 use self::task_deque::TASK_DEQUE; 22 23 pub mod task_deque; 24 25 lazy_static! { 26 // 线程id与任务实体id映射表 27 pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 28 } 29 30 /// # 调度实体内部结构 31 #[derive(Debug, Clone)] 32 pub struct InnerEntity { 33 /// 任务ID 34 id: i32, 35 file_path: PathBuf, 36 /// 任务 37 task: DADKTask, 38 /// 入度 39 indegree: usize, 40 /// 子节点 41 children: Vec<Arc<SchedEntity>>, 42 /// target管理 43 target: Option<Target>, 44 } 45 46 /// # 调度实体 47 #[derive(Debug)] 48 pub struct SchedEntity { 49 inner: Mutex<InnerEntity>, 50 } 51 52 impl PartialEq for SchedEntity { 53 fn eq(&self, other: &Self) -> bool { 54 self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 55 } 56 } 57 58 impl SchedEntity { 59 #[allow(dead_code)] 60 pub fn id(&self) -> i32 { 61 self.inner.lock().unwrap().id 62 } 63 64 #[allow(dead_code)] 65 pub fn file_path(&self) -> PathBuf { 66 self.inner.lock().unwrap().file_path.clone() 67 } 68 69 #[allow(dead_code)] 70 pub fn task(&self) -> DADKTask { 71 self.inner.lock().unwrap().task.clone() 72 } 73 74 /// 入度加1 75 pub fn add_indegree(&self) { 76 self.inner.lock().unwrap().indegree += 1; 77 } 78 79 /// 入度减1 80 pub fn sub_indegree(&self) -> usize { 81 self.inner.lock().unwrap().indegree -= 1; 82 return self.inner.lock().unwrap().indegree; 83 } 84 85 /// 增加子节点 86 pub fn add_child(&self, entity: Arc<SchedEntity>) { 87 self.inner.lock().unwrap().children.push(entity); 88 } 89 90 /// 获取入度 91 pub fn indegree(&self) -> usize { 92 self.inner.lock().unwrap().indegree 93 } 94 95 /// 获取target 96 pub fn target(&self) -> Option<Target> { 97 self.inner.lock().unwrap().target.clone() 98 } 99 100 /// 当前任务完成后,所有子节点入度减1 101 /// 102 /// ## 参数 103 /// 104 /// 无 105 /// 106 /// ## 返回值 107 /// 108 /// 所有入度为0的子节点集合 109 pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 110 let mut zero_child = Vec::new(); 111 let children = &self.inner.lock().unwrap().children; 112 for child in children.iter() { 113 if child.sub_indegree() == 0 { 114 zero_child.push(child.clone()); 115 } 116 } 117 return zero_child; 118 } 119 } 120 121 /// # 调度实体列表 122 /// 123 /// 用于存储所有的调度实体 124 #[derive(Debug)] 125 pub struct SchedEntities { 126 /// 任务ID到调度实体的映射 127 id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 128 } 129 130 impl SchedEntities { 131 pub fn new() -> Self { 132 Self { 133 id2entity: RwLock::new(BTreeMap::new()), 134 } 135 } 136 137 pub fn add(&mut self, entity: Arc<SchedEntity>) { 138 self.id2entity 139 .write() 140 .unwrap() 141 .insert(entity.id(), entity.clone()); 142 } 143 144 #[allow(dead_code)] 145 pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 146 self.id2entity.read().unwrap().get(&id).cloned() 147 } 148 149 pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 150 for e in self.id2entity.read().unwrap().iter() { 151 if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 152 return Some(e.1.clone()); 153 } 154 } 155 return None; 156 } 157 158 pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 159 let mut v = Vec::new(); 160 for e in self.id2entity.read().unwrap().iter() { 161 v.push(e.1.clone()); 162 } 163 return v; 164 } 165 166 pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 167 self.id2entity.read().unwrap().clone() 168 } 169 170 #[allow(dead_code)] 171 pub fn len(&self) -> usize { 172 self.id2entity.read().unwrap().len() 173 } 174 175 #[allow(dead_code)] 176 pub fn is_empty(&self) -> bool { 177 self.id2entity.read().unwrap().is_empty() 178 } 179 180 #[allow(dead_code)] 181 pub fn clear(&mut self) { 182 self.id2entity.write().unwrap().clear(); 183 } 184 185 pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 186 let mut result = Vec::new(); 187 let mut visited = BTreeMap::new(); 188 let btree = self.id2entity.write().unwrap().clone(); 189 for entity in btree.iter() { 190 if !visited.contains_key(entity.0) { 191 let r = self.dfs(entity.1, &mut visited, &mut result); 192 if r.is_err() { 193 let err = r.unwrap_err(); 194 error!("{}", err.display()); 195 println!("Please fix the errors above and try again."); 196 std::process::exit(1); 197 } 198 } 199 } 200 return result; 201 } 202 203 fn dfs( 204 &self, 205 entity: &Arc<SchedEntity>, 206 visited: &mut BTreeMap<i32, bool>, 207 result: &mut Vec<Arc<SchedEntity>>, 208 ) -> Result<(), DependencyCycleError> { 209 visited.insert(entity.id(), false); 210 for dep in entity.task().depends.iter() { 211 if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 212 let guard = self.id2entity.write().unwrap(); 213 let e = guard.get(&entity.id()).unwrap(); 214 let d = guard.get(&dep_entity.id()).unwrap(); 215 e.add_indegree(); 216 d.add_child(e.clone()); 217 if let Some(&false) = visited.get(&dep_entity.id()) { 218 // 输出完整环形依赖 219 let mut err = DependencyCycleError::new(dep_entity.clone()); 220 221 err.add(entity.clone(), dep_entity); 222 return Err(err); 223 } 224 if !visited.contains_key(&dep_entity.id()) { 225 drop(guard); 226 let r = self.dfs(&dep_entity, visited, result); 227 if r.is_err() { 228 let mut err: DependencyCycleError = r.unwrap_err(); 229 // 如果错误已经停止传播,则直接返回 230 if err.stop_propagation { 231 return Err(err); 232 } 233 // 如果当前实体是错误的起始实体,则停止传播 234 if entity == &err.head_entity { 235 err.stop_propagation(); 236 } 237 err.add(entity.clone(), dep_entity); 238 return Err(err); 239 } 240 } 241 } else { 242 error!( 243 "Dependency not found: {} -> {}", 244 entity.task().name_version(), 245 dep.name_version() 246 ); 247 std::process::exit(1); 248 } 249 } 250 visited.insert(entity.id(), true); 251 result.push(entity.clone()); 252 return Ok(()); 253 } 254 } 255 256 /// # 任务调度器 257 #[derive(Debug)] 258 pub struct Scheduler { 259 /// DragonOS sysroot在主机上的路径 260 dragonos_dir: PathBuf, 261 /// 要执行的操作 262 action: Action, 263 /// 调度实体列表 264 target: SchedEntities, 265 } 266 267 pub enum SchedulerError { 268 TaskError(String), 269 DependencyNotFound(Arc<SchedEntity>, String), 270 RunError(String), 271 } 272 273 impl Debug for SchedulerError { 274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 275 match self { 276 Self::TaskError(arg0) => { 277 write!(f, "TaskError: {}", arg0) 278 } 279 SchedulerError::DependencyNotFound(current, msg) => { 280 write!( 281 f, 282 "For task {}, dependency not found: {}. Please check file: {}", 283 current.task().name_version(), 284 msg, 285 current.file_path().display() 286 ) 287 } 288 SchedulerError::RunError(msg) => { 289 write!(f, "RunError: {}", msg) 290 } 291 } 292 } 293 } 294 295 impl Scheduler { 296 pub fn new( 297 dragonos_dir: PathBuf, 298 action: Action, 299 tasks: Vec<(PathBuf, DADKTask)>, 300 ) -> Result<Self, SchedulerError> { 301 let entities = SchedEntities::new(); 302 303 let mut scheduler = Scheduler { 304 dragonos_dir, 305 action, 306 target: entities, 307 }; 308 309 let r = scheduler.add_tasks(tasks); 310 if r.is_err() { 311 error!("Error while adding tasks: {:?}", r); 312 return Err(r.err().unwrap()); 313 } 314 315 return Ok(scheduler); 316 } 317 318 /// # 添加多个任务 319 /// 320 /// 添加任务到调度器中,如果任务已经存在,则返回错误 321 pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 322 for task in tasks { 323 self.add_task(task.0, task.1)?; 324 } 325 326 return Ok(()); 327 } 328 329 /// # 添加一个任务 330 /// 331 /// 添加任务到调度器中,如果任务已经存在,则返回错误 332 pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> { 333 let id: i32 = self.generate_task_id(); 334 let indegree: usize = 0; 335 let children = Vec::new(); 336 let target = self.generate_task_target(&path, &task.rust_target)?; 337 let entity = Arc::new(SchedEntity { 338 inner: Mutex::new(InnerEntity { 339 id, 340 task, 341 file_path: path.clone(), 342 indegree, 343 children, 344 target, 345 }), 346 }); 347 let name_version = (entity.task().name.clone(), entity.task().version.clone()); 348 349 if self 350 .target 351 .get_by_name_version(&name_version.0, &name_version.1) 352 .is_some() 353 { 354 return Err(SchedulerError::TaskError(format!( 355 "Task with name [{}] and version [{}] already exists. Config file: {}", 356 name_version.0, 357 name_version.1, 358 path.display() 359 ))); 360 } 361 362 self.target.add(entity.clone()); 363 364 info!("Task added: {}", entity.task().name_version()); 365 return Ok(()); 366 } 367 368 fn generate_task_id(&self) -> i32 { 369 static TASK_ID: AtomicI32 = AtomicI32::new(0); 370 return TASK_ID.fetch_add(1, Ordering::SeqCst); 371 } 372 373 fn generate_task_target( 374 &self, 375 path: &PathBuf, 376 rust_target: &Option<String>, 377 ) -> Result<Option<Target>, SchedulerError> { 378 if let Some(rust_target) = rust_target { 379 // 如果rust_target字段不为none,说明需要target管理 380 // 获取dadk任务路径,用于生成临时dadk文件名 381 let file_str = path.as_path().to_str().unwrap(); 382 let tmp_dadk_path = Target::tmp_dadk(file_str); 383 let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap(); 384 385 if Target::is_user_target(rust_target) { 386 // 如果target文件是用户自己的 387 if let Ok(target_path) = Target::user_target_path(rust_target) { 388 let target_path_str = target_path.as_path().to_str().unwrap(); 389 let index = target_path_str.rfind('/').unwrap(); 390 let target_name = target_path_str[index + 1..].to_string(); 391 let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name)); 392 return Ok(Some(Target::new(tmp_target))); 393 } else { 394 return Err(SchedulerError::TaskError( 395 "The path of target file is invalid.".to_string(), 396 )); 397 } 398 } else { 399 // 如果target文件是内置的 400 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target)); 401 return Ok(Some(Target::new(tmp_target))); 402 } 403 } 404 return Ok(None); 405 } 406 407 /// # 执行调度器中的所有任务 408 pub fn run(&self) -> Result<(), SchedulerError> { 409 // 准备全局环境变量 410 crate::executor::prepare_env(&self.target) 411 .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 412 413 match self.action { 414 Action::Build | Action::Install => { 415 self.run_with_topo_sort()?; 416 } 417 Action::Clean(_) => self.run_without_topo_sort()?, 418 _ => unimplemented!(), 419 } 420 421 return Ok(()); 422 } 423 424 /// Action需要按照拓扑序执行 425 /// 426 /// Action::Build | Action::Install 427 fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 428 // 检查是否有不存在的依赖 429 let r = self.check_not_exists_dependency(); 430 if r.is_err() { 431 error!("Error while checking tasks: {:?}", r); 432 return r; 433 } 434 435 // 对调度实体进行拓扑排序 436 let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 437 438 let action = self.action.clone(); 439 let dragonos_dir = self.dragonos_dir.clone(); 440 let id2entity = self.target.id2entity(); 441 let count = r.len(); 442 443 // 启动守护线程 444 let handler = std::thread::spawn(move || { 445 Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 446 }); 447 448 handler.join().expect("Could not join deamon"); 449 450 return Ok(()); 451 } 452 453 /// Action不需要按照拓扑序执行 454 fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 455 // 启动守护线程 456 let action = self.action.clone(); 457 let dragonos_dir = self.dragonos_dir.clone(); 458 let mut r = self.target.entities(); 459 let handler = std::thread::spawn(move || { 460 Self::clean_daemon(action, dragonos_dir, &mut r); 461 }); 462 463 handler.join().expect("Could not join deamon"); 464 return Ok(()); 465 } 466 467 pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 468 let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 469 .map_err(|e| { 470 error!( 471 "Error while creating executor for task {} : {:?}", 472 entity.task().name_version(), 473 e 474 ); 475 exit(-1); 476 }) 477 .unwrap(); 478 479 executor 480 .execute() 481 .map_err(|e| { 482 error!( 483 "Error while executing task {} : {:?}", 484 entity.task().name_version(), 485 e 486 ); 487 exit(-1); 488 }) 489 .unwrap(); 490 } 491 492 /// 构建和安装DADK任务的守护线程 493 /// 494 /// ## 参数 495 /// 496 /// - `action` : 要执行的操作 497 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 498 /// - `id2entity` : DADK任务id与实体映射表 499 /// - `count` : 当前剩余任务数 500 /// - `r` : 总任务实体表 501 /// 502 /// ## 返回值 503 /// 504 /// 无 505 pub fn build_install_daemon( 506 action: Action, 507 dragonos_dir: PathBuf, 508 id2entity: BTreeMap<i32, Arc<SchedEntity>>, 509 mut count: usize, 510 r: &Vec<Arc<SchedEntity>>, 511 ) { 512 let mut guard = TASK_DEQUE.lock().unwrap(); 513 // 初始化0入度的任务实体 514 let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 515 for e in r.iter() { 516 if e.indegree() == 0 { 517 zero_entity.push(e.clone()); 518 } 519 } 520 521 while count > 0 { 522 // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 523 while !zero_entity.is_empty() 524 && guard.build_install_task( 525 action.clone(), 526 dragonos_dir.clone(), 527 zero_entity.last().unwrap().clone(), 528 ) 529 { 530 zero_entity.pop(); 531 } 532 533 let queue = guard.queue_mut(); 534 // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 535 queue.retain(|x| { 536 if x.is_finished() { 537 count -= 1; 538 let tid = x.thread().id(); 539 let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 540 let entity = id2entity.get(&eid).unwrap(); 541 let zero = entity.sub_children_indegree(); 542 for e in zero.iter() { 543 zero_entity.push(e.clone()); 544 } 545 return false; 546 } 547 return true; 548 }) 549 } 550 } 551 552 /// 清理DADK任务的守护线程 553 /// 554 /// ## 参数 555 /// 556 /// - `action` : 要执行的操作 557 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 558 /// - `r` : 总任务实体表 559 /// 560 /// ## 返回值 561 /// 562 /// 无 563 pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 564 let mut guard = TASK_DEQUE.lock().unwrap(); 565 while !guard.queue().is_empty() && !r.is_empty() { 566 guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 567 } 568 } 569 570 /// # 检查是否有不存在的依赖 571 /// 572 /// 如果某个任务的dependency中的任务不存在,则返回错误 573 fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 574 for entity in self.target.entities().iter() { 575 for dependency in entity.task().depends.iter() { 576 let name_version = (dependency.name.clone(), dependency.version.clone()); 577 if !self 578 .target 579 .get_by_name_version(&name_version.0, &name_version.1) 580 .is_some() 581 { 582 return Err(SchedulerError::DependencyNotFound( 583 entity.clone(), 584 format!("name:{}, version:{}", name_version.0, name_version.1,), 585 )); 586 } 587 } 588 } 589 590 return Ok(()); 591 } 592 } 593 594 /// # 环形依赖错误路径 595 /// 596 /// 本结构体用于在回溯过程中记录环形依赖的路径。 597 /// 598 /// 例如,假设有如下依赖关系: 599 /// 600 /// ```text 601 /// A -> B -> C -> D -> A 602 /// ``` 603 /// 604 /// 则在DFS回溯过程中,会依次记录如下路径: 605 /// 606 /// ```text 607 /// D -> A 608 /// C -> D 609 /// B -> C 610 /// A -> B 611 pub struct DependencyCycleError { 612 /// # 起始实体 613 /// 本错误的起始实体,即环形依赖的起点 614 head_entity: Arc<SchedEntity>, 615 /// 是否停止传播 616 stop_propagation: bool, 617 /// 依赖关系 618 dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 619 } 620 621 impl DependencyCycleError { 622 pub fn new(head_entity: Arc<SchedEntity>) -> Self { 623 Self { 624 head_entity, 625 stop_propagation: false, 626 dependencies: Vec::new(), 627 } 628 } 629 630 pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 631 self.dependencies.push((current, dependency)); 632 } 633 634 pub fn stop_propagation(&mut self) { 635 self.stop_propagation = true; 636 } 637 638 #[allow(dead_code)] 639 pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 640 &self.dependencies 641 } 642 643 pub fn display(&self) -> String { 644 let mut tmp = self.dependencies.clone(); 645 tmp.reverse(); 646 647 let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 648 for (current, dep) in tmp.iter() { 649 ret.push_str(&format!( 650 "->\t{} ({})\t--depends-->\t{} ({})\n", 651 current.task().name_version(), 652 current.file_path().display(), 653 dep.task().name_version(), 654 dep.file_path().display() 655 )); 656 } 657 ret.push_str("-> End"); 658 return ret; 659 } 660 } 661