1 //! Zero-capacity channel. 2 //! 3 //! This kind of channel is also known as *rendezvous* channel. 4 5 use super::context::Context; 6 use super::error::*; 7 use super::select::{Operation, Selected, Token}; 8 use super::utils::Backoff; 9 use super::waker::Waker; 10 11 use crate::std::cell::UnsafeCell; 12 use crate::std::marker::PhantomData; 13 use crate::std::sync::atomic::{AtomicBool, Ordering}; 14 use crate::std::sync::Mutex; 15 use crate::std::time::Instant; 16 use crate::std::{fmt, ptr}; 17 18 /// A pointer to a packet. 19 pub(crate) struct ZeroToken(*mut ()); 20 21 impl Default for ZeroToken { default() -> Self22 fn default() -> Self { 23 Self(ptr::null_mut()) 24 } 25 } 26 27 impl fmt::Debug for ZeroToken { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 fmt::Debug::fmt(&(self.0 as usize), f) 30 } 31 } 32 33 /// A slot for passing one message from a sender to a receiver. 34 struct Packet<T> { 35 /// Equals `true` if the packet is allocated on the stack. 36 on_stack: bool, 37 38 /// Equals `true` once the packet is ready for reading or writing. 39 ready: AtomicBool, 40 41 /// The message. 42 msg: UnsafeCell<Option<T>>, 43 } 44 45 impl<T> Packet<T> { 46 /// Creates an empty packet on the stack. empty_on_stack() -> Packet<T>47 fn empty_on_stack() -> Packet<T> { 48 Packet { 49 on_stack: true, 50 ready: AtomicBool::new(false), 51 msg: UnsafeCell::new(None), 52 } 53 } 54 55 /// Creates a packet on the stack, containing a message. message_on_stack(msg: T) -> Packet<T>56 fn message_on_stack(msg: T) -> Packet<T> { 57 Packet { 58 on_stack: true, 59 ready: AtomicBool::new(false), 60 msg: UnsafeCell::new(Some(msg)), 61 } 62 } 63 64 /// Waits until the packet becomes ready for reading or writing. wait_ready(&self)65 fn wait_ready(&self) { 66 let backoff = Backoff::new(); 67 while !self.ready.load(Ordering::Acquire) { 68 backoff.spin_heavy(); 69 } 70 } 71 } 72 73 /// Inner representation of a zero-capacity channel. 74 struct Inner { 75 /// Senders waiting to pair up with a receive operation. 76 senders: Waker, 77 78 /// Receivers waiting to pair up with a send operation. 79 receivers: Waker, 80 81 /// Equals `true` when the channel is disconnected. 82 is_disconnected: bool, 83 } 84 85 /// Zero-capacity channel. 86 pub(crate) struct Channel<T> { 87 /// Inner representation of the channel. 88 inner: Mutex<Inner>, 89 90 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 91 _marker: PhantomData<T>, 92 } 93 94 impl<T> Channel<T> { 95 /// Constructs a new zero-capacity channel. new() -> Self96 pub(crate) fn new() -> Self { 97 Channel { 98 inner: Mutex::new(Inner { 99 senders: Waker::new(), 100 receivers: Waker::new(), 101 is_disconnected: false, 102 }), 103 _marker: PhantomData, 104 } 105 } 106 107 /// Writes a message into the packet. write(&self, token: &mut Token, msg: T) -> Result<(), T>108 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 109 // If there is no packet, the channel is disconnected. 110 if token.zero.0.is_null() { 111 return Err(msg); 112 } 113 114 let packet = &*(token.zero.0 as *const Packet<T>); 115 packet.msg.get().write(Some(msg)); 116 packet.ready.store(true, Ordering::Release); 117 Ok(()) 118 } 119 120 /// Reads a message from the packet. read(&self, token: &mut Token) -> Result<T, ()>121 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 122 // If there is no packet, the channel is disconnected. 123 if token.zero.0.is_null() { 124 return Err(()); 125 } 126 127 let packet = &*(token.zero.0 as *const Packet<T>); 128 129 if packet.on_stack { 130 // The message has been in the packet from the beginning, so there is no need to wait 131 // for it. However, after reading the message, we need to set `ready` to `true` in 132 // order to signal that the packet can be destroyed. 133 let msg = packet.msg.get().replace(None).unwrap(); 134 packet.ready.store(true, Ordering::Release); 135 Ok(msg) 136 } else { 137 // Wait until the message becomes available, then read it and destroy the 138 // heap-allocated packet. 139 packet.wait_ready(); 140 let msg = packet.msg.get().replace(None).unwrap(); 141 drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); 142 Ok(msg) 143 } 144 } 145 146 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>147 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 148 let token = &mut Token::default(); 149 let mut inner = self.inner.lock().unwrap(); 150 151 // If there's a waiting receiver, pair up with it. 152 if let Some(operation) = inner.receivers.try_select() { 153 token.zero.0 = operation.packet; 154 drop(inner); 155 unsafe { 156 self.write(token, msg).ok().unwrap(); 157 } 158 Ok(()) 159 } else if inner.is_disconnected { 160 Err(TrySendError::Disconnected(msg)) 161 } else { 162 Err(TrySendError::Full(msg)) 163 } 164 } 165 166 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>167 pub(crate) fn send( 168 &self, 169 msg: T, 170 deadline: Option<Instant>, 171 ) -> Result<(), SendTimeoutError<T>> { 172 let token = &mut Token::default(); 173 let mut inner = self.inner.lock().unwrap(); 174 175 // If there's a waiting receiver, pair up with it. 176 if let Some(operation) = inner.receivers.try_select() { 177 token.zero.0 = operation.packet; 178 drop(inner); 179 unsafe { 180 self.write(token, msg).ok().unwrap(); 181 } 182 return Ok(()); 183 } 184 185 if inner.is_disconnected { 186 return Err(SendTimeoutError::Disconnected(msg)); 187 } 188 189 Context::with(|cx| { 190 // Prepare for blocking until a receiver wakes us up. 191 let oper = Operation::hook(token); 192 let mut packet = Packet::<T>::message_on_stack(msg); 193 inner 194 .senders 195 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); 196 inner.receivers.notify(); 197 drop(inner); 198 199 // Block the current thread. 200 let sel = cx.wait_until(deadline); 201 202 match sel { 203 Selected::Waiting => unreachable!(), 204 Selected::Aborted => { 205 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 206 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 207 Err(SendTimeoutError::Timeout(msg)) 208 } 209 Selected::Disconnected => { 210 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 211 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 212 Err(SendTimeoutError::Disconnected(msg)) 213 } 214 Selected::Operation(_) => { 215 // Wait until the message is read, then drop the packet. 216 packet.wait_ready(); 217 Ok(()) 218 } 219 } 220 }) 221 } 222 223 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>224 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 225 let token = &mut Token::default(); 226 let mut inner = self.inner.lock().unwrap(); 227 228 // If there's a waiting sender, pair up with it. 229 if let Some(operation) = inner.senders.try_select() { 230 token.zero.0 = operation.packet; 231 drop(inner); 232 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 233 } else if inner.is_disconnected { 234 Err(TryRecvError::Disconnected) 235 } else { 236 Err(TryRecvError::Empty) 237 } 238 } 239 240 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>241 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 242 let token = &mut Token::default(); 243 let mut inner = self.inner.lock().unwrap(); 244 245 // If there's a waiting sender, pair up with it. 246 if let Some(operation) = inner.senders.try_select() { 247 token.zero.0 = operation.packet; 248 drop(inner); 249 unsafe { 250 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 251 } 252 } 253 254 if inner.is_disconnected { 255 return Err(RecvTimeoutError::Disconnected); 256 } 257 258 Context::with(|cx| { 259 // Prepare for blocking until a sender wakes us up. 260 let oper = Operation::hook(token); 261 let mut packet = Packet::<T>::empty_on_stack(); 262 inner.receivers.register_with_packet( 263 oper, 264 &mut packet as *mut Packet<T> as *mut (), 265 cx, 266 ); 267 inner.senders.notify(); 268 drop(inner); 269 270 // Block the current thread. 271 let sel = cx.wait_until(deadline); 272 273 match sel { 274 Selected::Waiting => unreachable!(), 275 Selected::Aborted => { 276 self.inner 277 .lock() 278 .unwrap() 279 .receivers 280 .unregister(oper) 281 .unwrap(); 282 Err(RecvTimeoutError::Timeout) 283 } 284 Selected::Disconnected => { 285 self.inner 286 .lock() 287 .unwrap() 288 .receivers 289 .unregister(oper) 290 .unwrap(); 291 Err(RecvTimeoutError::Disconnected) 292 } 293 Selected::Operation(_) => { 294 // Wait until the message is provided, then read it. 295 packet.wait_ready(); 296 unsafe { Ok(packet.msg.get().replace(None).unwrap()) } 297 } 298 } 299 }) 300 } 301 302 /// Disconnects the channel and wakes up all blocked senders and receivers. 303 /// 304 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool305 pub(crate) fn disconnect(&self) -> bool { 306 let mut inner = self.inner.lock().unwrap(); 307 308 if !inner.is_disconnected { 309 inner.is_disconnected = true; 310 inner.senders.disconnect(); 311 inner.receivers.disconnect(); 312 true 313 } else { 314 false 315 } 316 } 317 318 /// Returns the current number of messages inside the channel. len(&self) -> usize319 pub(crate) fn len(&self) -> usize { 320 0 321 } 322 323 /// Returns the capacity of the channel. 324 #[allow(clippy::unnecessary_wraps)] // This is intentional. capacity(&self) -> Option<usize>325 pub(crate) fn capacity(&self) -> Option<usize> { 326 Some(0) 327 } 328 329 /// Returns `true` if the channel is empty. is_empty(&self) -> bool330 pub(crate) fn is_empty(&self) -> bool { 331 true 332 } 333 334 /// Returns `true` if the channel is full. is_full(&self) -> bool335 pub(crate) fn is_full(&self) -> bool { 336 true 337 } 338 } 339