1 use std::{ 2 collections::BTreeMap, 3 fmt::Debug, 4 path::PathBuf, 5 process::exit, 6 rc::Rc, 7 sync::atomic::{AtomicI32, Ordering}, 8 }; 9 10 use log::{error, info}; 11 12 use crate::{console::Action, executor::Executor, parser::task::DADKTask}; 13 14 /// # 调度实体 15 #[derive(Debug, Clone)] 16 pub struct SchedEntity { 17 /// 任务ID 18 id: i32, 19 file_path: PathBuf, 20 /// 任务 21 task: DADKTask, 22 } 23 24 impl PartialEq for SchedEntity { 25 fn eq(&self, other: &Self) -> bool { 26 self.id == other.id 27 } 28 } 29 30 impl SchedEntity { 31 #[allow(dead_code)] 32 pub fn id(&self) -> i32 { 33 self.id 34 } 35 36 #[allow(dead_code)] 37 pub fn file_path(&self) -> &PathBuf { 38 &self.file_path 39 } 40 41 #[allow(dead_code)] 42 pub fn task(&self) -> &DADKTask { 43 &self.task 44 } 45 46 #[allow(dead_code)] 47 pub fn task_mut(&mut self) -> &mut DADKTask { 48 &mut self.task 49 } 50 } 51 52 /// # 调度实体列表 53 /// 54 /// 用于存储所有的调度实体 55 #[derive(Debug)] 56 pub struct SchedEntities { 57 /// 调度实体列表 58 entities: Vec<Rc<SchedEntity>>, 59 /// 任务ID到调度实体的映射 60 id2entity: BTreeMap<i32, Rc<SchedEntity>>, 61 /// 任务名和版本到调度实体的映射 62 name_version_2_entity: BTreeMap<String, Rc<SchedEntity>>, 63 } 64 65 impl SchedEntities { 66 pub fn new() -> Self { 67 Self { 68 entities: Vec::new(), 69 id2entity: BTreeMap::new(), 70 name_version_2_entity: BTreeMap::new(), 71 } 72 } 73 74 pub fn add(&mut self, entity: Rc<SchedEntity>) { 75 self.entities.push(entity.clone()); 76 self.id2entity.insert(entity.id, entity.clone()); 77 self.name_version_2_entity 78 .insert(entity.task.name_version_env(), entity); 79 } 80 81 #[allow(dead_code)] 82 pub fn get(&self, id: i32) -> Option<Rc<SchedEntity>> { 83 self.id2entity.get(&id).cloned() 84 } 85 86 pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Rc<SchedEntity>> { 87 self.name_version_2_entity 88 .get(&DADKTask::name_version_uppercase(name, version)) 89 .cloned() 90 } 91 92 pub fn iter(&self) -> impl Iterator<Item = &Rc<SchedEntity>> { 93 self.entities.iter() 94 } 95 96 #[allow(dead_code)] 97 pub fn len(&self) -> usize { 98 self.entities.len() 99 } 100 101 #[allow(dead_code)] 102 pub fn is_empty(&self) -> bool { 103 self.entities.is_empty() 104 } 105 106 #[allow(dead_code)] 107 pub fn clear(&mut self) { 108 self.entities.clear(); 109 self.id2entity.clear(); 110 self.name_version_2_entity.clear(); 111 } 112 113 pub fn topo_sort(&self) -> Vec<Rc<SchedEntity>> { 114 let mut result = Vec::new(); 115 let mut visited = BTreeMap::new(); 116 for entity in self.entities.iter() { 117 if !visited.contains_key(&entity.id) { 118 let r = self.dfs(entity, &mut visited, &mut result); 119 if r.is_err() { 120 let err = r.unwrap_err(); 121 error!("{}", err.display()); 122 println!("Please fix the errors above and try again."); 123 std::process::exit(1); 124 } 125 } 126 } 127 return result; 128 } 129 130 fn dfs( 131 &self, 132 entity: &Rc<SchedEntity>, 133 visited: &mut BTreeMap<i32, bool>, 134 result: &mut Vec<Rc<SchedEntity>>, 135 ) -> Result<(), DependencyCycleError> { 136 visited.insert(entity.id, false); 137 for dep in entity.task.depends.iter() { 138 if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) { 139 if let Some(&false) = visited.get(&dep_entity.id) { 140 // 输出完整环形依赖 141 let mut err = DependencyCycleError::new(dep_entity.clone()); 142 143 err.add(entity.clone(), dep_entity); 144 return Err(err); 145 } 146 if !visited.contains_key(&dep_entity.id) { 147 let r = self.dfs(&dep_entity, visited, result); 148 if r.is_err() { 149 let mut err: DependencyCycleError = r.unwrap_err(); 150 // 如果错误已经停止传播,则直接返回 151 if err.stop_propagation { 152 return Err(err); 153 } 154 // 如果当前实体是错误的起始实体,则停止传播 155 if entity == &err.head_entity { 156 err.stop_propagation(); 157 } 158 err.add(entity.clone(), dep_entity); 159 return Err(err); 160 } 161 } 162 } else { 163 error!( 164 "Dependency not found: {} -> {}", 165 entity.task.name_version(), 166 dep.name_version() 167 ); 168 std::process::exit(1); 169 } 170 } 171 visited.insert(entity.id, true); 172 result.push(entity.clone()); 173 return Ok(()); 174 } 175 } 176 177 /// # 任务调度器 178 #[derive(Debug)] 179 pub struct Scheduler { 180 /// DragonOS sysroot在主机上的路径 181 dragonos_dir: PathBuf, 182 /// 要执行的操作 183 action: Action, 184 /// 调度实体列表 185 target: SchedEntities, 186 } 187 188 pub enum SchedulerError { 189 TaskError(String), 190 DependencyNotFound(Rc<SchedEntity>, String), 191 RunError(String), 192 } 193 194 impl Debug for SchedulerError { 195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 196 match self { 197 Self::TaskError(arg0) => { 198 write!(f, "TaskError: {}", arg0) 199 } 200 SchedulerError::DependencyNotFound(current, msg) => { 201 write!( 202 f, 203 "For task {}, dependency not found: {}. Please check file: {}", 204 current.task.name_version(), 205 msg, 206 current.file_path.display() 207 ) 208 } 209 SchedulerError::RunError(msg) => { 210 write!(f, "RunError: {}", msg) 211 } 212 } 213 } 214 } 215 216 impl Scheduler { 217 pub fn new( 218 dragonos_dir: PathBuf, 219 action: Action, 220 tasks: Vec<(PathBuf, DADKTask)>, 221 ) -> Result<Self, SchedulerError> { 222 let entities = SchedEntities::new(); 223 224 let mut scheduler = Scheduler { 225 dragonos_dir, 226 action, 227 target: entities, 228 }; 229 230 let r = scheduler.add_tasks(tasks); 231 if r.is_err() { 232 error!("Error while adding tasks: {:?}", r); 233 return Err(r.err().unwrap()); 234 } 235 236 return Ok(scheduler); 237 } 238 239 /// # 添加多个任务 240 /// 241 /// 添加任务到调度器中,如果任务已经存在,则返回错误 242 pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> { 243 for task in tasks { 244 self.add_task(task.0, task.1)?; 245 } 246 247 return Ok(()); 248 } 249 250 /// # 添加一个任务 251 /// 252 /// 添加任务到调度器中,如果任务已经存在,则返回错误 253 pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> { 254 let id: i32 = self.generate_task_id(); 255 let entity = Rc::new(SchedEntity { 256 id, 257 task, 258 file_path: path.clone(), 259 }); 260 let name_version = (entity.task.name.clone(), entity.task.version.clone()); 261 262 if self 263 .target 264 .get_by_name_version(&name_version.0, &name_version.1) 265 .is_some() 266 { 267 return Err(SchedulerError::TaskError(format!( 268 "Task with name [{}] and version [{}] already exists. Config file: {}", 269 name_version.0, 270 name_version.1, 271 path.display() 272 ))); 273 } 274 275 self.target.add(entity.clone()); 276 277 info!("Task added: {}", entity.task.name_version()); 278 return Ok(()); 279 } 280 281 fn generate_task_id(&self) -> i32 { 282 static TASK_ID: AtomicI32 = AtomicI32::new(0); 283 return TASK_ID.fetch_add(1, Ordering::SeqCst); 284 } 285 286 /// # 执行调度器中的所有任务 287 pub fn run(&self) -> Result<(), SchedulerError> { 288 // 准备全局环境变量 289 crate::executor::prepare_env(&self.target) 290 .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?; 291 292 match self.action { 293 Action::Build | Action::Install => { 294 self.run_with_topo_sort()?; 295 } 296 Action::Clean(_) => self.run_without_topo_sort()?, 297 _ => unimplemented!(), 298 } 299 300 return Ok(()); 301 } 302 303 /// Action需要按照拓扑序执行 304 /// 305 /// Action::Build | Action::Install 306 fn run_with_topo_sort(&self) -> Result<(), SchedulerError> { 307 // 检查是否有不存在的依赖 308 let r = self.check_not_exists_dependency(); 309 if r.is_err() { 310 error!("Error while checking tasks: {:?}", r); 311 return r; 312 } 313 314 // 对调度实体进行拓扑排序 315 let r: Vec<Rc<SchedEntity>> = self.target.topo_sort(); 316 317 for entity in r.iter() { 318 let mut executor = Executor::new( 319 entity.clone(), 320 self.action.clone(), 321 self.dragonos_dir.clone(), 322 ) 323 .map_err(|e| { 324 error!( 325 "Error while creating executor for task {} : {:?}", 326 entity.task().name_version(), 327 e 328 ); 329 exit(-1); 330 }) 331 .unwrap(); 332 333 executor 334 .execute() 335 .map_err(|e| { 336 error!( 337 "Error while executing task {} : {:?}", 338 entity.task().name_version(), 339 e 340 ); 341 exit(-1); 342 }) 343 .unwrap(); 344 } 345 return Ok(()); 346 } 347 348 /// Action不需要按照拓扑序执行 349 fn run_without_topo_sort(&self) -> Result<(), SchedulerError> { 350 for entity in self.target.iter() { 351 let mut executor = Executor::new( 352 entity.clone(), 353 self.action.clone(), 354 self.dragonos_dir.clone(), 355 ) 356 .map_err(|e| { 357 error!( 358 "Error while creating executor for task {} : {:?}", 359 entity.task().name_version(), 360 e 361 ); 362 exit(-1); 363 }) 364 .unwrap(); 365 366 executor 367 .execute() 368 .map_err(|e| { 369 error!( 370 "Error while executing task {} : {:?}", 371 entity.task().name_version(), 372 e 373 ); 374 exit(-1); 375 }) 376 .unwrap(); 377 } 378 return Ok(()); 379 } 380 381 /// # 检查是否有不存在的依赖 382 /// 383 /// 如果某个任务的dependency中的任务不存在,则返回错误 384 fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> { 385 for entity in self.target.iter() { 386 for dependency in entity.task.depends.iter() { 387 let name_version = (dependency.name.clone(), dependency.version.clone()); 388 if !self 389 .target 390 .get_by_name_version(&name_version.0, &name_version.1) 391 .is_some() 392 { 393 return Err(SchedulerError::DependencyNotFound( 394 entity.clone(), 395 format!("name:{}, version:{}", name_version.0, name_version.1,), 396 )); 397 } 398 } 399 } 400 401 return Ok(()); 402 } 403 } 404 405 /// # 环形依赖错误路径 406 /// 407 /// 本结构体用于在回溯过程中记录环形依赖的路径。 408 /// 409 /// 例如,假设有如下依赖关系: 410 /// 411 /// ```text 412 /// A -> B -> C -> D -> A 413 /// ``` 414 /// 415 /// 则在DFS回溯过程中,会依次记录如下路径: 416 /// 417 /// ```text 418 /// D -> A 419 /// C -> D 420 /// B -> C 421 /// A -> B 422 pub struct DependencyCycleError { 423 /// # 起始实体 424 /// 本错误的起始实体,即环形依赖的起点 425 head_entity: Rc<SchedEntity>, 426 /// 是否停止传播 427 stop_propagation: bool, 428 /// 依赖关系 429 dependencies: Vec<(Rc<SchedEntity>, Rc<SchedEntity>)>, 430 } 431 432 impl DependencyCycleError { 433 pub fn new(head_entity: Rc<SchedEntity>) -> Self { 434 Self { 435 head_entity, 436 stop_propagation: false, 437 dependencies: Vec::new(), 438 } 439 } 440 441 pub fn add(&mut self, current: Rc<SchedEntity>, dependency: Rc<SchedEntity>) { 442 self.dependencies.push((current, dependency)); 443 } 444 445 pub fn stop_propagation(&mut self) { 446 self.stop_propagation = true; 447 } 448 449 #[allow(dead_code)] 450 pub fn dependencies(&self) -> &Vec<(Rc<SchedEntity>, Rc<SchedEntity>)> { 451 &self.dependencies 452 } 453 454 pub fn display(&self) -> String { 455 let mut tmp = self.dependencies.clone(); 456 tmp.reverse(); 457 458 let mut ret = format!("Dependency cycle detected: \nStart ->\n"); 459 for (current, dep) in tmp.iter() { 460 ret.push_str(&format!( 461 "->\t{} ({})\t--depends-->\t{} ({})\n", 462 current.task.name_version(), 463 current.file_path.display(), 464 dep.task.name_version(), 465 dep.file_path.display() 466 )); 467 } 468 ret.push_str("-> End"); 469 return ret; 470 } 471 } 472