xref: /drstd/src/std/sync/mpmc/zero.rs (revision 9670759b785600bf6315e4173e46a602f16add7a)
1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as *rendezvous* channel.
4 
5 use super::context::Context;
6 use super::error::*;
7 use super::select::{Operation, Selected, Token};
8 use super::utils::Backoff;
9 use super::waker::Waker;
10 
11 use crate::std::cell::UnsafeCell;
12 use crate::std::marker::PhantomData;
13 use crate::std::sync::atomic::{AtomicBool, Ordering};
14 use crate::std::sync::Mutex;
15 use crate::std::time::Instant;
16 use crate::std::{fmt, ptr};
17 
18 /// A pointer to a packet.
19 pub(crate) struct ZeroToken(*mut ());
20 
21 impl Default for ZeroToken {
default() -> Self22     fn default() -> Self {
23         Self(ptr::null_mut())
24     }
25 }
26 
27 impl fmt::Debug for ZeroToken {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29         fmt::Debug::fmt(&(self.0 as usize), f)
30     }
31 }
32 
33 /// A slot for passing one message from a sender to a receiver.
34 struct Packet<T> {
35     /// Equals `true` if the packet is allocated on the stack.
36     on_stack: bool,
37 
38     /// Equals `true` once the packet is ready for reading or writing.
39     ready: AtomicBool,
40 
41     /// The message.
42     msg: UnsafeCell<Option<T>>,
43 }
44 
45 impl<T> Packet<T> {
46     /// Creates an empty packet on the stack.
empty_on_stack() -> Packet<T>47     fn empty_on_stack() -> Packet<T> {
48         Packet {
49             on_stack: true,
50             ready: AtomicBool::new(false),
51             msg: UnsafeCell::new(None),
52         }
53     }
54 
55     /// Creates a packet on the stack, containing a message.
message_on_stack(msg: T) -> Packet<T>56     fn message_on_stack(msg: T) -> Packet<T> {
57         Packet {
58             on_stack: true,
59             ready: AtomicBool::new(false),
60             msg: UnsafeCell::new(Some(msg)),
61         }
62     }
63 
64     /// Waits until the packet becomes ready for reading or writing.
wait_ready(&self)65     fn wait_ready(&self) {
66         let backoff = Backoff::new();
67         while !self.ready.load(Ordering::Acquire) {
68             backoff.spin_heavy();
69         }
70     }
71 }
72 
73 /// Inner representation of a zero-capacity channel.
74 struct Inner {
75     /// Senders waiting to pair up with a receive operation.
76     senders: Waker,
77 
78     /// Receivers waiting to pair up with a send operation.
79     receivers: Waker,
80 
81     /// Equals `true` when the channel is disconnected.
82     is_disconnected: bool,
83 }
84 
85 /// Zero-capacity channel.
86 pub(crate) struct Channel<T> {
87     /// Inner representation of the channel.
88     inner: Mutex<Inner>,
89 
90     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
91     _marker: PhantomData<T>,
92 }
93 
94 impl<T> Channel<T> {
95     /// Constructs a new zero-capacity channel.
new() -> Self96     pub(crate) fn new() -> Self {
97         Channel {
98             inner: Mutex::new(Inner {
99                 senders: Waker::new(),
100                 receivers: Waker::new(),
101                 is_disconnected: false,
102             }),
103             _marker: PhantomData,
104         }
105     }
106 
107     /// Writes a message into the packet.
write(&self, token: &mut Token, msg: T) -> Result<(), T>108     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
109         // If there is no packet, the channel is disconnected.
110         if token.zero.0.is_null() {
111             return Err(msg);
112         }
113 
114         let packet = &*(token.zero.0 as *const Packet<T>);
115         packet.msg.get().write(Some(msg));
116         packet.ready.store(true, Ordering::Release);
117         Ok(())
118     }
119 
120     /// Reads a message from the packet.
read(&self, token: &mut Token) -> Result<T, ()>121     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
122         // If there is no packet, the channel is disconnected.
123         if token.zero.0.is_null() {
124             return Err(());
125         }
126 
127         let packet = &*(token.zero.0 as *const Packet<T>);
128 
129         if packet.on_stack {
130             // The message has been in the packet from the beginning, so there is no need to wait
131             // for it. However, after reading the message, we need to set `ready` to `true` in
132             // order to signal that the packet can be destroyed.
133             let msg = packet.msg.get().replace(None).unwrap();
134             packet.ready.store(true, Ordering::Release);
135             Ok(msg)
136         } else {
137             // Wait until the message becomes available, then read it and destroy the
138             // heap-allocated packet.
139             packet.wait_ready();
140             let msg = packet.msg.get().replace(None).unwrap();
141             drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
142             Ok(msg)
143         }
144     }
145 
146     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>147     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
148         let token = &mut Token::default();
149         let mut inner = self.inner.lock().unwrap();
150 
151         // If there's a waiting receiver, pair up with it.
152         if let Some(operation) = inner.receivers.try_select() {
153             token.zero.0 = operation.packet;
154             drop(inner);
155             unsafe {
156                 self.write(token, msg).ok().unwrap();
157             }
158             Ok(())
159         } else if inner.is_disconnected {
160             Err(TrySendError::Disconnected(msg))
161         } else {
162             Err(TrySendError::Full(msg))
163         }
164     }
165 
166     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>167     pub(crate) fn send(
168         &self,
169         msg: T,
170         deadline: Option<Instant>,
171     ) -> Result<(), SendTimeoutError<T>> {
172         let token = &mut Token::default();
173         let mut inner = self.inner.lock().unwrap();
174 
175         // If there's a waiting receiver, pair up with it.
176         if let Some(operation) = inner.receivers.try_select() {
177             token.zero.0 = operation.packet;
178             drop(inner);
179             unsafe {
180                 self.write(token, msg).ok().unwrap();
181             }
182             return Ok(());
183         }
184 
185         if inner.is_disconnected {
186             return Err(SendTimeoutError::Disconnected(msg));
187         }
188 
189         Context::with(|cx| {
190             // Prepare for blocking until a receiver wakes us up.
191             let oper = Operation::hook(token);
192             let mut packet = Packet::<T>::message_on_stack(msg);
193             inner
194                 .senders
195                 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
196             inner.receivers.notify();
197             drop(inner);
198 
199             // Block the current thread.
200             let sel = cx.wait_until(deadline);
201 
202             match sel {
203                 Selected::Waiting => unreachable!(),
204                 Selected::Aborted => {
205                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
206                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
207                     Err(SendTimeoutError::Timeout(msg))
208                 }
209                 Selected::Disconnected => {
210                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
211                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
212                     Err(SendTimeoutError::Disconnected(msg))
213                 }
214                 Selected::Operation(_) => {
215                     // Wait until the message is read, then drop the packet.
216                     packet.wait_ready();
217                     Ok(())
218                 }
219             }
220         })
221     }
222 
223     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>224     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
225         let token = &mut Token::default();
226         let mut inner = self.inner.lock().unwrap();
227 
228         // If there's a waiting sender, pair up with it.
229         if let Some(operation) = inner.senders.try_select() {
230             token.zero.0 = operation.packet;
231             drop(inner);
232             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
233         } else if inner.is_disconnected {
234             Err(TryRecvError::Disconnected)
235         } else {
236             Err(TryRecvError::Empty)
237         }
238     }
239 
240     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>241     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
242         let token = &mut Token::default();
243         let mut inner = self.inner.lock().unwrap();
244 
245         // If there's a waiting sender, pair up with it.
246         if let Some(operation) = inner.senders.try_select() {
247             token.zero.0 = operation.packet;
248             drop(inner);
249             unsafe {
250                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
251             }
252         }
253 
254         if inner.is_disconnected {
255             return Err(RecvTimeoutError::Disconnected);
256         }
257 
258         Context::with(|cx| {
259             // Prepare for blocking until a sender wakes us up.
260             let oper = Operation::hook(token);
261             let mut packet = Packet::<T>::empty_on_stack();
262             inner.receivers.register_with_packet(
263                 oper,
264                 &mut packet as *mut Packet<T> as *mut (),
265                 cx,
266             );
267             inner.senders.notify();
268             drop(inner);
269 
270             // Block the current thread.
271             let sel = cx.wait_until(deadline);
272 
273             match sel {
274                 Selected::Waiting => unreachable!(),
275                 Selected::Aborted => {
276                     self.inner
277                         .lock()
278                         .unwrap()
279                         .receivers
280                         .unregister(oper)
281                         .unwrap();
282                     Err(RecvTimeoutError::Timeout)
283                 }
284                 Selected::Disconnected => {
285                     self.inner
286                         .lock()
287                         .unwrap()
288                         .receivers
289                         .unregister(oper)
290                         .unwrap();
291                     Err(RecvTimeoutError::Disconnected)
292                 }
293                 Selected::Operation(_) => {
294                     // Wait until the message is provided, then read it.
295                     packet.wait_ready();
296                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
297                 }
298             }
299         })
300     }
301 
302     /// Disconnects the channel and wakes up all blocked senders and receivers.
303     ///
304     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool305     pub(crate) fn disconnect(&self) -> bool {
306         let mut inner = self.inner.lock().unwrap();
307 
308         if !inner.is_disconnected {
309             inner.is_disconnected = true;
310             inner.senders.disconnect();
311             inner.receivers.disconnect();
312             true
313         } else {
314             false
315         }
316     }
317 
318     /// Returns the current number of messages inside the channel.
len(&self) -> usize319     pub(crate) fn len(&self) -> usize {
320         0
321     }
322 
323     /// Returns the capacity of the channel.
324     #[allow(clippy::unnecessary_wraps)] // This is intentional.
capacity(&self) -> Option<usize>325     pub(crate) fn capacity(&self) -> Option<usize> {
326         Some(0)
327     }
328 
329     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool330     pub(crate) fn is_empty(&self) -> bool {
331         true
332     }
333 
334     /// Returns `true` if the channel is full.
is_full(&self) -> bool335     pub(crate) fn is_full(&self) -> bool {
336         true
337     }
338 }
339