1 //! POSIX conditional variable implementation based on user-space wait queues. 2 use super::{abi, error::expect_success_aborting, spin::SpinMutex, task, time::with_tmos_strong}; 3 use crate::std::{mem::replace, ptr::NonNull, sys::locks::Mutex, time::Duration}; 4 5 // The implementation is inspired by the queue-based implementation shown in 6 // Andrew D. Birrell's paper "Implementing Condition Variables with Semaphores" 7 8 pub struct Condvar { 9 waiters: SpinMutex<waiter_queue::WaiterQueue>, 10 } 11 12 unsafe impl Send for Condvar {} 13 unsafe impl Sync for Condvar {} 14 15 impl Condvar { 16 #[inline] 17 pub const fn new() -> Condvar { 18 Condvar { 19 waiters: SpinMutex::new(waiter_queue::WaiterQueue::new()), 20 } 21 } 22 23 pub fn notify_one(&self) { 24 self.waiters.with_locked(|waiters| { 25 if let Some(task) = waiters.pop_front() { 26 // Unpark the task 27 match unsafe { abi::wup_tsk(task) } { 28 // The task already has a token. 29 abi::E_QOVR => {} 30 // Can't undo the effect; abort the program on failure 31 er => { 32 expect_success_aborting(er, &"wup_tsk"); 33 } 34 } 35 } 36 }); 37 } 38 39 pub fn notify_all(&self) { 40 self.waiters.with_locked(|waiters| { 41 while let Some(task) = waiters.pop_front() { 42 // Unpark the task 43 match unsafe { abi::wup_tsk(task) } { 44 // The task already has a token. 45 abi::E_QOVR => {} 46 // Can't undo the effect; abort the program on failure 47 er => { 48 expect_success_aborting(er, &"wup_tsk"); 49 } 50 } 51 } 52 }); 53 } 54 55 pub unsafe fn wait(&self, mutex: &Mutex) { 56 // Construct `Waiter`. 57 let mut waiter = waiter_queue::Waiter::new(); 58 let waiter = NonNull::from(&mut waiter); 59 60 self.waiters.with_locked(|waiters| unsafe { 61 waiters.insert(waiter); 62 }); 63 64 unsafe { mutex.unlock() }; 65 66 // Wait until `waiter` is removed from the queue 67 loop { 68 // Park the current task 69 expect_success_aborting(unsafe { abi::slp_tsk() }, &"slp_tsk"); 70 71 if !self 72 .waiters 73 .with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) 74 { 75 break; 76 } 77 } 78 79 mutex.lock(); 80 } 81 82 pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { 83 // Construct and pin `Waiter` 84 let mut waiter = waiter_queue::Waiter::new(); 85 let waiter = NonNull::from(&mut waiter); 86 87 self.waiters.with_locked(|waiters| unsafe { 88 waiters.insert(waiter); 89 }); 90 91 unsafe { mutex.unlock() }; 92 93 // Park the current task and do not wake up until the timeout elapses 94 // or the task gets woken up by `notify_*` 95 match with_tmos_strong(dur, |tmo| { 96 let er = unsafe { abi::tslp_tsk(tmo) }; 97 if er == 0 { 98 // We were unparked. Are we really dequeued? 99 if self 100 .waiters 101 .with_locked(|waiters| unsafe { waiters.is_queued(waiter) }) 102 { 103 // No we are not. Continue waiting. 104 return abi::E_TMOUT; 105 } 106 } 107 er 108 }) { 109 abi::E_TMOUT => {} 110 er => { 111 expect_success_aborting(er, &"tslp_tsk"); 112 } 113 } 114 115 // Remove `waiter` from `self.waiters`. If `waiter` is still in 116 // `waiters`, it means we woke up because of a timeout. Otherwise, 117 // we woke up because of `notify_*`. 118 let success = self 119 .waiters 120 .with_locked(|waiters| unsafe { !waiters.remove(waiter) }); 121 122 mutex.lock(); 123 success 124 } 125 } 126 127 mod waiter_queue { 128 use super::*; 129 130 pub struct WaiterQueue { 131 head: Option<ListHead>, 132 } 133 134 #[derive(Copy, Clone)] 135 struct ListHead { 136 first: NonNull<Waiter>, 137 last: NonNull<Waiter>, 138 } 139 140 unsafe impl Send for ListHead {} 141 unsafe impl Sync for ListHead {} 142 143 pub struct Waiter { 144 // These fields are only accessed through `&[mut] WaiterQueue`. 145 /// The waiting task's ID. Will be zeroed when the task is woken up 146 /// and removed from a queue. 147 task: abi::ID, 148 priority: abi::PRI, 149 prev: Option<NonNull<Waiter>>, 150 next: Option<NonNull<Waiter>>, 151 } 152 153 unsafe impl Send for Waiter {} 154 unsafe impl Sync for Waiter {} 155 156 impl Waiter { 157 #[inline] 158 pub fn new() -> Self { 159 let task = task::current_task_id(); 160 let priority = task::task_priority(abi::TSK_SELF); 161 162 // Zeroness of `Waiter::task` indicates whether the `Waiter` is 163 // linked to a queue or not. This invariant is important for 164 // the correctness. 165 debug_assert_ne!(task, 0); 166 167 Self { 168 task, 169 priority, 170 prev: None, 171 next: None, 172 } 173 } 174 } 175 176 impl WaiterQueue { 177 #[inline] 178 pub const fn new() -> Self { 179 Self { head: None } 180 } 181 182 /// # Safety 183 /// 184 /// - The caller must own `*waiter_ptr`. The caller will lose the 185 /// ownership until `*waiter_ptr` is removed from `self`. 186 /// 187 /// - `*waiter_ptr` must be valid until it's removed from the queue. 188 /// 189 /// - `*waiter_ptr` must not have been previously inserted to a `WaiterQueue`. 190 /// 191 pub unsafe fn insert(&mut self, mut waiter_ptr: NonNull<Waiter>) { 192 unsafe { 193 let waiter = waiter_ptr.as_mut(); 194 195 debug_assert!(waiter.prev.is_none()); 196 debug_assert!(waiter.next.is_none()); 197 198 if let Some(head) = &mut self.head { 199 // Find the insertion position and insert `waiter` 200 let insert_after = { 201 let mut cursor = head.last; 202 loop { 203 if waiter.priority >= cursor.as_ref().priority { 204 // `cursor` and all previous waiters have the same or higher 205 // priority than `current_task_priority`. Insert the new 206 // waiter right after `cursor`. 207 break Some(cursor); 208 } 209 cursor = if let Some(prev) = cursor.as_ref().prev { 210 prev 211 } else { 212 break None; 213 }; 214 } 215 }; 216 217 if let Some(mut insert_after) = insert_after { 218 // Insert `waiter` after `insert_after` 219 let insert_before = insert_after.as_ref().next; 220 221 waiter.prev = Some(insert_after); 222 insert_after.as_mut().next = Some(waiter_ptr); 223 224 waiter.next = insert_before; 225 if let Some(mut insert_before) = insert_before { 226 insert_before.as_mut().prev = Some(waiter_ptr); 227 } else { 228 head.last = waiter_ptr; 229 } 230 } else { 231 // Insert `waiter` to the front 232 waiter.next = Some(head.first); 233 head.first.as_mut().prev = Some(waiter_ptr); 234 head.first = waiter_ptr; 235 } 236 } else { 237 // `waiter` is the only element 238 self.head = Some(ListHead { 239 first: waiter_ptr, 240 last: waiter_ptr, 241 }); 242 } 243 } 244 } 245 246 /// Given a `Waiter` that was previously inserted to `self`, remove 247 /// it from `self` if it's still there. 248 #[inline] 249 pub unsafe fn remove(&mut self, mut waiter_ptr: NonNull<Waiter>) -> bool { 250 unsafe { 251 let waiter = waiter_ptr.as_mut(); 252 if waiter.task != 0 { 253 let head = self.head.as_mut().unwrap(); 254 255 match (waiter.prev, waiter.next) { 256 (Some(mut prev), Some(mut next)) => { 257 prev.as_mut().next = Some(next); 258 next.as_mut().prev = Some(prev); 259 } 260 (None, Some(mut next)) => { 261 head.first = next; 262 next.as_mut().prev = None; 263 } 264 (Some(mut prev), None) => { 265 prev.as_mut().next = None; 266 head.last = prev; 267 } 268 (None, None) => { 269 self.head = None; 270 } 271 } 272 273 waiter.task = 0; 274 275 true 276 } else { 277 false 278 } 279 } 280 } 281 282 /// Given a `Waiter` that was previously inserted to `self`, return a 283 /// flag indicating whether it's still in `self`. 284 #[inline] 285 pub unsafe fn is_queued(&self, waiter: NonNull<Waiter>) -> bool { 286 unsafe { waiter.as_ref().task != 0 } 287 } 288 289 #[inline] 290 pub fn pop_front(&mut self) -> Option<abi::ID> { 291 unsafe { 292 let head = self.head.as_mut()?; 293 let waiter = head.first.as_mut(); 294 295 // Get the ID 296 let id = replace(&mut waiter.task, 0); 297 298 // Unlink the waiter 299 if let Some(mut next) = waiter.next { 300 head.first = next; 301 next.as_mut().prev = None; 302 } else { 303 self.head = None; 304 } 305 306 Some(id) 307 } 308 } 309 } 310 } 311