xref: /drstd/src/std/sync/mpmc/mod.rs (revision 0fe3ff0054d3aec7fbf9bddecfecb10bc7d23a51)
1 //! Multi-producer multi-consumer channels.
2 
3 // This module is not currently exposed publicly, but is used
4 // as the implementation for the channels in `sync::mpsc`. The
5 // implementation comes from the crossbeam-channel crate:
6 //
7 // Copyright (c) 2019 The Crossbeam Project Developers
8 //
9 // Permission is hereby granted, free of charge, to any
10 // person obtaining a copy of this software and associated
11 // documentation files (the "Software"), to deal in the
12 // Software without restriction, including without
13 // limitation the rights to use, copy, modify, merge,
14 // publish, distribute, sublicense, and/or sell copies of
15 // the Software, and to permit persons to whom the Software
16 // is furnished to do so, subject to the following
17 // conditions:
18 //
19 // The above copyright notice and this permission notice
20 // shall be included in all copies or substantial portions
21 // of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
24 // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
25 // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
26 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
27 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
28 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
29 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
30 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
31 // DEALINGS IN THE SOFTWARE.
32 
33 mod array;
34 mod context;
35 mod counter;
36 mod error;
37 mod list;
38 mod select;
39 mod utils;
40 mod waker;
41 mod zero;
42 
43 use crate::std::fmt;
44 use crate::std::panic::{RefUnwindSafe, UnwindSafe};
45 use crate::std::time::{Duration, Instant};
46 pub use error::*;
47 
48 /// Creates a channel of unbounded capacity.
49 ///
50 /// This channel has a growable buffer that can hold any number of messages at a time.
51 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
52     let (s, r) = counter::new(list::Channel::new());
53     let s = Sender {
54         flavor: SenderFlavor::List(s),
55     };
56     let r = Receiver {
57         flavor: ReceiverFlavor::List(r),
58     };
59     (s, r)
60 }
61 
62 /// Creates a channel of bounded capacity.
63 ///
64 /// This channel has a buffer that can hold at most `cap` messages at a time.
65 ///
66 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
67 /// receive operations must appear at the same time in order to pair up and pass the message over.
68 pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
69     if cap == 0 {
70         let (s, r) = counter::new(zero::Channel::new());
71         let s = Sender {
72             flavor: SenderFlavor::Zero(s),
73         };
74         let r = Receiver {
75             flavor: ReceiverFlavor::Zero(r),
76         };
77         (s, r)
78     } else {
79         let (s, r) = counter::new(array::Channel::with_capacity(cap));
80         let s = Sender {
81             flavor: SenderFlavor::Array(s),
82         };
83         let r = Receiver {
84             flavor: ReceiverFlavor::Array(r),
85         };
86         (s, r)
87     }
88 }
89 
90 /// The sending side of a channel.
91 pub struct Sender<T> {
92     flavor: SenderFlavor<T>,
93 }
94 
95 /// Sender flavors.
96 enum SenderFlavor<T> {
97     /// Bounded channel based on a preallocated array.
98     Array(counter::Sender<array::Channel<T>>),
99 
100     /// Unbounded channel implemented as a linked list.
101     List(counter::Sender<list::Channel<T>>),
102 
103     /// Zero-capacity channel.
104     Zero(counter::Sender<zero::Channel<T>>),
105 }
106 
107 unsafe impl<T: Send> Send for Sender<T> {}
108 unsafe impl<T: Send> Sync for Sender<T> {}
109 
110 impl<T> UnwindSafe for Sender<T> {}
111 impl<T> RefUnwindSafe for Sender<T> {}
112 
113 impl<T> Sender<T> {
114     /// Attempts to send a message into the channel without blocking.
115     ///
116     /// This method will either send a message into the channel immediately or return an error if
117     /// the channel is full or disconnected. The returned error contains the original message.
118     ///
119     /// If called on a zero-capacity channel, this method will send the message only if there
120     /// happens to be a receive operation on the other side of the channel at the same time.
121     pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
122         match &self.flavor {
123             SenderFlavor::Array(chan) => chan.try_send(msg),
124             SenderFlavor::List(chan) => chan.try_send(msg),
125             SenderFlavor::Zero(chan) => chan.try_send(msg),
126         }
127     }
128 
129     /// Blocks the current thread until a message is sent or the channel is disconnected.
130     ///
131     /// If the channel is full and not disconnected, this call will block until the send operation
132     /// can proceed. If the channel becomes disconnected, this call will wake up and return an
133     /// error. The returned error contains the original message.
134     ///
135     /// If called on a zero-capacity channel, this method will wait for a receive operation to
136     /// appear on the other side of the channel.
137     pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
138         match &self.flavor {
139             SenderFlavor::Array(chan) => chan.send(msg, None),
140             SenderFlavor::List(chan) => chan.send(msg, None),
141             SenderFlavor::Zero(chan) => chan.send(msg, None),
142         }
143         .map_err(|err| match err {
144             SendTimeoutError::Disconnected(msg) => SendError(msg),
145             SendTimeoutError::Timeout(_) => unreachable!(),
146         })
147     }
148 }
149 
150 // The methods below are not used by `sync::mpsc`, but
151 // are useful and we'll likely want to expose them
152 // eventually
153 #[allow(unused)]
154 impl<T> Sender<T> {
155     /// Waits for a message to be sent into the channel, but only for a limited time.
156     ///
157     /// If the channel is full and not disconnected, this call will block until the send operation
158     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
159     /// wake up and return an error. The returned error contains the original message.
160     ///
161     /// If called on a zero-capacity channel, this method will wait for a receive operation to
162     /// appear on the other side of the channel.
163     pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
164         match Instant::now().checked_add(timeout) {
165             Some(deadline) => self.send_deadline(msg, deadline),
166             // So far in the future that it's practically the same as waiting indefinitely.
167             None => self.send(msg).map_err(SendTimeoutError::from),
168         }
169     }
170 
171     /// Waits for a message to be sent into the channel, but only until a given deadline.
172     ///
173     /// If the channel is full and not disconnected, this call will block until the send operation
174     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
175     /// wake up and return an error. The returned error contains the original message.
176     ///
177     /// If called on a zero-capacity channel, this method will wait for a receive operation to
178     /// appear on the other side of the channel.
179     pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
180         match &self.flavor {
181             SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
182             SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
183             SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
184         }
185     }
186 
187     /// Returns `true` if the channel is empty.
188     ///
189     /// Note: Zero-capacity channels are always empty.
190     pub fn is_empty(&self) -> bool {
191         match &self.flavor {
192             SenderFlavor::Array(chan) => chan.is_empty(),
193             SenderFlavor::List(chan) => chan.is_empty(),
194             SenderFlavor::Zero(chan) => chan.is_empty(),
195         }
196     }
197 
198     /// Returns `true` if the channel is full.
199     ///
200     /// Note: Zero-capacity channels are always full.
201     pub fn is_full(&self) -> bool {
202         match &self.flavor {
203             SenderFlavor::Array(chan) => chan.is_full(),
204             SenderFlavor::List(chan) => chan.is_full(),
205             SenderFlavor::Zero(chan) => chan.is_full(),
206         }
207     }
208 
209     /// Returns the number of messages in the channel.
210     pub fn len(&self) -> usize {
211         match &self.flavor {
212             SenderFlavor::Array(chan) => chan.len(),
213             SenderFlavor::List(chan) => chan.len(),
214             SenderFlavor::Zero(chan) => chan.len(),
215         }
216     }
217 
218     /// If the channel is bounded, returns its capacity.
219     pub fn capacity(&self) -> Option<usize> {
220         match &self.flavor {
221             SenderFlavor::Array(chan) => chan.capacity(),
222             SenderFlavor::List(chan) => chan.capacity(),
223             SenderFlavor::Zero(chan) => chan.capacity(),
224         }
225     }
226 
227     /// Returns `true` if senders belong to the same channel.
228     pub fn same_channel(&self, other: &Sender<T>) -> bool {
229         match (&self.flavor, &other.flavor) {
230             (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
231             (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
232             (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
233             _ => false,
234         }
235     }
236 }
237 
238 impl<T> Drop for Sender<T> {
239     fn drop(&mut self) {
240         unsafe {
241             match &self.flavor {
242                 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
243                 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
244                 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
245             }
246         }
247     }
248 }
249 
250 impl<T> Clone for Sender<T> {
251     fn clone(&self) -> Self {
252         let flavor = match &self.flavor {
253             SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
254             SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
255             SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
256         };
257 
258         Sender { flavor }
259     }
260 }
261 
262 impl<T> fmt::Debug for Sender<T> {
263     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264         f.pad("Sender { .. }")
265     }
266 }
267 
268 /// The receiving side of a channel.
269 pub struct Receiver<T> {
270     flavor: ReceiverFlavor<T>,
271 }
272 
273 /// Receiver flavors.
274 enum ReceiverFlavor<T> {
275     /// Bounded channel based on a preallocated array.
276     Array(counter::Receiver<array::Channel<T>>),
277 
278     /// Unbounded channel implemented as a linked list.
279     List(counter::Receiver<list::Channel<T>>),
280 
281     /// Zero-capacity channel.
282     Zero(counter::Receiver<zero::Channel<T>>),
283 }
284 
285 unsafe impl<T: Send> Send for Receiver<T> {}
286 unsafe impl<T: Send> Sync for Receiver<T> {}
287 
288 impl<T> UnwindSafe for Receiver<T> {}
289 impl<T> RefUnwindSafe for Receiver<T> {}
290 
291 impl<T> Receiver<T> {
292     /// Attempts to receive a message from the channel without blocking.
293     ///
294     /// This method will either receive a message from the channel immediately or return an error
295     /// if the channel is empty.
296     ///
297     /// If called on a zero-capacity channel, this method will receive a message only if there
298     /// happens to be a send operation on the other side of the channel at the same time.
299     pub fn try_recv(&self) -> Result<T, TryRecvError> {
300         match &self.flavor {
301             ReceiverFlavor::Array(chan) => chan.try_recv(),
302             ReceiverFlavor::List(chan) => chan.try_recv(),
303             ReceiverFlavor::Zero(chan) => chan.try_recv(),
304         }
305     }
306 
307     /// Blocks the current thread until a message is received or the channel is empty and
308     /// disconnected.
309     ///
310     /// If the channel is empty and not disconnected, this call will block until the receive
311     /// operation can proceed. If the channel is empty and becomes disconnected, this call will
312     /// wake up and return an error.
313     ///
314     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
315     /// on the other side of the channel.
316     pub fn recv(&self) -> Result<T, RecvError> {
317         match &self.flavor {
318             ReceiverFlavor::Array(chan) => chan.recv(None),
319             ReceiverFlavor::List(chan) => chan.recv(None),
320             ReceiverFlavor::Zero(chan) => chan.recv(None),
321         }
322         .map_err(|_| RecvError)
323     }
324 
325     /// Waits for a message to be received from the channel, but only for a limited time.
326     ///
327     /// If the channel is empty and not disconnected, this call will block until the receive
328     /// operation can proceed or the operation times out. If the channel is empty and becomes
329     /// disconnected, this call will wake up and return an error.
330     ///
331     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
332     /// on the other side of the channel.
333     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
334         match Instant::now().checked_add(timeout) {
335             Some(deadline) => self.recv_deadline(deadline),
336             // So far in the future that it's practically the same as waiting indefinitely.
337             None => self.recv().map_err(RecvTimeoutError::from),
338         }
339     }
340 
341     /// Waits for a message to be received from the channel, but only for a limited time.
342     ///
343     /// If the channel is empty and not disconnected, this call will block until the receive
344     /// operation can proceed or the operation times out. If the channel is empty and becomes
345     /// disconnected, this call will wake up and return an error.
346     ///
347     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
348     /// on the other side of the channel.
349     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
350         match &self.flavor {
351             ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
352             ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
353             ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
354         }
355     }
356 }
357 
358 // The methods below are not used by `sync::mpsc`, but
359 // are useful and we'll likely want to expose them
360 // eventually
361 #[allow(unused)]
362 impl<T> Receiver<T> {
363     /// Returns `true` if the channel is empty.
364     ///
365     /// Note: Zero-capacity channels are always empty.
366     pub fn is_empty(&self) -> bool {
367         match &self.flavor {
368             ReceiverFlavor::Array(chan) => chan.is_empty(),
369             ReceiverFlavor::List(chan) => chan.is_empty(),
370             ReceiverFlavor::Zero(chan) => chan.is_empty(),
371         }
372     }
373 
374     /// Returns `true` if the channel is full.
375     ///
376     /// Note: Zero-capacity channels are always full.
377     pub fn is_full(&self) -> bool {
378         match &self.flavor {
379             ReceiverFlavor::Array(chan) => chan.is_full(),
380             ReceiverFlavor::List(chan) => chan.is_full(),
381             ReceiverFlavor::Zero(chan) => chan.is_full(),
382         }
383     }
384 
385     /// Returns the number of messages in the channel.
386     pub fn len(&self) -> usize {
387         match &self.flavor {
388             ReceiverFlavor::Array(chan) => chan.len(),
389             ReceiverFlavor::List(chan) => chan.len(),
390             ReceiverFlavor::Zero(chan) => chan.len(),
391         }
392     }
393 
394     /// If the channel is bounded, returns its capacity.
395     pub fn capacity(&self) -> Option<usize> {
396         match &self.flavor {
397             ReceiverFlavor::Array(chan) => chan.capacity(),
398             ReceiverFlavor::List(chan) => chan.capacity(),
399             ReceiverFlavor::Zero(chan) => chan.capacity(),
400         }
401     }
402 
403     /// Returns `true` if receivers belong to the same channel.
404     pub fn same_channel(&self, other: &Receiver<T>) -> bool {
405         match (&self.flavor, &other.flavor) {
406             (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
407             (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
408             (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
409             _ => false,
410         }
411     }
412 }
413 
414 impl<T> Drop for Receiver<T> {
415     fn drop(&mut self) {
416         unsafe {
417             match &self.flavor {
418                 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
419                 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
420                 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
421             }
422         }
423     }
424 }
425 
426 impl<T> Clone for Receiver<T> {
427     fn clone(&self) -> Self {
428         let flavor = match &self.flavor {
429             ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
430             ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
431             ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
432         };
433 
434         Receiver { flavor }
435     }
436 }
437 
438 impl<T> fmt::Debug for Receiver<T> {
439     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
440         f.pad("Receiver { .. }")
441     }
442 }
443