xref: /DragonReach/src/executor/dep_graph/mod.rs (revision 236b9b4f4d4f527c482cad40d09674195023e5fb)
1 use std::sync::Arc;
2 use std::sync::Mutex;
3 use std::vec::Vec;
4 
5 use crate::manager::UnitManager;
6 use crate::{
7     error::runtime_error::{RuntimeError, RuntimeErrorType},
8     unit::Unit,
9 };
10 
/// A single vertex of a [`DepGraph`].
///
/// Edges are stored as indices into the owning graph's node list, not as
/// node values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DepGraphNode {
    /// Payload of the node (the graph builder stores a unit id here).
    value: usize,
    /// Indices of the nodes this node points to (outgoing edges).
    edges: Vec<usize>,
    /// Indices of the nodes that point to this node (incoming edges).
    incoming_edges: Vec<usize>,
}
16 
/// A directed graph whose nodes carry `usize` values.
///
/// `nodes` and `value` are kept in lockstep: `value[i]` is the value stored in
/// `nodes[i]`, which lets `add_node` find an existing node by value.
pub struct DepGraph {
    /// All nodes of the graph; edges refer to positions in this vector.
    nodes: Vec<DepGraphNode>,
    /// Values of the nodes, in the same order as `nodes`.
    value: Vec<usize>,
}
21 
22 // 提供拓扑排序方法,在启动服务时确定先后顺序
23 impl DepGraph {
24     fn new() -> Self {
25         return DepGraph {
26             nodes: Vec::new(),
27             value: Vec::new(),
28         };
29     }
30 
31     pub fn add_node(&mut self, value: usize) -> usize {
32         let index = self.nodes.len();
33         //如果nodes中已经有了这个value则无需重复添加,直接返回nodes中的value对应的index
34         if let Some(idx) = self.value.iter().position(|x| *x == value) {
35             return idx;
36         }
37         //如果value在nodes中不存在,则添加value
38         self.nodes.push(DepGraphNode {
39             value: value,
40             edges: Vec::new(),
41             incoming_edges: Vec::new(),
42         });
43         self.value.push(value);
44         return index;
45     }
46     pub fn add_edge(&mut self, from: usize, to: usize) {
47         self.nodes[from].edges.push(to);
48         self.nodes[to].incoming_edges.push(from);
49     }
50     pub fn topological_sort(&mut self) -> Result<Vec<usize>, RuntimeError> {
51         let mut result = Vec::new();
52         let mut visited = Vec::new();
53         let mut stack = Vec::new();
54         for (i, node) in self.nodes.iter().enumerate() {
55             if node.incoming_edges.len() == 0 {
56                 stack.push(i);
57             }
58         }
59         while stack.len() > 0 {
60             let index = stack.pop().unwrap();
61             if visited.contains(&index) {
62                 continue;
63             }
64             visited.push(index);
65             result.push(self.nodes[index].value);
66             let len = self.nodes[index].edges.len();
67             for i in 0..len {
68                 let edge = self.nodes[index].edges[i];
69                 self.nodes[edge].incoming_edges.retain(|&x| x != index);
70                 if self.nodes[edge].incoming_edges.len() == 0 {
71                     stack.push(edge);
72                 }
73             }
74         }
75         if result.len() != self.nodes.len() {
76             return Err(RuntimeError::new(RuntimeErrorType::CircularDependency));
77         }
78         result.reverse();
79         return Ok(result);
80     }
81 
82     fn add_edges(&mut self, unit: usize, after: &[usize]) {
83         //因为service的依赖关系规模不会很大,故先使用递归实现
84         //TODO:改递归
85         for target in after {
86             let s = self.add_node(unit);
87             let t = self.add_node(*target);
88             self.add_edge(s, t);
89 
90             let arc_unit = UnitManager::get_unit_with_id(target).unwrap();
91             let unit = arc_unit.lock().unwrap();
92             let after = unit.unit_base().unit_part().after();
93 
94             self.add_edges(*target, after);
95         }
96     }
97 
98     pub fn construct_graph(unit: &Arc<Mutex<dyn Unit>>) -> DepGraph {
99         let mut graph: DepGraph = DepGraph::new();
100 
101         let unit = unit.lock().unwrap();
102         let uid = unit.unit_id();
103         graph.add_node(uid);
104         let after = (&unit).unit_base().unit_part().after();
105         //递归添加边来构建图
106         graph.add_edges(uid, after);
107         return graph;
108     }
109 }
110