xref: /DADK/src/scheduler/mod.rs (revision 820df76286f71ecfa36e75bd5f934ca78da16cb8)
1 use std::{
2     collections::{BTreeMap, HashMap},
3     fmt::Debug,
4     path::PathBuf,
5     process::exit,
6     sync::{
7         atomic::{AtomicI32, Ordering},
8         Arc, Mutex, RwLock,
9     },
10     thread::ThreadId,
11 };
12 
13 use log::{error, info};
14 
15 use crate::{
16     console::Action,
17     executor::{target::Target, Executor},
18     parser::task::DADKTask,
19 };
20 
21 use self::task_deque::TASK_DEQUE;
22 
23 pub mod task_deque;
24 
25 lazy_static! {
26     // 线程id与任务实体id映射表
27     pub static ref TID_EID: Mutex<HashMap<ThreadId,i32>> = Mutex::new(HashMap::new());
28 }
29 
30 /// # 调度实体内部结构
31 #[derive(Debug, Clone)]
32 pub struct InnerEntity {
33     /// 任务ID
34     id: i32,
35     file_path: PathBuf,
36     /// 任务
37     task: DADKTask,
38     /// 入度
39     indegree: usize,
40     /// 子节点
41     children: Vec<Arc<SchedEntity>>,
42     /// target管理
43     target: Option<Target>,
44 }
45 
46 /// # 调度实体
47 #[derive(Debug)]
48 pub struct SchedEntity {
49     inner: Mutex<InnerEntity>,
50 }
51 
52 impl PartialEq for SchedEntity {
53     fn eq(&self, other: &Self) -> bool {
54         self.inner.lock().unwrap().id == other.inner.lock().unwrap().id
55     }
56 }
57 
58 impl SchedEntity {
59     #[allow(dead_code)]
60     pub fn id(&self) -> i32 {
61         self.inner.lock().unwrap().id
62     }
63 
64     #[allow(dead_code)]
65     pub fn file_path(&self) -> PathBuf {
66         self.inner.lock().unwrap().file_path.clone()
67     }
68 
69     #[allow(dead_code)]
70     pub fn task(&self) -> DADKTask {
71         self.inner.lock().unwrap().task.clone()
72     }
73 
74     /// 入度加1
75     pub fn add_indegree(&self) {
76         self.inner.lock().unwrap().indegree += 1;
77     }
78 
79     /// 入度减1
80     pub fn sub_indegree(&self) -> usize {
81         self.inner.lock().unwrap().indegree -= 1;
82         return self.inner.lock().unwrap().indegree;
83     }
84 
85     /// 增加子节点
86     pub fn add_child(&self, entity: Arc<SchedEntity>) {
87         self.inner.lock().unwrap().children.push(entity);
88     }
89 
90     /// 获取入度
91     pub fn indegree(&self) -> usize {
92         self.inner.lock().unwrap().indegree
93     }
94 
95     /// 获取target
96     pub fn target(&self) -> Option<Target> {
97         self.inner.lock().unwrap().target.clone()
98     }
99 
100     /// 当前任务完成后,所有子节点入度减1
101     ///
102     /// ## 参数
103     ///
104     /// 无
105     ///
106     /// ## 返回值
107     ///
108     /// 所有入度为0的子节点集合
109     pub fn sub_children_indegree(&self) -> Vec<Arc<SchedEntity>> {
110         let mut zero_child = Vec::new();
111         let children = &self.inner.lock().unwrap().children;
112         for child in children.iter() {
113             if child.sub_indegree() == 0 {
114                 zero_child.push(child.clone());
115             }
116         }
117         return zero_child;
118     }
119 }
120 
121 /// # 调度实体列表
122 ///
123 /// 用于存储所有的调度实体
124 #[derive(Debug)]
125 pub struct SchedEntities {
126     /// 任务ID到调度实体的映射
127     id2entity: RwLock<BTreeMap<i32, Arc<SchedEntity>>>,
128 }
129 
130 impl SchedEntities {
131     pub fn new() -> Self {
132         Self {
133             id2entity: RwLock::new(BTreeMap::new()),
134         }
135     }
136 
137     pub fn add(&mut self, entity: Arc<SchedEntity>) {
138         self.id2entity
139             .write()
140             .unwrap()
141             .insert(entity.id(), entity.clone());
142     }
143 
144     #[allow(dead_code)]
145     pub fn get(&self, id: i32) -> Option<Arc<SchedEntity>> {
146         self.id2entity.read().unwrap().get(&id).cloned()
147     }
148 
149     pub fn get_by_name_version(&self, name: &str, version: &str) -> Option<Arc<SchedEntity>> {
150         for e in self.id2entity.read().unwrap().iter() {
151             if e.1.task().name_version_env() == DADKTask::name_version_uppercase(name, version) {
152                 return Some(e.1.clone());
153             }
154         }
155         return None;
156     }
157 
158     pub fn entities(&self) -> Vec<Arc<SchedEntity>> {
159         let mut v = Vec::new();
160         for e in self.id2entity.read().unwrap().iter() {
161             v.push(e.1.clone());
162         }
163         return v;
164     }
165 
166     pub fn id2entity(&self) -> BTreeMap<i32, Arc<SchedEntity>> {
167         self.id2entity.read().unwrap().clone()
168     }
169 
170     #[allow(dead_code)]
171     pub fn len(&self) -> usize {
172         self.id2entity.read().unwrap().len()
173     }
174 
175     #[allow(dead_code)]
176     pub fn is_empty(&self) -> bool {
177         self.id2entity.read().unwrap().is_empty()
178     }
179 
180     #[allow(dead_code)]
181     pub fn clear(&mut self) {
182         self.id2entity.write().unwrap().clear();
183     }
184 
185     pub fn topo_sort(&self) -> Vec<Arc<SchedEntity>> {
186         let mut result = Vec::new();
187         let mut visited = BTreeMap::new();
188         let btree = self.id2entity.write().unwrap().clone();
189         for entity in btree.iter() {
190             if !visited.contains_key(entity.0) {
191                 let r = self.dfs(entity.1, &mut visited, &mut result);
192                 if r.is_err() {
193                     let err = r.unwrap_err();
194                     error!("{}", err.display());
195                     println!("Please fix the errors above and try again.");
196                     std::process::exit(1);
197                 }
198             }
199         }
200         return result;
201     }
202 
203     fn dfs(
204         &self,
205         entity: &Arc<SchedEntity>,
206         visited: &mut BTreeMap<i32, bool>,
207         result: &mut Vec<Arc<SchedEntity>>,
208     ) -> Result<(), DependencyCycleError> {
209         visited.insert(entity.id(), false);
210         for dep in entity.task().depends.iter() {
211             if let Some(dep_entity) = self.get_by_name_version(&dep.name, &dep.version) {
212                 let guard = self.id2entity.write().unwrap();
213                 let e = guard.get(&entity.id()).unwrap();
214                 let d = guard.get(&dep_entity.id()).unwrap();
215                 e.add_indegree();
216                 d.add_child(e.clone());
217                 if let Some(&false) = visited.get(&dep_entity.id()) {
218                     // 输出完整环形依赖
219                     let mut err = DependencyCycleError::new(dep_entity.clone());
220 
221                     err.add(entity.clone(), dep_entity);
222                     return Err(err);
223                 }
224                 if !visited.contains_key(&dep_entity.id()) {
225                     drop(guard);
226                     let r = self.dfs(&dep_entity, visited, result);
227                     if r.is_err() {
228                         let mut err: DependencyCycleError = r.unwrap_err();
229                         // 如果错误已经停止传播,则直接返回
230                         if err.stop_propagation {
231                             return Err(err);
232                         }
233                         // 如果当前实体是错误的起始实体,则停止传播
234                         if entity == &err.head_entity {
235                             err.stop_propagation();
236                         }
237                         err.add(entity.clone(), dep_entity);
238                         return Err(err);
239                     }
240                 }
241             } else {
242                 error!(
243                     "Dependency not found: {} -> {}",
244                     entity.task().name_version(),
245                     dep.name_version()
246                 );
247                 std::process::exit(1);
248             }
249         }
250         visited.insert(entity.id(), true);
251         result.push(entity.clone());
252         return Ok(());
253     }
254 }
255 
256 /// # 任务调度器
257 #[derive(Debug)]
258 pub struct Scheduler {
259     /// DragonOS sysroot在主机上的路径
260     dragonos_dir: PathBuf,
261     /// 要执行的操作
262     action: Action,
263     /// 调度实体列表
264     target: SchedEntities,
265 }
266 
267 pub enum SchedulerError {
268     TaskError(String),
269     DependencyNotFound(Arc<SchedEntity>, String),
270     RunError(String),
271 }
272 
273 impl Debug for SchedulerError {
274     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275         match self {
276             Self::TaskError(arg0) => {
277                 write!(f, "TaskError: {}", arg0)
278             }
279             SchedulerError::DependencyNotFound(current, msg) => {
280                 write!(
281                     f,
282                     "For task {}, dependency not found: {}. Please check file: {}",
283                     current.task().name_version(),
284                     msg,
285                     current.file_path().display()
286                 )
287             }
288             SchedulerError::RunError(msg) => {
289                 write!(f, "RunError: {}", msg)
290             }
291         }
292     }
293 }
294 
295 impl Scheduler {
296     pub fn new(
297         dragonos_dir: PathBuf,
298         action: Action,
299         tasks: Vec<(PathBuf, DADKTask)>,
300     ) -> Result<Self, SchedulerError> {
301         let entities = SchedEntities::new();
302 
303         let mut scheduler = Scheduler {
304             dragonos_dir,
305             action,
306             target: entities,
307         };
308 
309         let r = scheduler.add_tasks(tasks);
310         if r.is_err() {
311             error!("Error while adding tasks: {:?}", r);
312             return Err(r.err().unwrap());
313         }
314 
315         return Ok(scheduler);
316     }
317 
318     /// # 添加多个任务
319     ///
320     /// 添加任务到调度器中,如果任务已经存在,则返回错误
321     pub fn add_tasks(&mut self, tasks: Vec<(PathBuf, DADKTask)>) -> Result<(), SchedulerError> {
322         for task in tasks {
323             self.add_task(task.0, task.1)?;
324         }
325 
326         return Ok(());
327     }
328 
329     /// # 添加一个任务
330     ///
331     /// 添加任务到调度器中,如果任务已经存在,则返回错误
332     pub fn add_task(
333         &mut self,
334         path: PathBuf,
335         task: DADKTask,
336     ) -> Result<Arc<SchedEntity>, SchedulerError> {
337         let id: i32 = self.generate_task_id();
338         let indegree: usize = 0;
339         let children = Vec::new();
340         let target = self.generate_task_target(&path, &task.rust_target)?;
341         let entity = Arc::new(SchedEntity {
342             inner: Mutex::new(InnerEntity {
343                 id,
344                 task,
345                 file_path: path.clone(),
346                 indegree,
347                 children,
348                 target,
349             }),
350         });
351         let name_version = (entity.task().name.clone(), entity.task().version.clone());
352 
353         if self
354             .target
355             .get_by_name_version(&name_version.0, &name_version.1)
356             .is_some()
357         {
358             return Err(SchedulerError::TaskError(format!(
359                 "Task with name [{}] and version [{}] already exists. Config file: {}",
360                 name_version.0,
361                 name_version.1,
362                 path.display()
363             )));
364         }
365 
366         self.target.add(entity.clone());
367 
368         info!("Task added: {}", entity.task().name_version());
369         return Ok(entity);
370     }
371 
372     fn generate_task_id(&self) -> i32 {
373         static TASK_ID: AtomicI32 = AtomicI32::new(0);
374         return TASK_ID.fetch_add(1, Ordering::SeqCst);
375     }
376 
377     fn generate_task_target(
378         &self,
379         path: &PathBuf,
380         rust_target: &Option<String>,
381     ) -> Result<Option<Target>, SchedulerError> {
382         if let Some(rust_target) = rust_target {
383             // 如果rust_target字段不为none,说明需要target管理
384             // 获取dadk任务路径,用于生成临时dadk文件名
385             let file_str = path.as_path().to_str().unwrap();
386             let tmp_dadk_path = Target::tmp_dadk(file_str);
387             let tmp_dadk_str = tmp_dadk_path.as_path().to_str().unwrap();
388 
389             if Target::is_user_target(rust_target) {
390                 // 如果target文件是用户自己的
391                 if let Ok(target_path) = Target::user_target_path(rust_target) {
392                     let target_path_str = target_path.as_path().to_str().unwrap();
393                     let index = target_path_str.rfind('/').unwrap();
394                     let target_name = target_path_str[index + 1..].to_string();
395                     let tmp_target = PathBuf::from(format!("{}{}", tmp_dadk_str, target_name));
396                     return Ok(Some(Target::new(tmp_target)));
397                 } else {
398                     return Err(SchedulerError::TaskError(
399                         "The path of target file is invalid.".to_string(),
400                     ));
401                 }
402             } else {
403                 // 如果target文件是内置的
404                 let tmp_target = PathBuf::from(format!("{}{}.json", tmp_dadk_str, rust_target));
405                 return Ok(Some(Target::new(tmp_target)));
406             }
407         }
408         return Ok(None);
409     }
410 
411     /// # 执行调度器中的所有任务
412     pub fn run(&self) -> Result<(), SchedulerError> {
413         // 准备全局环境变量
414         crate::executor::prepare_env(&self.target)
415             .map_err(|e| SchedulerError::RunError(format!("{:?}", e)))?;
416 
417         match self.action {
418             Action::Build | Action::Install => {
419                 self.run_with_topo_sort()?;
420             }
421             Action::Clean(_) => self.run_without_topo_sort()?,
422             _ => unimplemented!(),
423         }
424 
425         return Ok(());
426     }
427 
428     /// Action需要按照拓扑序执行
429     ///
430     /// Action::Build | Action::Install
431     fn run_with_topo_sort(&self) -> Result<(), SchedulerError> {
432         // 检查是否有不存在的依赖
433         let r = self.check_not_exists_dependency();
434         if r.is_err() {
435             error!("Error while checking tasks: {:?}", r);
436             return r;
437         }
438 
439         // 对调度实体进行拓扑排序
440         let r: Vec<Arc<SchedEntity>> = self.target.topo_sort();
441 
442         let action = self.action.clone();
443         let dragonos_dir = self.dragonos_dir.clone();
444         let id2entity = self.target.id2entity();
445         let count = r.len();
446 
447         // 启动守护线程
448         let handler = std::thread::spawn(move || {
449             Self::build_install_daemon(action, dragonos_dir, id2entity, count, &r)
450         });
451 
452         handler.join().expect("Could not join deamon");
453 
454         return Ok(());
455     }
456 
457     /// Action不需要按照拓扑序执行
458     fn run_without_topo_sort(&self) -> Result<(), SchedulerError> {
459         // 启动守护线程
460         let action = self.action.clone();
461         let dragonos_dir = self.dragonos_dir.clone();
462         let mut r = self.target.entities();
463         let handler = std::thread::spawn(move || {
464             Self::clean_daemon(action, dragonos_dir, &mut r);
465         });
466 
467         handler.join().expect("Could not join deamon");
468         return Ok(());
469     }
470 
471     pub fn execute(action: Action, dragonos_dir: PathBuf, entity: Arc<SchedEntity>) {
472         let mut executor = Executor::new(entity.clone(), action.clone(), dragonos_dir.clone())
473             .map_err(|e| {
474                 error!(
475                     "Error while creating executor for task {} : {:?}",
476                     entity.task().name_version(),
477                     e
478                 );
479                 exit(-1);
480             })
481             .unwrap();
482 
483         executor
484             .execute()
485             .map_err(|e| {
486                 error!(
487                     "Error while executing task {} : {:?}",
488                     entity.task().name_version(),
489                     e
490                 );
491                 exit(-1);
492             })
493             .unwrap();
494     }
495 
496     /// 构建和安装DADK任务的守护线程
497     ///
498     /// ## 参数
499     ///
500     /// - `action` : 要执行的操作
501     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
502     /// - `id2entity` : DADK任务id与实体映射表
503     /// - `count` : 当前剩余任务数
504     /// - `r` : 总任务实体表
505     ///
506     /// ## 返回值
507     ///
508     /// 无
509     pub fn build_install_daemon(
510         action: Action,
511         dragonos_dir: PathBuf,
512         id2entity: BTreeMap<i32, Arc<SchedEntity>>,
513         mut count: usize,
514         r: &Vec<Arc<SchedEntity>>,
515     ) {
516         let mut guard = TASK_DEQUE.lock().unwrap();
517         // 初始化0入度的任务实体
518         let mut zero_entity: Vec<Arc<SchedEntity>> = Vec::new();
519         for e in r.iter() {
520             if e.indegree() == 0 {
521                 zero_entity.push(e.clone());
522             }
523         }
524 
525         while count > 0 {
526             // 将入度为0的任务实体加入任务队列中,直至没有入度为0的任务实体 或 任务队列满了
527             while !zero_entity.is_empty()
528                 && guard.build_install_task(
529                     action.clone(),
530                     dragonos_dir.clone(),
531                     zero_entity.last().unwrap().clone(),
532                 )
533             {
534                 zero_entity.pop();
535             }
536 
537             let queue = guard.queue_mut();
538             // 如果任务线程已完成,将其从任务队列中删除,并把它的子节点入度减1,如果有0入度子节点,则加入zero_entity,后续可以加入任务队列中
539             queue.retain(|x| {
540                 if x.is_finished() {
541                     count -= 1;
542                     let tid = x.thread().id();
543                     let eid = *TID_EID.lock().unwrap().get(&tid).unwrap();
544                     let entity = id2entity.get(&eid).unwrap();
545                     let zero = entity.sub_children_indegree();
546                     for e in zero.iter() {
547                         zero_entity.push(e.clone());
548                     }
549                     return false;
550                 }
551                 return true;
552             })
553         }
554     }
555 
556     /// 清理DADK任务的守护线程
557     ///
558     /// ## 参数
559     ///
560     /// - `action` : 要执行的操作
561     /// - `dragonos_dir` : DragonOS sysroot在主机上的路径
562     /// - `r` : 总任务实体表
563     ///
564     /// ## 返回值
565     ///
566     /// 无
567     pub fn clean_daemon(action: Action, dragonos_dir: PathBuf, r: &mut Vec<Arc<SchedEntity>>) {
568         let mut guard = TASK_DEQUE.lock().unwrap();
569         while !guard.queue().is_empty() && !r.is_empty() {
570             guard.clean_task(action, dragonos_dir.clone(), r.pop().unwrap().clone());
571         }
572     }
573 
574     /// # 检查是否有不存在的依赖
575     ///
576     /// 如果某个任务的dependency中的任务不存在,则返回错误
577     fn check_not_exists_dependency(&self) -> Result<(), SchedulerError> {
578         for entity in self.target.entities().iter() {
579             for dependency in entity.task().depends.iter() {
580                 let name_version = (dependency.name.clone(), dependency.version.clone());
581                 if !self
582                     .target
583                     .get_by_name_version(&name_version.0, &name_version.1)
584                     .is_some()
585                 {
586                     return Err(SchedulerError::DependencyNotFound(
587                         entity.clone(),
588                         format!("name:{}, version:{}", name_version.0, name_version.1,),
589                     ));
590                 }
591             }
592         }
593 
594         return Ok(());
595     }
596 }
597 
598 /// # 环形依赖错误路径
599 ///
600 /// 本结构体用于在回溯过程中记录环形依赖的路径。
601 ///
602 /// 例如,假设有如下依赖关系:
603 ///
604 /// ```text
605 /// A -> B -> C -> D -> A
606 /// ```
607 ///
608 /// 则在DFS回溯过程中,会依次记录如下路径:
609 ///
610 /// ```text
611 /// D -> A
612 /// C -> D
613 /// B -> C
614 /// A -> B
615 pub struct DependencyCycleError {
616     /// # 起始实体
617     /// 本错误的起始实体,即环形依赖的起点
618     head_entity: Arc<SchedEntity>,
619     /// 是否停止传播
620     stop_propagation: bool,
621     /// 依赖关系
622     dependencies: Vec<(Arc<SchedEntity>, Arc<SchedEntity>)>,
623 }
624 
625 impl DependencyCycleError {
626     pub fn new(head_entity: Arc<SchedEntity>) -> Self {
627         Self {
628             head_entity,
629             stop_propagation: false,
630             dependencies: Vec::new(),
631         }
632     }
633 
634     pub fn add(&mut self, current: Arc<SchedEntity>, dependency: Arc<SchedEntity>) {
635         self.dependencies.push((current, dependency));
636     }
637 
638     pub fn stop_propagation(&mut self) {
639         self.stop_propagation = true;
640     }
641 
642     #[allow(dead_code)]
643     pub fn dependencies(&self) -> &Vec<(Arc<SchedEntity>, Arc<SchedEntity>)> {
644         &self.dependencies
645     }
646 
647     pub fn display(&self) -> String {
648         let mut tmp = self.dependencies.clone();
649         tmp.reverse();
650 
651         let mut ret = format!("Dependency cycle detected: \nStart ->\n");
652         for (current, dep) in tmp.iter() {
653             ret.push_str(&format!(
654                 "->\t{} ({})\t--depends-->\t{} ({})\n",
655                 current.task().name_version(),
656                 current.file_path().display(),
657                 dep.task().name_version(),
658                 dep.file_path().display()
659             ));
660         }
661         ret.push_str("-> End");
662         return ret;
663     }
664 }
665