xref: /DADK/src/scheduler/mod.rs (revision c5dad5d052be95bca4a6a49d01be12411cfcd468)
1 use std::{
2     collections::{BTreeMap, HashMap},
3     fmt::Debug,
4     path::PathBuf,
5     process::exit,
6     sync::{
7         atomic::{AtomicI32, Ordering},
8         Arc, Mutex, RwLock,
9     },
10     thread::ThreadId,
11 };
12 
13 use log::{error, info};
14 
15 use crate::{
16     console::Action,
17     executor::{target::Target, Executor},
18     parser::task::DADKTask,
19 };
20 
21 use self::task_deque::TASK_DEQUE;
22 
23 pub mod task_deque;
24 
25 lazy_static! {
26     // 线程id与任务实体id映射表
27     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
28 }
29 
30 /// # 调度实体内部结构
31 #[derive(Debug, Clone)]
32 pub struct InnerEntity {
33     /// 任务ID
34     id: i32,
35     file_path: PathBuf,
36     /// 任务
37     task: DADKTask,
38     /// 入度
39     indegree: usize,
40     /// 子节点
41     children: Vec<Arc<SchedEntity>>,
42     /// target管理
43     target: Option<Target>,
44 }
45 
46 /// # 调度实体
47 #[derive(Debug)]
48 pub struct SchedEntity {
49     inner: Mutex<InnerEntity>,
50 }
51 
52 impl PartialEq for SchedEntity {
53     fn eq(&self, other: &Self) -> bool {
54         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
55     }
56 }
57 
58 impl SchedEntity {
59     #[allow(dead_code)]
60     pub fn id(&self) -> i32 {
61         self.inner.lock().unwrap().id
62     }
63 
64     #[allow(dead_code)]
65     pub fn file_path(&self) -> PathBuf {
66         self.inner.lock().unwrap().file_path.clone()
67     }
68 
69     #[allow(dead_code)]
70     pub fn task(&self) -> DADKTask {
71         self.inner.lock().unwrap().task.clone()
72     }
73 
74     /// 入度加1
75     pub fn add_indegree(&self) {
76         self.inner.lock().unwrap().indegree += 1;
77     }
78 
79     /// 入度减1
80     pub fn sub_indegree(&self) -> usize {
81         self.inner.lock().unwrap().indegree -= 1;
82         return self.inner.lock().unwrap().indegree;
83     }
84 
85     /// 增加子节点
86     pub fn add_child(&self, entity: Arc<SchedEntity>) {
87         self.inner.lock().unwrap().children.push(entity);
88     }
89 
90     /// 获取入度
91     pub fn indegree(&self) -> usize {
92         self.inner.lock().unwrap().indegree
93     }
94 
95     /// 获取target
96     pub fn target(&self) -> Option<Target> {
97         self.inner.lock().unwrap().target.clone()
98     }
99 
100     /// 当前任务完成后,所有子节点入度减1
101     ///
102     /// ## 参数
103     ///
104     /// 无
105     ///
106     /// ## 返回值
107     ///
108     /// 所有入度为0的子节点集合
109     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
110         let mut zero_child = Vec::new();
111         let children = &self.inner.lock().unwrap().children;
112         for child in children.iter() {
113             if child.sub_indegree() == 0 {
114                 zero_child.push(child.clone());
115             }
116         }
117         return zero_child;
118     }
119 }
120 
121 /// # 调度实体列表
122 ///
123 /// 用于存储所有的调度实体
124 #[derive(Debug)]
125 pub struct SchedEntities {
126     /// 任务ID到调度实体的映射
127     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
128 }
129 
130 impl SchedEntities {
131     pub fn new() -> Self {
132         Self {
133             id2entity: RwLock::new(BTreeMap::new()),
134         }
135     }
136 
137     pub fn add(&mut self, entity: Arc<SchedEntity>) {
138         self.id2entity
139             .write()
140             .unwrap()
141             .insert(entity.id(), entity.clone());
142     }
143 
144     #[allow(dead_code)]
145     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
146         self.id2entity.read().unwrap().get(&id).cloned()
147     }
148 
149     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
150         for e in self.id2entity.read().unwrap().iter() {
151             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
152                 return Some(e.1.clone());
153             }
154         }
155         return None;
156     }
157 
158     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
159         let mut v = Vec::new();
160         for e in self.id2entity.read().unwrap().iter() {
161             v.push(e.1.clone());
162         }
163         return v;
164     }
165 
166     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
167         self.id2entity.read().unwrap().clone()
168     }
169 
170     #[allow(dead_code)]
171     pub fn len(&self) -> usize {
172         self.id2entity.read().unwrap().len()
173     }
174 
175     #[allow(dead_code)]
176     pub fn is_empty(&self) -> bool {
177         self.id2entity.read().unwrap().is_empty()
178     }
179 
180     #[allow(dead_code)]
181     pub fn clear(&mut self) {
182         self.id2entity.write().unwrap().clear();
183     }
184 
185     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
186         let mut result = Vec::new();
187         let mut visited = BTreeMap::new();
188         let btree = self.id2entity.write().unwrap().clone();
189         for entity in btree.iter() {
190             if !visited.contains_key(entity.0) {
191                 let r = self.dfs(entity.1, &mut visited, &mut result);
192                 if r.is_err() {
193                     let err = r.unwrap_err();
194                     error!("{}", err.display());
195                     println!("Please fix the errors above and try again.");
196                     std::process::exit(1);
197                 }
198             }
199         }
200         return result;
201     }
202 
203     fn dfs(
204         &self,
205         entity: &Arc<SchedEntity>,
206         visited: &mut BTreeMap<i32, bool>,
207         result: &mut Vec<Arc<SchedEntity>>,
208     ) -> Result<(), DependencyCycleError> {
209         visited.insert(entity.id(), false);
210         for dep in entity.task().depends.iter() {
211             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
212                 let guard = self.id2entity.write().unwrap();
213                 let e = guard.get(&entity.id()).unwrap();
214                 let d = guard.get(&dep_entity.id()).unwrap();
215                 e.add_indegree();
216                 d.add_child(e.clone());
217                 if let Some(&false) = visited.get(&dep_entity.id()) {
218                     // 输出完整环形依赖
219                     let mut err = DependencyCycleError::new(dep_entity.clone());
220 
221                     err.add(entity.clone(), dep_entity);
222                     return Err(err);
223                 }
224                 if !visited.contains_key(&dep_entity.id()) {
225                     drop(guard);
226                     let r = self.dfs(&dep_entity, visited, result);
227                     if r.is_err() {
228                         let mut err: DependencyCycleError = r.unwrap_err();
229                         // 如果错误已经停止传播,则直接返回
230                         if err.stop_propagation {
231                             return Err(err);
232                         }
233                         // 如果当前实体是错误的起始实体,则停止传播
234                         if entity == &err.head_entity {
235                             err.stop_propagation();
236                         }
237                         err.add(entity.clone(), dep_entity);
238                         return Err(err);
239                     }
240                 }
241             } else {
242                 error!(
243                     "Dependency not found: {} -> {}",
244                     entity.task().name_version(),
245                     dep.name_version()
246                 );
247                 std::process::exit(1);
248             }
249         }
250         visited.insert(entity.id(), true);
251         result.push(entity.clone());
252         return Ok(());
253     }
254 }
255 
256 /// # 任务调度器
257 #[derive(Debug)]
258 pub struct Scheduler {
259     /// DragonOS sysroot在主机上的路径
260     dragonos_dir: PathBuf,
261     /// 要执行的操作
262     action: Action,
263     /// 调度实体列表
264     target: SchedEntities,
265 }
266 
267 pub enum SchedulerError {
268     TaskError(String),
269     DependencyNotFound(Arc<SchedEntity>, String),
270     RunError(String),
271 }
272 
273 impl Debug for SchedulerError {
274     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275         match self {
276             Self::TaskError(arg0) => {
277                 write!(f, "TaskError: {}", arg0)
278             }
279             SchedulerError::DependencyNotFound(current, msg) => {
280                 write!(
281                     f,
282                     "For task {}, dependency not found: {}. Please check file: {}",
283                     current.task().name_version(),
284                     msg,
285                     current.file_path().display()
286                 )
287             }
288             SchedulerError::RunError(msg) => {
289                 write!(f, "RunError: {}", msg)
290             }
291         }
292     }
293 }
294 
295 impl Scheduler {
296     pub fn new(
297         dragonos_dir: PathBuf,
298         action: Action,
299         tasks: Vec<(PathBuf, DADKTask)>,
300     ) -> Result<Self, SchedulerError> {
301         let entities = SchedEntities::new();
302 
303         let mut scheduler = Scheduler {
304             dragonos_dir,
305             action,
306             target: entities,
307         };
308 
309         let r = scheduler.add_tasks(tasks);
310         if r.is_err() {
311             error!("Error while adding tasks: {:?}", r);
312             return Err(r.err().unwrap());
313         }
314 
315         return Ok(scheduler);
316     }
317 
318     /// # 添加多个任务
319     ///
320     /// 添加任务到调度器中,如果任务已经存在,则返回错误
321     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
322         for task in tasks {
323             self.add_task(task.0, task.1)?;
324         }
325 
326         return Ok(());
327     }
328 
329     /// # 添加一个任务
330     ///
331     /// 添加任务到调度器中,如果任务已经存在,则返回错误
332     pub fn add_task(&mut self, path: PathBuf, task: DADKTask) -> Result<(), SchedulerError> {
333         let id: i32 = self.generate_task_id();
334         let indegree: usize = 0;
335         let children = Vec::new();
336         let target = self.generate_task_target(&path, &task.rust_target)?;
337         let entity = Arc::new(SchedEntity {
338             inner: Mutex::new(InnerEntity {
339                 id,
340                 task,
341                 file_path: path.clone(),
342                 indegree,
343                 children,
344                 target,
345             }),
346         });
347         let name_version = (entity.task().name.clone(), entity.task().version.clone());
348 
349         if self
350             .target
351             .get_by_name_version(&name_version.0, &name_version.1)
352             .is_some()
353         {
354             return Err(SchedulerError::TaskError(format!(
355                 "Task with name [{}] and version [{}] already exists. Config file: {}",
356                 name_version.0,
357                 name_version.1,
358                 path.display()
359             )));
360         }
361 
362         self.target.add(entity.clone());
363 
364         info!("Task added: {}", entity.task().name_version());
365         return Ok(());
366     }
367 
368     fn generate_task_id(&self) -> i32 {
369         static TASK_ID: AtomicI32 = AtomicI32::new(0);
370         return TASK_ID.fetch_add(1, Ordering::SeqCst);
371     }
372 
373     fn generate_task_target(
374         &self,
375         path: &PathBuf,
376         rust_target: &Option<String>,
377     ) -> Result<Option<Target>, SchedulerError> {
378         if let Some(rust_target) = rust_target {
379             // 如果rust_target字段不为none,说明需要target管理
380             // 获取dadk任务路径,用于生成临时dadk文件名
381             let file_str = path.as_path().to_str().unwrap();
382             let tmp_dadk_path = Target::tmp_dadk(file_str);
383             let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap();
384 
385             if Target::is_user_target(rust_target) {
386                 // 如果target文件是用户自己的
387                 if let Ok(target_path) = Target::user_target_path(rust_target) {
388                     let target_path_str = target_path.as_path().to_str().unwrap();
389                     let index = target_path_str.rfind('/').unwrap();
390                     let target_name = target_path_str[index + 1..].to_string();
391                     let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name));
392                     return Ok(Some(Target::new(tmp_target)));
393                 } else {
394                     return Err(SchedulerError::TaskError(
395                         "The path of target file is invalid.".to_string(),
396                     ));
397                 }
398             } else {
399                 // 如果target文件是内置的
400                 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target));
401                 return Ok(Some(Target::new(tmp_target)));
402             }
403         }
404         return Ok(None);
405     }
406 
407     /// # 执行调度器中的所有任务
408     pub fn run(&self) -> Result<(), SchedulerError> {
409         // 准备全局环境变量
410         crate::executor::prepare_env(&self.target)
411             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
412 
413         match self.action {
414             Action::Build | Action::Install => {
415                 self.run_with_topo_sort()?;
416             }
417             Action::Clean(_) => self.run_without_topo_sort()?,
418             _ => unimplemented!(),
419         }
420 
421         return Ok(());
422     }
423 
424     /// Action需要按照拓扑序执行
425     ///
426     /// Action::Build | Action::Install
427     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
428         // 检查是否有不存在的依赖
429         let r = self.check_not_exists_dependency();
430         if r.is_err() {
431             error!("Error while checking tasks: {:?}", r);
432             return r;
433         }
434 
435         // 对调度实体进行拓扑排序
436         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
437 
438         let action = self.action.clone();
439         let dragonos_dir = self.dragonos_dir.clone();
440         let id2entity = self.target.id2entity();
441         let count = r.len();
442 
443         // 启动守护线程
444         let handler = std::thread::spawn(move || {
445             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
446         });
447 
448         handler.join().expect("Could not join deamon");
449 
450         return Ok(());
451     }
452 
453     /// Action不需要按照拓扑序执行
454     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
455         // 启动守护线程
456         let action = self.action.clone();
457         let dragonos_dir = self.dragonos_dir.clone();
458         let mut r = self.target.entities();
459         let handler = std::thread::spawn(move || {
460             Self::clean_daemon(action, dragonos_dir, &mut r);
461         });
462 
463         handler.join().expect("Could not join deamon");
464         return Ok(());
465     }
466 
467     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
468         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
469             .map_err(|e| {
470                 error!(
471                     "Error while creating executor for task {} : {:?}",
472                     entity.task().name_version(),
473                     e
474                 );
475                 exit(-1);
476             })
477             .unwrap();
478 
479         executor
480             .execute()
481             .map_err(|e| {
482                 error!(
483                     "Error while executing task {} : {:?}",
484                     entity.task().name_version(),
485                     e
486                 );
487                 exit(-1);
488             })
489             .unwrap();
490     }
491 
492     /// 构建和安装DADK任务的守护线程
493     ///
494     /// ## 参数
495     ///
496     /// - `action` : 要执行的操作
497     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
498     /// - `id2entity` : DADK任务id与实体映射表
499     /// - `count` : 当前剩余任务数
500     /// - `r` : 总任务实体表
501     ///
502     /// ## 返回值
503     ///
504     /// 无
505     pub fn build_install_daemon(
506         action: Action,
507         dragonos_dir: PathBuf,
508         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
509         mut count: usize,
510         r: &Vec<Arc<SchedEntity>>,
511     ) {
512         let mut guard = TASK_DEQUE.lock().unwrap();
513         // 初始化0入度的任务实体
514         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
515         for e in r.iter() {
516             if e.indegree() == 0 {
517                 zero_entity.push(e.clone());
518             }
519         }
520 
521         while count > 0 {
522             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
523             while !zero_entity.is_empty()
524                 && guard.build_install_task(
525                     action.clone(),
526                     dragonos_dir.clone(),
527                     zero_entity.last().unwrap().clone(),
528                 )
529             {
530                 zero_entity.pop();
531             }
532 
533             let queue = guard.queue_mut();
534             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
535             queue.retain(|x| {
536                 if x.is_finished() {
537                     count -= 1;
538                     let tid = x.thread().id();
539                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
540                     let entity = id2entity.get(&eid).unwrap();
541                     let zero = entity.sub_children_indegree();
542                     for e in zero.iter() {
543                         zero_entity.push(e.clone());
544                     }
545                     return false;
546                 }
547                 return true;
548             })
549         }
550     }
551 
552     /// 清理DADK任务的守护线程
553     ///
554     /// ## 参数
555     ///
556     /// - `action` : 要执行的操作
557     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
558     /// - `r` : 总任务实体表
559     ///
560     /// ## 返回值
561     ///
562     /// 无
563     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
564         let mut guard = TASK_DEQUE.lock().unwrap();
565         while !guard.queue().is_empty() && !r.is_empty() {
566             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
567         }
568     }
569 
570     /// # 检查是否有不存在的依赖
571     ///
572     /// 如果某个任务的dependency中的任务不存在,则返回错误
573     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
574         for entity in self.target.entities().iter() {
575             for dependency in entity.task().depends.iter() {
576                 let name_version = (dependency.name.clone(), dependency.version.clone());
577                 if !self
578                     .target
579                     .get_by_name_version(&name_version.0, &name_version.1)
580                     .is_some()
581                 {
582                     return Err(SchedulerError::DependencyNotFound(
583                         entity.clone(),
584                         format!("name:{}, version:{}", name_version.0, name_version.1,),
585                     ));
586                 }
587             }
588         }
589 
590         return Ok(());
591     }
592 }
593 
594 /// # 环形依赖错误路径
595 ///
596 /// 本结构体用于在回溯过程中记录环形依赖的路径。
597 ///
598 /// 例如,假设有如下依赖关系:
599 ///
600 /// ```text
601 /// A -> B -> C -> D -> A
602 /// ```
603 ///
604 /// 则在DFS回溯过程中,会依次记录如下路径:
605 ///
606 /// ```text
607 /// D -> A
608 /// C -> D
609 /// B -> C
610 /// A -> B
611 pub struct DependencyCycleError {
612     /// # 起始实体
613     /// 本错误的起始实体,即环形依赖的起点
614     head_entity: Arc<SchedEntity>,
615     /// 是否停止传播
616     stop_propagation: bool,
617     /// 依赖关系
618     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
619 }
620 
621 impl DependencyCycleError {
622     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
623         Self {
624             head_entity,
625             stop_propagation: false,
626             dependencies: Vec::new(),
627         }
628     }
629 
630     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
631         self.dependencies.push((current, dependency));
632     }
633 
634     pub fn stop_propagation(&mut self) {
635         self.stop_propagation = true;
636     }
637 
638     #[allow(dead_code)]
639     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
640         &self.dependencies
641     }
642 
643     pub fn display(&self) -> String {
644         let mut tmp = self.dependencies.clone();
645         tmp.reverse();
646 
647         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
648         for (current, dep) in tmp.iter() {
649             ret.push_str(&format!(
650                 "->\t{} ({})\t--depends-->\t{} ({})\n",
651                 current.task().name_version(),
652                 current.file_path().display(),
653                 dep.task().name_version(),
654                 dep.file_path().display()
655             ));
656         }
657         ret.push_str("-> End");
658         return ret;
659     }
660 }
661