xref: /DADK/src/scheduler/mod.rs (revision 674005c9789d72f67f795c4397a4b7e462eb25c4)
1 use std::{
2     collections::BTreeMap,
3     fmt::Debug,
4     path::PathBuf,
5     process::exit,
6     rc::Rc,
7     sync::atomic::{AtomicI32, Ordering},
8 };
9 
10 use log::{error, info};
11 
12 use crate::{console::Action, executor::Executor, parser::task::DADKTask};
13 
/// # Scheduling entity
///
/// A single task registered with the scheduler, bundled with the id assigned
/// to it and the path it was registered under.
#[derive(Debug, Clone)]
pub struct SchedEntity {
    /// Task ID
    id: i32,
    /// Path supplied together with the task; error messages in this module
    /// report it as the task's config file.
    file_path: PathBuf,
    /// The task itself
    task: DADKTask,
}
23 
24 impl PartialEq for SchedEntity {
25     fn eq(&self, other: &Self) -> bool {
26         self.id == other.id
27     }
28 }
29 
impl SchedEntity {
    /// Returns the id of this entity.
    #[allow(dead_code)]
    pub fn id(&self) -> i32 {
        self.id
    }

    /// Returns the path stored alongside the task.
    #[allow(dead_code)]
    pub fn file_path(&self) -> &PathBuf {
        &self.file_path
    }

    /// Returns a shared reference to the wrapped task.
    #[allow(dead_code)]
    pub fn task(&self) -> &DADKTask {
        &self.task
    }

    /// Returns an exclusive reference to the wrapped task.
    #[allow(dead_code)]
    pub fn task_mut(&mut self) -> &mut DADKTask {
        &mut self.task
    }
}
51 
/// # Scheduling entity list
///
/// Stores every scheduling entity, in insertion order, together with two
/// lookup indexes over the same entities.
#[derive(Debug)]
pub struct SchedEntities {
    /// All scheduling entities, in the order they were added
    entities: Vec<Rc<SchedEntity>>,
    /// Mapping from task ID to scheduling entity
    id2entity: BTreeMap<i32, Rc<SchedEntity>>,
    /// Mapping from the task's name-and-version key to scheduling entity
    name_version_2_entity: BTreeMap<String, Rc<SchedEntity>>,
}
64 
65 impl SchedEntities {
66     pub fn new() -> Self {
67         Self {
68             entities: Vec::new(),
69             id2entity: BTreeMap::new(),
70             name_version_2_entity: BTreeMap::new(),
71         }
72     }
73 
74     pub fn add(&mut self, entity: Rc<SchedEntity>) {
75         self.entities.push(entity.clone());
76         self.id2entity.insert(entity.id, entity.clone());
77         self.name_version_2_entity
78             .insert(entity.task.name_version_env(), entity);
79     }
80 
81     #[allow(dead_code)]
82     pub fn get(&self, id: i32) -> Option<Rc<SchedEntity>> {
83         self.id2entity.get(&id).cloned()
84     }
85 
86     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Rc<SchedEntity>> {
87         self.name_version_2_entity
88             .get(&DADKTask::name_version_uppercase(name, version))
89             .cloned()
90     }
91 
92     pub fn iter(&self) -> impl Iterator<Item = &Rc<SchedEntity>> {
93         self.entities.iter()
94     }
95 
96     #[allow(dead_code)]
97     pub fn len(&self) -> usize {
98         self.entities.len()
99     }
100 
101     #[allow(dead_code)]
102     pub fn is_empty(&self) -> bool {
103         self.entities.is_empty()
104     }
105 
106     #[allow(dead_code)]
107     pub fn clear(&mut self) {
108         self.entities.clear();
109         self.id2entity.clear();
110         self.name_version_2_entity.clear();
111     }
112 
113     pub fn topo_sort(&self) -> Vec<Rc<SchedEntity>> {
114         let mut result = Vec::new();
115         let mut visited = BTreeMap::new();
116         for entity in self.entities.iter() {
117             if !visited.contains_key(&entity.id) {
118                 let r = self.dfs(entity, &mut visited, &mut result);
119                 if r.is_err() {
120                     let err = r.unwrap_err();
121                     error!("{}", err.display());
122                     println!("Please fix the errors above and try again.");
123                     std::process::exit(1);
124                 }
125             }
126         }
127         return result;
128     }
129 
130     fn dfs(
131         &self,
132         entity: &Rc<SchedEntity>,
133         visited: &mut BTreeMap<i32, bool>,
134         result: &mut Vec<Rc<SchedEntity>>,
135     ) -> Result<(), DependencyCycleError> {
136         visited.insert(entity.id, false);
137         for dep in entity.task.depends.iter() {
138             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
139                 if let Some(&false) = visited.get(&dep_entity.id) {
140                     // 输出完整环形依赖
141                     let mut err = DependencyCycleError::new(dep_entity.clone());
142 
143                     err.add(entity.clone(), dep_entity);
144                     return Err(err);
145                 }
146                 if !visited.contains_key(&dep_entity.id) {
147                     let r = self.dfs(&dep_entity, visited, result);
148                     if r.is_err() {
149                         let mut err: DependencyCycleError = r.unwrap_err();
150                         // 如果错误已经停止传播,则直接返回
151                         if err.stop_propagation {
152                             return Err(err);
153                         }
154                         // 如果当前实体是错误的起始实体,则停止传播
155                         if entity == &err.head_entity {
156                             err.stop_propagation();
157                         }
158                         err.add(entity.clone(), dep_entity);
159                         return Err(err);
160                     }
161                 }
162             } else {
163                 error!(
164                     "Dependency not found: {} -> {}",
165                     entity.task.name_version(),
166                     dep.name_version()
167                 );
168                 std::process::exit(1);
169             }
170         }
171         visited.insert(entity.id, true);
172         result.push(entity.clone());
173         return Ok(());
174     }
175 }
176 
/// # Task scheduler
#[derive(Debug)]
pub struct Scheduler {
    /// Path of the DragonOS sysroot on the host machine
    dragonos_dir: PathBuf,
    /// The action to perform
    action: Action,
    /// The scheduling entities managed by this scheduler
    target: SchedEntities,
}
187 
/// Errors reported by the scheduler.
pub enum SchedulerError {
    /// A task could not be accepted; the string carries the explanation.
    TaskError(String),
    /// The given entity declares a dependency that is not registered; the
    /// string describes the missing dependency.
    DependencyNotFound(Rc<SchedEntity>, String),
    /// Running the scheduler failed; the string carries the explanation.
    RunError(String),
}
193 
194 impl Debug for SchedulerError {
195     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196         match self {
197             Self::TaskError(arg0) => {
198                 write!(f, "TaskError: {}", arg0)
199             }
200             SchedulerError::DependencyNotFound(current, msg) => {
201                 write!(
202                     f,
203                     "For task {}, dependency not found: {}. Please check file: {}",
204                     current.task.name_version(),
205                     msg,
206                     current.file_path.display()
207                 )
208             }
209             SchedulerError::RunError(msg) => {
210                 write!(f, "RunError: {}", msg)
211             }
212         }
213     }
214 }
215 
216 impl Scheduler {
217     pub fn new(
218         dragonos_dir: PathBuf,
219         action: Action,
220         tasks: Vec<(PathBuf, DADKTask)>,
221     ) -> Result<Self, SchedulerError> {
222         let entities = SchedEntities::new();
223 
224         let mut scheduler = Scheduler {
225             dragonos_dir,
226             action,
227             target: entities,
228         };
229 
230         let r = scheduler.add_tasks(tasks);
231         if r.is_err() {
232             error!("Error while adding tasks: {:?}", r);
233             return Err(r.err().unwrap());
234         }
235 
236         return Ok(scheduler);
237     }
238 
239     /// # 添加多个任务
240     ///
241     /// 添加任务到调度器中,如果任务已经存在,则返回错误
242     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
243         for task in tasks {
244             self.add_task(task.0, task.1)?;
245         }
246 
247         return Ok(());
248     }
249 
250     /// # 添加一个任务
251     ///
252     /// 添加任务到调度器中,如果任务已经存在,则返回错误
253     pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> {
254         let id: i32 = self.generate_task_id();
255         let entity = Rc::new(SchedEntity {
256             id,
257             task,
258             file_path: path.clone(),
259         });
260         let name_version = (entity.task.name.clone(), entity.task.version.clone());
261 
262         if self
263             .target
264             .get_by_name_version(&name_version.0, &name_version.1)
265             .is_some()
266         {
267             return Err(SchedulerError::TaskError(format!(
268                 "Task with name [{}] and version [{}] already exists. Config file: {}",
269                 name_version.0,
270                 name_version.1,
271                 path.display()
272             )));
273         }
274 
275         self.target.add(entity.clone());
276 
277         info!("Task added: {}", entity.task.name_version());
278         return Ok(());
279     }
280 
281     fn generate_task_id(&self) -> i32 {
282         static TASK_ID: AtomicI32 = AtomicI32::new(0);
283         return TASK_ID.fetch_add(1, Ordering::SeqCst);
284     }
285 
286     /// # 执行调度器中的所有任务
287     pub fn run(&self) -> Result<(), SchedulerError> {
288         // 准备全局环境变量
289         crate::executor::prepare_env(&self.target)
290             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
291 
292         match self.action {
293             Action::Build | Action::Install => {
294                 self.run_with_topo_sort()?;
295             }
296             Action::Clean(_) => self.run_without_topo_sort()?,
297             _ => unimplemented!(),
298         }
299 
300         return Ok(());
301     }
302 
303     /// Action需要按照拓扑序执行
304     ///
305     /// Action::Build | Action::Install
306     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
307         // 检查是否有不存在的依赖
308         let r = self.check_not_exists_dependency();
309         if r.is_err() {
310             error!("Error while checking tasks: {:?}", r);
311             return r;
312         }
313 
314         // 对调度实体进行拓扑排序
315         let r: Vec<Rc<SchedEntity>> = self.target.topo_sort();
316 
317         for entity in r.iter() {
318             let mut executor = Executor::new(
319                 entity.clone(),
320                 self.action.clone(),
321                 self.dragonos_dir.clone(),
322             )
323             .map_err(|e| {
324                 error!(
325                     "Error while creating executor for task {} : {:?}",
326                     entity.task().name_version(),
327                     e
328                 );
329                 exit(-1);
330             })
331             .unwrap();
332 
333             executor
334                 .execute()
335                 .map_err(|e| {
336                     error!(
337                         "Error while executing task {} : {:?}",
338                         entity.task().name_version(),
339                         e
340                     );
341                     exit(-1);
342                 })
343                 .unwrap();
344         }
345         return Ok(());
346     }
347 
348     /// Action不需要按照拓扑序执行
349     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
350         for entity in self.target.iter() {
351             let mut executor = Executor::new(
352                 entity.clone(),
353                 self.action.clone(),
354                 self.dragonos_dir.clone(),
355             )
356             .map_err(|e| {
357                 error!(
358                     "Error while creating executor for task {} : {:?}",
359                     entity.task().name_version(),
360                     e
361                 );
362                 exit(-1);
363             })
364             .unwrap();
365 
366             executor
367                 .execute()
368                 .map_err(|e| {
369                     error!(
370                         "Error while executing task {} : {:?}",
371                         entity.task().name_version(),
372                         e
373                     );
374                     exit(-1);
375                 })
376                 .unwrap();
377         }
378         return Ok(());
379     }
380 
381     /// # 检查是否有不存在的依赖
382     ///
383     /// 如果某个任务的dependency中的任务不存在,则返回错误
384     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
385         for entity in self.target.iter() {
386             for dependency in entity.task.depends.iter() {
387                 let name_version = (dependency.name.clone(), dependency.version.clone());
388                 if !self
389                     .target
390                     .get_by_name_version(&name_version.0, &name_version.1)
391                     .is_some()
392                 {
393                     return Err(SchedulerError::DependencyNotFound(
394                         entity.clone(),
395                         format!("name:{}, version:{}", name_version.0, name_version.1,),
396                     ));
397                 }
398             }
399         }
400 
401         return Ok(());
402     }
403 }
404 
/// # Dependency cycle error path
///
/// This struct records the path of a dependency cycle while the DFS unwinds.
///
/// For example, given the following dependency chain:
///
/// ```text
/// A -> B -> C -> D -> A
/// ```
///
/// the following edges are recorded, in this order, as the DFS backtracks:
///
/// ```text
/// D -> A
/// C -> D
/// B -> C
/// A -> B
/// ```
pub struct DependencyCycleError {
    /// # Head entity
    /// The entity this error started from, i.e. the starting point of the
    /// dependency cycle
    head_entity: Rc<SchedEntity>,
    /// Whether recording should stop, i.e. the cycle has been fully recorded
    stop_propagation: bool,
    /// The recorded edges, each as `(entity, the dependency it requires)`
    dependencies: Vec<(Rc<SchedEntity>, Rc<SchedEntity>)>,
}
431 
432 impl DependencyCycleError {
433     pub fn new(head_entity: Rc<SchedEntity>) -> Self {
434         Self {
435             head_entity,
436             stop_propagation: false,
437             dependencies: Vec::new(),
438         }
439     }
440 
441     pub fn add(&mut self, current: Rc<SchedEntity>, dependency: Rc<SchedEntity>) {
442         self.dependencies.push((current, dependency));
443     }
444 
445     pub fn stop_propagation(&mut self) {
446         self.stop_propagation = true;
447     }
448 
449     #[allow(dead_code)]
450     pub fn dependencies(&self) -> &Vec<(Rc<SchedEntity>, Rc<SchedEntity>)> {
451         &self.dependencies
452     }
453 
454     pub fn display(&self) -> String {
455         let mut tmp = self.dependencies.clone();
456         tmp.reverse();
457 
458         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
459         for (current, dep) in tmp.iter() {
460             ret.push_str(&format!(
461                 "->\t{} ({})\t--depends-->\t{} ({})\n",
462                 current.task.name_version(),
463                 current.file_path.display(),
464                 dep.task.name_version(),
465                 dep.file_path.display()
466             ));
467         }
468         ret.push_str("-> End");
469         return ret;
470     }
471 }
472