xref: /drstd/src/std/sys/itron/condvar.rs (revision 86982c5e9b2eaa583327251616ee822c36288824)
1 //! POSIX conditional variable implementation based on user-space wait queues.
2 use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong};
3 use crate::std::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration};
4 
5 // The implementation is inspired by the queue-based implementation shown in
6 // Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores"
7 
8 pub struct Condvar {
9     waiters: SpinMutex<waiter_queue::WaiterQueue>,
10 }
11 
12 unsafe impl Send for Condvar {}
13 unsafe impl Sync for Condvar {}
14 
15 impl Condvar {
16     #[inline]
17     pub const fn new() -> Condvar {
18         Condvar {
19             waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()),
20         }
21     }
22 
23     pub fn notify_one(&self) {
24         self.waiters.with_locked(|waiters| {
25             if let Some(task) = waiters.pop_front() {
26                 // Unpark the task
27                 match unsafe { abi::wup_tsk(task) } {
28                     // The task already has a token.
29                     abi::E_QOVR => {}
30                     // Can't undo the effect; abort the program on failure
31                     er => {
32                         expect_success_aborting(er, &"wup_tsk");
33                     }
34                 }
35             }
36         });
37     }
38 
39     pub fn notify_all(&self) {
40         self.waiters.with_locked(|waiters| {
41             while let Some(task) = waiters.pop_front() {
42                 // Unpark the task
43                 match unsafe { abi::wup_tsk(task) } {
44                     // The task already has a token.
45                     abi::E_QOVR => {}
46                     // Can't undo the effect; abort the program on failure
47                     er => {
48                         expect_success_aborting(er, &"wup_tsk");
49                     }
50                 }
51             }
52         });
53     }
54 
55     pub unsafe fn wait(&self, mutex: &Mutex) {
56         // Construct `Waiter`.
57         let mut waiter = waiter_queue::Waiter::new();
58         let waiter = NonNull::from(&mut waiter);
59 
60         self.waiters.with_locked(|waiters| unsafe {
61             waiters.insert(waiter);
62         });
63 
64         unsafe { mutex.unlock() };
65 
66         // Wait until `waiter` is removed from the queue
67         loop {
68             // Park the current task
69             expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk");
70 
71             if !self
72                 .waiters
73                 .with_locked(|waiters| unsafe { waiters.is_queued(waiter) })
74             {
75                 break;
76             }
77         }
78 
79         mutex.lock();
80     }
81 
82     pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
83         // Construct and pin `Waiter`
84         let mut waiter = waiter_queue::Waiter::new();
85         let waiter = NonNull::from(&mut waiter);
86 
87         self.waiters.with_locked(|waiters| unsafe {
88             waiters.insert(waiter);
89         });
90 
91         unsafe { mutex.unlock() };
92 
93         // Park the current task and do not wake up until the timeout elapses
94         // or the task gets woken up by `notify_*`
95         match with_tmos_strong(dur, |tmo| {
96             let er = unsafe { abi::tslp_tsk(tmo) };
97             if er == 0 {
98                 // We were unparked. Are we really dequeued?
99                 if self
100                     .waiters
101                     .with_locked(|waiters| unsafe { waiters.is_queued(waiter) })
102                 {
103                     // No we are not. Continue waiting.
104                     return abi::E_TMOUT;
105                 }
106             }
107             er
108         }) {
109             abi::E_TMOUT => {}
110             er => {
111                 expect_success_aborting(er, &"tslp_tsk");
112             }
113         }
114 
115         // Remove `waiter` from `self.waiters`. If `waiter` is still in
116         // `waiters`, it means we woke up because of a timeout. Otherwise,
117         // we woke up because of `notify_*`.
118         let success = self
119             .waiters
120             .with_locked(|waiters| unsafe { !waiters.remove(waiter) });
121 
122         mutex.lock();
123         success
124     }
125 }
126 
127 mod waiter_queue {
128     use super::*;
129 
130     pub struct WaiterQueue {
131         head: Option<ListHead>,
132     }
133 
134     #[derive(Copy, Clone)]
135     struct ListHead {
136         first: NonNull<Waiter>,
137         last: NonNull<Waiter>,
138     }
139 
140     unsafe impl Send for ListHead {}
141     unsafe impl Sync for ListHead {}
142 
143     pub struct Waiter {
144         // These fields are only accessed through `&[mut] WaiterQueue`.
145         /// The waiting task's ID. Will be zeroed when the task is woken up
146         /// and removed from a queue.
147         task: abi::ID,
148         priority: abi::PRI,
149         prev: Option<NonNull<Waiter>>,
150         next: Option<NonNull<Waiter>>,
151     }
152 
153     unsafe impl Send for Waiter {}
154     unsafe impl Sync for Waiter {}
155 
156     impl Waiter {
157         #[inline]
158         pub fn new() -> Self {
159             let task = task::current_task_id();
160             let priority = task::task_priority(abi::TSK_SELF);
161 
162             // Zeroness of `Waiter::task` indicates whether the `Waiter` is
163             // linked to a queue or not. This invariant is important for
164             // the correctness.
165             debug_assert_ne!(task, 0);
166 
167             Self {
168                 task,
169                 priority,
170                 prev: None,
171                 next: None,
172             }
173         }
174     }
175 
176     impl WaiterQueue {
177         #[inline]
178         pub const fn new() -> Self {
179             Self { head: None }
180         }
181 
182         /// # Safety
183         ///
184         ///  - The caller must own `*waiter_ptr`. The caller will lose the
185         ///    ownership until `*waiter_ptr` is removed from `self`.
186         ///
187         ///  - `*waiter_ptr` must be valid until it's removed from the queue.
188         ///
189         ///  - `*waiter_ptr` must not have been previously inserted to a `WaiterQueue`.
190         ///
191         pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull<Waiter>) {
192             unsafe {
193                 let waiter = waiter_ptr.as_mut();
194 
195                 debug_assert!(waiter.prev.is_none());
196                 debug_assert!(waiter.next.is_none());
197 
198                 if let Some(head) = &mut self.head {
199                     // Find the insertion position and insert `waiter`
200                     let insert_after = {
201                         let mut cursor = head.last;
202                         loop {
203                             if waiter.priority >= cursor.as_ref().priority {
204                                 // `cursor` and all previous waiters have the same or higher
205                                 // priority than `current_task_priority`. Insert the new
206                                 // waiter right after `cursor`.
207                                 break Some(cursor);
208                             }
209                             cursor = if let Some(prev) = cursor.as_ref().prev {
210                                 prev
211                             } else {
212                                 break None;
213                             };
214                         }
215                     };
216 
217                     if let Some(mut insert_after) = insert_after {
218                         // Insert `waiter` after `insert_after`
219                         let insert_before = insert_after.as_ref().next;
220 
221                         waiter.prev = Some(insert_after);
222                         insert_after.as_mut().next = Some(waiter_ptr);
223 
224                         waiter.next = insert_before;
225                         if let Some(mut insert_before) = insert_before {
226                             insert_before.as_mut().prev = Some(waiter_ptr);
227                         } else {
228                             head.last = waiter_ptr;
229                         }
230                     } else {
231                         // Insert `waiter` to the front
232                         waiter.next = Some(head.first);
233                         head.first.as_mut().prev = Some(waiter_ptr);
234                         head.first = waiter_ptr;
235                     }
236                 } else {
237                     // `waiter` is the only element
238                     self.head = Some(ListHead {
239                         first: waiter_ptr,
240                         last: waiter_ptr,
241                     });
242                 }
243             }
244         }
245 
246         /// Given a `Waiter` that was previously inserted to `self`, remove
247         /// it from `self` if it's still there.
248         #[inline]
249         pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull<Waiter>) -> bool {
250             unsafe {
251                 let waiter = waiter_ptr.as_mut();
252                 if waiter.task != 0 {
253                     let head = self.head.as_mut().unwrap();
254 
255                     match (waiter.prev, waiter.next) {
256                         (Some(mut prev), Some(mut next)) => {
257                             prev.as_mut().next = Some(next);
258                             next.as_mut().prev = Some(prev);
259                         }
260                         (None, Some(mut next)) => {
261                             head.first = next;
262                             next.as_mut().prev = None;
263                         }
264                         (Some(mut prev), None) => {
265                             prev.as_mut().next = None;
266                             head.last = prev;
267                         }
268                         (None, None) => {
269                             self.head = None;
270                         }
271                     }
272 
273                     waiter.task = 0;
274 
275                     true
276                 } else {
277                     false
278                 }
279             }
280         }
281 
282         /// Given a `Waiter` that was previously inserted to `self`, return a
283         /// flag indicating whether it's still in `self`.
284         #[inline]
285         pub unsafe fn is_queued(&self, waiter: NonNull<Waiter>) -> bool {
286             unsafe { waiter.as_ref().task != 0 }
287         }
288 
289         #[inline]
290         pub fn pop_front(&mut self) -> Option<abi::ID> {
291             unsafe {
292                 let head = self.head.as_mut()?;
293                 let waiter = head.first.as_mut();
294 
295                 // Get the ID
296                 let id = replace(&mut waiter.task, 0);
297 
298                 // Unlink the waiter
299                 if let Some(mut next) = waiter.next {
300                     head.first = next;
301                     next.as_mut().prev = None;
302                 } else {
303                     self.head = None;
304                 }
305 
306                 Some(id)
307             }
308         }
309     }
310 }
311