1 use std::{ 2 collections::{BTreeMap, HashMap}, 3 fmt::Debug, 4 path::PathBuf, 5 process::exit, 6 sync::{ 7 atomic::{AtomicI32, Ordering}, 8 Arc, Mutex, RwLock, 9 }, 10 thread::ThreadId, 11 }; 12 13 use log::{error, info}; 14 15 use crate::{ 16 console::Action, 17 executor::{target::Target, Executor}, 18 parser::task::DADKTask, 19 }; 20 21 use self::task_deque::TASK_DEQUE; 22 23 pub mod task_deque; 24 25 lazy_static! { 26 // 线程id与任务实体id映射表 27 pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new()); 28 } 29 30 /// # 调度实体内部结构 31 #[derive(Debug, Clone)] 32 pub struct InnerEntity { 33 /// 任务ID 34 id: i32, 35 file_path: PathBuf, 36 /// 任务 37 task: DADKTask, 38 /// 入度 39 indegree: usize, 40 /// 子节点 41 children: Vec<Arc<SchedEntity>>, 42 /// target管理 43 target: Option<Target>, 44 } 45 46 /// # 调度实体 47 #[derive(Debug)] 48 pub struct SchedEntity { 49 inner: Mutex<InnerEntity>, 50 } 51 52 impl PartialEq for SchedEntity { 53 fn eq(&self, other: &Self) -> bool { 54 self.inner.lock().unwrap().id == other.inner.lock().unwrap().id 55 } 56 } 57 58 impl SchedEntity { 59 #[allow(dead_code)] 60 pub fn id(&self) -> i32 { 61 self.inner.lock().unwrap().id 62 } 63 64 #[allow(dead_code)] 65 pub fn file_path(&self) -> PathBuf { 66 self.inner.lock().unwrap().file_path.clone() 67 } 68 69 #[allow(dead_code)] 70 pub fn task(&self) -> DADKTask { 71 self.inner.lock().unwrap().task.clone() 72 } 73 74 /// 入度加1 75 pub fn add_indegree(&self) { 76 self.inner.lock().unwrap().indegree += 1; 77 } 78 79 /// 入度减1 80 pub fn sub_indegree(&self) -> usize { 81 self.inner.lock().unwrap().indegree -= 1; 82 return self.inner.lock().unwrap().indegree; 83 } 84 85 /// 增加子节点 86 pub fn add_child(&self, entity: Arc<SchedEntity>) { 87 self.inner.lock().unwrap().children.push(entity); 88 } 89 90 /// 获取入度 91 pub fn indegree(&self) -> usize { 92 self.inner.lock().unwrap().indegree 93 } 94 95 /// 获取target 96 pub fn target(&self) -> Option<Target> { 97 self.inner.lock().unwrap().target.clone() 98 } 99 100 /// 当前任务完成后,所有子节点入度减1 101 /// 102 /// ## 参数 103 /// 104 /// 无 105 /// 106 /// ## 返回值 107 /// 108 /// 所有入度为0的子节点集合 109 pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> { 110 let mut zero_child = Vec::new(); 111 let children = &self.inner.lock().unwrap().children; 112 for child in children.iter() { 113 if child.sub_indegree() == 0 { 114 zero_child.push(child.clone()); 115 } 116 } 117 return zero_child; 118 } 119 } 120 121 /// # 调度实体列表 122 /// 123 /// 用于存储所有的调度实体 124 #[derive(Debug)] 125 pub struct SchedEntities { 126 /// 任务ID到调度实体的映射 127 id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>, 128 } 129 130 impl SchedEntities { 131 pub fn new() -> Self { 132 Self { 133 id2entity: RwLock::new(BTreeMap::new()), 134 } 135 } 136 137 pub fn add(&mut self, entity: Arc<SchedEntity>) { 138 self.id2entity 139 .write() 140 .unwrap() 141 .insert(entity.id(), entity.clone()); 142 } 143 144 #[allow(dead_code)] 145 pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> { 146 self.id2entity.read().unwrap().get(&id).cloned() 147 } 148 149 pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> { 150 for e in self.id2entity.read().unwrap().iter() { 151 if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) { 152 return Some(e.1.clone()); 153 } 154 } 155 return None; 156 } 157 158 pub fn entities(&self) -> Vec<Arc<SchedEntity>> { 159 let mut v = Vec::new(); 160 for e in self.id2entity.read().unwrap().iter() { 161 v.push(e.1.clone()); 162 } 163 return v; 164 } 165 166 pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> { 167 self.id2entity.read().unwrap().clone() 168 } 169 170 #[allow(dead_code)] 171 pub fn len(&self) -> usize { 172 self.id2entity.read().unwrap().len() 173 } 174 175 #[allow(dead_code)] 176 pub fn is_empty(&self) -> bool { 177 self.id2entity.read().unwrap().is_empty() 178 } 179 180 #[allow(dead_code)] 181 pub fn clear(&mut self) { 182 self.id2entity.write().unwrap().clear(); 183 } 184 185 pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> { 186 let mut result = Vec::new(); 187 let mut visited = BTreeMap::new(); 188 let btree = self.id2entity.write().unwrap().clone(); 189 for entity in btree.iter() { 190 if !visited.contains_key(entity.0) { 191 let r = self.dfs(entity.1, &mut visited, &mut result); 192 if r.is_err() { 193 let err = r.unwrap_err(); 194 error!("{}", err.display()); 195 println!("Please fix the errors above and try again."); 196 std::process::exit(1); 197 } 198 } 199 } 200 return result; 201 } 202 203 fn dfs( 204 &self, 205 entity: &Arc<SchedEntity>, 206 visited: &mut BTreeMap<i32, bool>, 207 result: &mut Vec<Arc<SchedEntity>>, 208 ) -> Result<(), DependencyCycleError> { 209 visited.insert(entity.id(), false); 210 for dep in entity.task().depends.iter() { 211 if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 212 let guard = self.id2entity.write().unwrap(); 213 let e = guard.get(&entity.id()).unwrap(); 214 let d = guard.get(&dep_entity.id()).unwrap(); 215 e.add_indegree(); 216 d.add_child(e.clone()); 217 if let Some(&false) = visited.get(&dep_entity.id()) { 218 // 输出完整环形依赖 219 let mut err = DependencyCycleError::new(dep_entity.clone()); 220 221 err.add(entity.clone(), dep_entity); 222 return Err(err); 223 } 224 if !visited.contains_key(&dep_entity.id()) { 225 drop(guard); 226 let r = self.dfs(&dep_entity, visited, result); 227 if r.is_err() { 228 let mut err: DependencyCycleError = r.unwrap_err(); 229 // 如果错误已经停止传播,则直接返回 230 if err.stop_propagation { 231 return Err(err); 232 } 233 // 如果当前实体是错误的起始实体,则停止传播 234 if entity == &err.head_entity { 235 err.stop_propagation(); 236 } 237 err.add(entity.clone(), dep_entity); 238 return Err(err); 239 } 240 } 241 } else { 242 error!( 243 "Dependency not found: {} -> {}", 244 entity.task().name_version(), 245 dep.name_version() 246 ); 247 std::process::exit(1); 248 } 249 } 250 visited.insert(entity.id(), true); 251 result.push(entity.clone()); 252 return Ok(()); 253 } 254 } 255 256 /// # 任务调度器 257 #[derive(Debug)] 258 pub struct Scheduler { 259 /// DragonOS sysroot在主机上的路径 260 dragonos_dir: PathBuf, 261 /// 要执行的操作 262 action: Action, 263 /// 调度实体列表 264 target: SchedEntities, 265 } 266 267 pub enum SchedulerError { 268 TaskError(String), 269 DependencyNotFound(Arc<SchedEntity>, String), 270 RunError(String), 271 } 272 273 impl Debug for SchedulerError { 274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 275 match self { 276 Self::TaskError(arg0) => { 277 write!(f, "TaskError: {}", arg0) 278 } 279 SchedulerError::DependencyNotFound(current, msg) => { 280 write!( 281 f, 282 "For task {}, dependency not found: {}. Please check file: {}", 283 current.task().name_version(), 284 msg, 285 current.file_path().display() 286 ) 287 } 288 SchedulerError::RunError(msg) => { 289 write!(f, "RunError: {}", msg) 290 } 291 } 292 } 293 } 294 295 impl Scheduler { 296 pub fn new( 297 dragonos_dir: PathBuf, 298 action: Action, 299 tasks: Vec<(PathBuf, DADKTask)>, 300 ) -> Result<Self, SchedulerError> { 301 let entities = SchedEntities::new(); 302 303 let mut scheduler = Scheduler { 304 dragonos_dir, 305 action, 306 target: entities, 307 }; 308 309 let r = scheduler.add_tasks(tasks); 310 if r.is_err() { 311 error!("Error while adding tasks: {:?}", r); 312 return Err(r.err().unwrap()); 313 } 314 315 return Ok(scheduler); 316 } 317 318 /// # 添加多个任务 319 /// 320 /// 添加任务到调度器中,如果任务已经存在,则返回错误 321 pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 322 for task in tasks { 323 self.add_task(task.0, task.1)?; 324 } 325 326 return Ok(()); 327 } 328 329 /// # 添加一个任务 330 /// 331 /// 添加任务到调度器中,如果任务已经存在,则返回错误 332 pub fn add_task( 333 &mut self, 334 path: PathBuf, 335 task: DADKTask, 336 ) -> Result<Arc<SchedEntity>, SchedulerError> { 337 let id: i32 = self.generate_task_id(); 338 let indegree: usize = 0; 339 let children = Vec::new(); 340 let target = self.generate_task_target(&path, &task.rust_target)?; 341 let entity = Arc::new(SchedEntity { 342 inner: Mutex::new(InnerEntity { 343 id, 344 task, 345 file_path: path.clone(), 346 indegree, 347 children, 348 target, 349 }), 350 }); 351 let name_version = (entity.task().name.clone(), entity.task().version.clone()); 352 353 if self 354 .target 355 .get_by_name_version(&name_version.0, &name_version.1) 356 .is_some() 357 { 358 return Err(SchedulerError::TaskError(format!( 359 "Task with name [{}] and version [{}] already exists. Config file: {}", 360 name_version.0, 361 name_version.1, 362 path.display() 363 ))); 364 } 365 366 self.target.add(entity.clone()); 367 368 info!("Task added: {}", entity.task().name_version()); 369 return Ok(entity); 370 } 371 372 fn generate_task_id(&self) -> i32 { 373 static TASK_ID: AtomicI32 = AtomicI32::new(0); 374 return TASK_ID.fetch_add(1, Ordering::SeqCst); 375 } 376 377 fn generate_task_target( 378 &self, 379 path: &PathBuf, 380 rust_target: &Option<String>, 381 ) -> Result<Option<Target>, SchedulerError> { 382 if let Some(rust_target) = rust_target { 383 // 如果rust_target字段不为none,说明需要target管理 384 // 获取dadk任务路径,用于生成临时dadk文件名 385 let file_str = path.as_path().to_str().unwrap(); 386 let tmp_dadk_path = Target::tmp_dadk(file_str); 387 let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap(); 388 389 if Target::is_user_target(rust_target) { 390 // 如果target文件是用户自己的 391 if let Ok(target_path) = Target::user_target_path(rust_target) { 392 let target_path_str = target_path.as_path().to_str().unwrap(); 393 let index = target_path_str.rfind('/').unwrap(); 394 let target_name = target_path_str[index + 1..].to_string(); 395 let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name)); 396 return Ok(Some(Target::new(tmp_target))); 397 } else { 398 return Err(SchedulerError::TaskError( 399 "The path of target file is invalid.".to_string(), 400 )); 401 } 402 } else { 403 // 如果target文件是内置的 404 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target)); 405 return Ok(Some(Target::new(tmp_target))); 406 } 407 } 408 return Ok(None); 409 } 410 411 /// # 执行调度器中的所有任务 412 pub fn run(&self) -> Result<(), SchedulerError> { 413 // 准备全局环境变量 414 crate::executor::prepare_env(&self.target) 415 .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 416 417 match self.action { 418 Action::Build | Action::Install => { 419 self.run_with_topo_sort()?; 420 } 421 Action::Clean(_) => self.run_without_topo_sort()?, 422 _ => unimplemented!(), 423 } 424 425 return Ok(()); 426 } 427 428 /// Action需要按照拓扑序执行 429 /// 430 /// Action::Build | Action::Install 431 fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 432 // 检查是否有不存在的依赖 433 let r = self.check_not_exists_dependency(); 434 if r.is_err() { 435 error!("Error while checking tasks: {:?}", r); 436 return r; 437 } 438 439 // 对调度实体进行拓扑排序 440 let r: Vec<Arc<SchedEntity>> = self.target.topo_sort(); 441 442 let action = self.action.clone(); 443 let dragonos_dir = self.dragonos_dir.clone(); 444 let id2entity = self.target.id2entity(); 445 let count = r.len(); 446 447 // 启动守护线程 448 let handler = std::thread::spawn(move || { 449 Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r) 450 }); 451 452 handler.join().expect("Could not join deamon"); 453 454 return Ok(()); 455 } 456 457 /// Action不需要按照拓扑序执行 458 fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 459 // 启动守护线程 460 let action = self.action.clone(); 461 let dragonos_dir = self.dragonos_dir.clone(); 462 let mut r = self.target.entities(); 463 let handler = std::thread::spawn(move || { 464 Self::clean_daemon(action, dragonos_dir, &mut r); 465 }); 466 467 handler.join().expect("Could not join deamon"); 468 return Ok(()); 469 } 470 471 pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) { 472 let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone()) 473 .map_err(|e| { 474 error!( 475 "Error while creating executor for task {} : {:?}", 476 entity.task().name_version(), 477 e 478 ); 479 exit(-1); 480 }) 481 .unwrap(); 482 483 executor 484 .execute() 485 .map_err(|e| { 486 error!( 487 "Error while executing task {} : {:?}", 488 entity.task().name_version(), 489 e 490 ); 491 exit(-1); 492 }) 493 .unwrap(); 494 } 495 496 /// 构建和安装DADK任务的守护线程 497 /// 498 /// ## 参数 499 /// 500 /// - `action` : 要执行的操作 501 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 502 /// - `id2entity` : DADK任务id与实体映射表 503 /// - `count` : 当前剩余任务数 504 /// - `r` : 总任务实体表 505 /// 506 /// ## 返回值 507 /// 508 /// 无 509 pub fn build_install_daemon( 510 action: Action, 511 dragonos_dir: PathBuf, 512 id2entity: BTreeMap<i32, Arc<SchedEntity>>, 513 mut count: usize, 514 r: &Vec<Arc<SchedEntity>>, 515 ) { 516 let mut guard = TASK_DEQUE.lock().unwrap(); 517 // 初始化0入度的任务实体 518 let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new(); 519 for e in r.iter() { 520 if e.indegree() == 0 { 521 zero_entity.push(e.clone()); 522 } 523 } 524 525 while count > 0 { 526 // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了 527 while !zero_entity.is_empty() 528 && guard.build_install_task( 529 action.clone(), 530 dragonos_dir.clone(), 531 zero_entity.last().unwrap().clone(), 532 ) 533 { 534 zero_entity.pop(); 535 } 536 537 let queue = guard.queue_mut(); 538 // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中 539 queue.retain(|x| { 540 if x.is_finished() { 541 count -= 1; 542 let tid = x.thread().id(); 543 let eid = *TID_EID.lock().unwrap().get(&tid).unwrap(); 544 let entity = id2entity.get(&eid).unwrap(); 545 let zero = entity.sub_children_indegree(); 546 for e in zero.iter() { 547 zero_entity.push(e.clone()); 548 } 549 return false; 550 } 551 return true; 552 }) 553 } 554 } 555 556 /// 清理DADK任务的守护线程 557 /// 558 /// ## 参数 559 /// 560 /// - `action` : 要执行的操作 561 /// - `dragonos_dir` : DragonOS sysroot在主机上的路径 562 /// - `r` : 总任务实体表 563 /// 564 /// ## 返回值 565 /// 566 /// 无 567 pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) { 568 let mut guard = TASK_DEQUE.lock().unwrap(); 569 while !guard.queue().is_empty() && !r.is_empty() { 570 guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone()); 571 } 572 } 573 574 /// # 检查是否有不存在的依赖 575 /// 576 /// 如果某个任务的dependency中的任务不存在,则返回错误 577 fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 578 for entity in self.target.entities().iter() { 579 for dependency in entity.task().depends.iter() { 580 let name_version = (dependency.name.clone(), dependency.version.clone()); 581 if !self 582 .target 583 .get_by_name_version(&name_version.0, &name_version.1) 584 .is_some() 585 { 586 return Err(SchedulerError::DependencyNotFound( 587 entity.clone(), 588 format!("name:{}, version:{}", name_version.0, name_version.1,), 589 )); 590 } 591 } 592 } 593 594 return Ok(()); 595 } 596 } 597 598 /// # 环形依赖错误路径 599 /// 600 /// 本结构体用于在回溯过程中记录环形依赖的路径。 601 /// 602 /// 例如,假设有如下依赖关系: 603 /// 604 /// ```text 605 /// A -> B -> C -> D -> A 606 /// ``` 607 /// 608 /// 则在DFS回溯过程中,会依次记录如下路径: 609 /// 610 /// ```text 611 /// D -> A 612 /// C -> D 613 /// B -> C 614 /// A -> B 615 pub struct DependencyCycleError { 616 /// # 起始实体 617 /// 本错误的起始实体,即环形依赖的起点 618 head_entity: Arc<SchedEntity>, 619 /// 是否停止传播 620 stop_propagation: bool, 621 /// 依赖关系 622 dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>, 623 } 624 625 impl DependencyCycleError { 626 pub fn new(head_entity: Arc<SchedEntity>) -> Self { 627 Self { 628 head_entity, 629 stop_propagation: false, 630 dependencies: Vec::new(), 631 } 632 } 633 634 pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) { 635 self.dependencies.push((current, dependency)); 636 } 637 638 pub fn stop_propagation(&mut self) { 639 self.stop_propagation = true; 640 } 641 642 #[allow(dead_code)] 643 pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> { 644 &self.dependencies 645 } 646 647 pub fn display(&self) -> String { 648 let mut tmp = self.dependencies.clone(); 649 tmp.reverse(); 650 651 let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 652 for (current, dep) in tmp.iter() { 653 ret.push_str(&format!( 654 "->\t{} ({})\t--depends-->\t{} ({})\n", 655 current.task().name_version(), 656 current.file_path().display(), 657 dep.task().name_version(), 658 dep.file_path().display() 659 )); 660 } 661 ret.push_str("-> End"); 662 return ret; 663 } 664 } 665