1 //! Multi-producer multi-consumer channels. 2 3 // This module is not currently exposed publicly, but is used 4 // as the implementation for the channels in `sync::mpsc`. The 5 // implementation comes from the crossbeam-channel crate: 6 // 7 // Copyright (c) 2019 The Crossbeam Project Developers 8 // 9 // Permission is hereby granted, free of charge, to any 10 // person obtaining a copy of this software and associated 11 // documentation files (the "Software"), to deal in the 12 // Software without restriction, including without 13 // limitation the rights to use, copy, modify, merge, 14 // publish, distribute, sublicense, and/or sell copies of 15 // the Software, and to permit persons to whom the Software 16 // is furnished to do so, subject to the following 17 // conditions: 18 // 19 // The above copyright notice and this permission notice 20 // shall be included in all copies or substantial portions 21 // of the Software. 22 // 23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF 24 // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED 25 // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A 26 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 27 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY 28 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 29 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR 30 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 31 // DEALINGS IN THE SOFTWARE. 32 33 mod array; 34 mod context; 35 mod counter; 36 mod error; 37 mod list; 38 mod select; 39 mod utils; 40 mod waker; 41 mod zero; 42 43 use crate::std::fmt; 44 use crate::std::panic::{RefUnwindSafe, UnwindSafe}; 45 use crate::std::time::{Duration, Instant}; 46 pub use error::*; 47 48 /// Creates a channel of unbounded capacity. 49 /// 50 /// This channel has a growable buffer that can hold any number of messages at a time. 51 pub fn channel<T>() -> (Sender<T>, Receiver<T>) { 52 let (s, r) = counter::new(list::Channel::new()); 53 let s = Sender { 54 flavor: SenderFlavor::List(s), 55 }; 56 let r = Receiver { 57 flavor: ReceiverFlavor::List(r), 58 }; 59 (s, r) 60 } 61 62 /// Creates a channel of bounded capacity. 63 /// 64 /// This channel has a buffer that can hold at most `cap` messages at a time. 65 /// 66 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and 67 /// receive operations must appear at the same time in order to pair up and pass the message over. 68 pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { 69 if cap == 0 { 70 let (s, r) = counter::new(zero::Channel::new()); 71 let s = Sender { 72 flavor: SenderFlavor::Zero(s), 73 }; 74 let r = Receiver { 75 flavor: ReceiverFlavor::Zero(r), 76 }; 77 (s, r) 78 } else { 79 let (s, r) = counter::new(array::Channel::with_capacity(cap)); 80 let s = Sender { 81 flavor: SenderFlavor::Array(s), 82 }; 83 let r = Receiver { 84 flavor: ReceiverFlavor::Array(r), 85 }; 86 (s, r) 87 } 88 } 89 90 /// The sending side of a channel. 91 pub struct Sender<T> { 92 flavor: SenderFlavor<T>, 93 } 94 95 /// Sender flavors. 96 enum SenderFlavor<T> { 97 /// Bounded channel based on a preallocated array. 98 Array(counter::Sender<array::Channel<T>>), 99 100 /// Unbounded channel implemented as a linked list. 101 List(counter::Sender<list::Channel<T>>), 102 103 /// Zero-capacity channel. 104 Zero(counter::Sender<zero::Channel<T>>), 105 } 106 107 unsafe impl<T: Send> Send for Sender<T> {} 108 unsafe impl<T: Send> Sync for Sender<T> {} 109 110 impl<T> UnwindSafe for Sender<T> {} 111 impl<T> RefUnwindSafe for Sender<T> {} 112 113 impl<T> Sender<T> { 114 /// Attempts to send a message into the channel without blocking. 115 /// 116 /// This method will either send a message into the channel immediately or return an error if 117 /// the channel is full or disconnected. The returned error contains the original message. 118 /// 119 /// If called on a zero-capacity channel, this method will send the message only if there 120 /// happens to be a receive operation on the other side of the channel at the same time. 121 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 122 match &self.flavor { 123 SenderFlavor::Array(chan) => chan.try_send(msg), 124 SenderFlavor::List(chan) => chan.try_send(msg), 125 SenderFlavor::Zero(chan) => chan.try_send(msg), 126 } 127 } 128 129 /// Blocks the current thread until a message is sent or the channel is disconnected. 130 /// 131 /// If the channel is full and not disconnected, this call will block until the send operation 132 /// can proceed. If the channel becomes disconnected, this call will wake up and return an 133 /// error. The returned error contains the original message. 134 /// 135 /// If called on a zero-capacity channel, this method will wait for a receive operation to 136 /// appear on the other side of the channel. 137 pub fn send(&self, msg: T) -> Result<(), SendError<T>> { 138 match &self.flavor { 139 SenderFlavor::Array(chan) => chan.send(msg, None), 140 SenderFlavor::List(chan) => chan.send(msg, None), 141 SenderFlavor::Zero(chan) => chan.send(msg, None), 142 } 143 .map_err(|err| match err { 144 SendTimeoutError::Disconnected(msg) => SendError(msg), 145 SendTimeoutError::Timeout(_) => unreachable!(), 146 }) 147 } 148 } 149 150 // The methods below are not used by `sync::mpsc`, but 151 // are useful and we'll likely want to expose them 152 // eventually 153 #[allow(unused)] 154 impl<T> Sender<T> { 155 /// Waits for a message to be sent into the channel, but only for a limited time. 156 /// 157 /// If the channel is full and not disconnected, this call will block until the send operation 158 /// can proceed or the operation times out. If the channel becomes disconnected, this call will 159 /// wake up and return an error. The returned error contains the original message. 160 /// 161 /// If called on a zero-capacity channel, this method will wait for a receive operation to 162 /// appear on the other side of the channel. 163 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { 164 match Instant::now().checked_add(timeout) { 165 Some(deadline) => self.send_deadline(msg, deadline), 166 // So far in the future that it's practically the same as waiting indefinitely. 167 None => self.send(msg).map_err(SendTimeoutError::from), 168 } 169 } 170 171 /// Waits for a message to be sent into the channel, but only until a given deadline. 172 /// 173 /// If the channel is full and not disconnected, this call will block until the send operation 174 /// can proceed or the operation times out. If the channel becomes disconnected, this call will 175 /// wake up and return an error. The returned error contains the original message. 176 /// 177 /// If called on a zero-capacity channel, this method will wait for a receive operation to 178 /// appear on the other side of the channel. 179 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { 180 match &self.flavor { 181 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), 182 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), 183 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), 184 } 185 } 186 187 /// Returns `true` if the channel is empty. 188 /// 189 /// Note: Zero-capacity channels are always empty. 190 pub fn is_empty(&self) -> bool { 191 match &self.flavor { 192 SenderFlavor::Array(chan) => chan.is_empty(), 193 SenderFlavor::List(chan) => chan.is_empty(), 194 SenderFlavor::Zero(chan) => chan.is_empty(), 195 } 196 } 197 198 /// Returns `true` if the channel is full. 199 /// 200 /// Note: Zero-capacity channels are always full. 201 pub fn is_full(&self) -> bool { 202 match &self.flavor { 203 SenderFlavor::Array(chan) => chan.is_full(), 204 SenderFlavor::List(chan) => chan.is_full(), 205 SenderFlavor::Zero(chan) => chan.is_full(), 206 } 207 } 208 209 /// Returns the number of messages in the channel. 210 pub fn len(&self) -> usize { 211 match &self.flavor { 212 SenderFlavor::Array(chan) => chan.len(), 213 SenderFlavor::List(chan) => chan.len(), 214 SenderFlavor::Zero(chan) => chan.len(), 215 } 216 } 217 218 /// If the channel is bounded, returns its capacity. 219 pub fn capacity(&self) -> Option<usize> { 220 match &self.flavor { 221 SenderFlavor::Array(chan) => chan.capacity(), 222 SenderFlavor::List(chan) => chan.capacity(), 223 SenderFlavor::Zero(chan) => chan.capacity(), 224 } 225 } 226 227 /// Returns `true` if senders belong to the same channel. 228 pub fn same_channel(&self, other: &Sender<T>) -> bool { 229 match (&self.flavor, &other.flavor) { 230 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, 231 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, 232 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, 233 _ => false, 234 } 235 } 236 } 237 238 impl<T> Drop for Sender<T> { 239 fn drop(&mut self) { 240 unsafe { 241 match &self.flavor { 242 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), 243 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), 244 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), 245 } 246 } 247 } 248 } 249 250 impl<T> Clone for Sender<T> { 251 fn clone(&self) -> Self { 252 let flavor = match &self.flavor { 253 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), 254 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), 255 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), 256 }; 257 258 Sender { flavor } 259 } 260 } 261 262 impl<T> fmt::Debug for Sender<T> { 263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 264 f.pad("Sender { .. }") 265 } 266 } 267 268 /// The receiving side of a channel. 269 pub struct Receiver<T> { 270 flavor: ReceiverFlavor<T>, 271 } 272 273 /// Receiver flavors. 274 enum ReceiverFlavor<T> { 275 /// Bounded channel based on a preallocated array. 276 Array(counter::Receiver<array::Channel<T>>), 277 278 /// Unbounded channel implemented as a linked list. 279 List(counter::Receiver<list::Channel<T>>), 280 281 /// Zero-capacity channel. 282 Zero(counter::Receiver<zero::Channel<T>>), 283 } 284 285 unsafe impl<T: Send> Send for Receiver<T> {} 286 unsafe impl<T: Send> Sync for Receiver<T> {} 287 288 impl<T> UnwindSafe for Receiver<T> {} 289 impl<T> RefUnwindSafe for Receiver<T> {} 290 291 impl<T> Receiver<T> { 292 /// Attempts to receive a message from the channel without blocking. 293 /// 294 /// This method will either receive a message from the channel immediately or return an error 295 /// if the channel is empty. 296 /// 297 /// If called on a zero-capacity channel, this method will receive a message only if there 298 /// happens to be a send operation on the other side of the channel at the same time. 299 pub fn try_recv(&self) -> Result<T, TryRecvError> { 300 match &self.flavor { 301 ReceiverFlavor::Array(chan) => chan.try_recv(), 302 ReceiverFlavor::List(chan) => chan.try_recv(), 303 ReceiverFlavor::Zero(chan) => chan.try_recv(), 304 } 305 } 306 307 /// Blocks the current thread until a message is received or the channel is empty and 308 /// disconnected. 309 /// 310 /// If the channel is empty and not disconnected, this call will block until the receive 311 /// operation can proceed. If the channel is empty and becomes disconnected, this call will 312 /// wake up and return an error. 313 /// 314 /// If called on a zero-capacity channel, this method will wait for a send operation to appear 315 /// on the other side of the channel. 316 pub fn recv(&self) -> Result<T, RecvError> { 317 match &self.flavor { 318 ReceiverFlavor::Array(chan) => chan.recv(None), 319 ReceiverFlavor::List(chan) => chan.recv(None), 320 ReceiverFlavor::Zero(chan) => chan.recv(None), 321 } 322 .map_err(|_| RecvError) 323 } 324 325 /// Waits for a message to be received from the channel, but only for a limited time. 326 /// 327 /// If the channel is empty and not disconnected, this call will block until the receive 328 /// operation can proceed or the operation times out. If the channel is empty and becomes 329 /// disconnected, this call will wake up and return an error. 330 /// 331 /// If called on a zero-capacity channel, this method will wait for a send operation to appear 332 /// on the other side of the channel. 333 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { 334 match Instant::now().checked_add(timeout) { 335 Some(deadline) => self.recv_deadline(deadline), 336 // So far in the future that it's practically the same as waiting indefinitely. 337 None => self.recv().map_err(RecvTimeoutError::from), 338 } 339 } 340 341 /// Waits for a message to be received from the channel, but only for a limited time. 342 /// 343 /// If the channel is empty and not disconnected, this call will block until the receive 344 /// operation can proceed or the operation times out. If the channel is empty and becomes 345 /// disconnected, this call will wake up and return an error. 346 /// 347 /// If called on a zero-capacity channel, this method will wait for a send operation to appear 348 /// on the other side of the channel. 349 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { 350 match &self.flavor { 351 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), 352 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), 353 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), 354 } 355 } 356 } 357 358 // The methods below are not used by `sync::mpsc`, but 359 // are useful and we'll likely want to expose them 360 // eventually 361 #[allow(unused)] 362 impl<T> Receiver<T> { 363 /// Returns `true` if the channel is empty. 364 /// 365 /// Note: Zero-capacity channels are always empty. 366 pub fn is_empty(&self) -> bool { 367 match &self.flavor { 368 ReceiverFlavor::Array(chan) => chan.is_empty(), 369 ReceiverFlavor::List(chan) => chan.is_empty(), 370 ReceiverFlavor::Zero(chan) => chan.is_empty(), 371 } 372 } 373 374 /// Returns `true` if the channel is full. 375 /// 376 /// Note: Zero-capacity channels are always full. 377 pub fn is_full(&self) -> bool { 378 match &self.flavor { 379 ReceiverFlavor::Array(chan) => chan.is_full(), 380 ReceiverFlavor::List(chan) => chan.is_full(), 381 ReceiverFlavor::Zero(chan) => chan.is_full(), 382 } 383 } 384 385 /// Returns the number of messages in the channel. 386 pub fn len(&self) -> usize { 387 match &self.flavor { 388 ReceiverFlavor::Array(chan) => chan.len(), 389 ReceiverFlavor::List(chan) => chan.len(), 390 ReceiverFlavor::Zero(chan) => chan.len(), 391 } 392 } 393 394 /// If the channel is bounded, returns its capacity. 395 pub fn capacity(&self) -> Option<usize> { 396 match &self.flavor { 397 ReceiverFlavor::Array(chan) => chan.capacity(), 398 ReceiverFlavor::List(chan) => chan.capacity(), 399 ReceiverFlavor::Zero(chan) => chan.capacity(), 400 } 401 } 402 403 /// Returns `true` if receivers belong to the same channel. 404 pub fn same_channel(&self, other: &Receiver<T>) -> bool { 405 match (&self.flavor, &other.flavor) { 406 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, 407 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, 408 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, 409 _ => false, 410 } 411 } 412 } 413 414 impl<T> Drop for Receiver<T> { 415 fn drop(&mut self) { 416 unsafe { 417 match &self.flavor { 418 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), 419 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), 420 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), 421 } 422 } 423 } 424 } 425 426 impl<T> Clone for Receiver<T> { 427 fn clone(&self) -> Self { 428 let flavor = match &self.flavor { 429 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), 430 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), 431 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), 432 }; 433 434 Receiver { flavor } 435 } 436 } 437 438 impl<T> fmt::Debug for Receiver<T> { 439 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 440 f.pad("Receiver { .. }") 441 } 442 } 443