1 use super::*; 2 use crate::std::env; 3 use crate::std::rc::Rc; 4 use crate::std::sync::mpmc::SendTimeoutError; 5 use crate::std::thread; 6 use crate::std::time::Duration; 7 8 pub fn stress_factor() -> usize { 9 match env::var("RUST_TEST_STRESS") { 10 Ok(val) => val.parse().unwrap(), 11 Err(..) => 1, 12 } 13 } 14 15 #[test] 16 fn smoke() { 17 let (tx, rx) = sync_channel::<i32>(1); 18 tx.send(1).unwrap(); 19 assert_eq!(rx.recv().unwrap(), 1); 20 } 21 22 #[test] 23 fn drop_full() { 24 let (tx, _rx) = sync_channel::<Box<isize>>(1); 25 tx.send(Box::new(1)).unwrap(); 26 } 27 28 #[test] 29 fn smoke_shared() { 30 let (tx, rx) = sync_channel::<i32>(1); 31 tx.send(1).unwrap(); 32 assert_eq!(rx.recv().unwrap(), 1); 33 let tx = tx.clone(); 34 tx.send(1).unwrap(); 35 assert_eq!(rx.recv().unwrap(), 1); 36 } 37 38 #[test] 39 fn recv_timeout() { 40 let (tx, rx) = sync_channel::<i32>(1); 41 assert_eq!( 42 rx.recv_timeout(Duration::from_millis(1)), 43 Err(RecvTimeoutError::Timeout) 44 ); 45 tx.send(1).unwrap(); 46 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); 47 } 48 49 #[test] 50 fn send_timeout() { 51 let (tx, _rx) = sync_channel::<i32>(1); 52 assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(())); 53 assert_eq!( 54 tx.send_timeout(1, Duration::from_millis(1)), 55 Err(SendTimeoutError::Timeout(1)) 56 ); 57 } 58 59 #[test] 60 fn smoke_threads() { 61 let (tx, rx) = sync_channel::<i32>(0); 62 let _t = thread::spawn(move || { 63 tx.send(1).unwrap(); 64 }); 65 assert_eq!(rx.recv().unwrap(), 1); 66 } 67 68 #[test] 69 fn smoke_port_gone() { 70 let (tx, rx) = sync_channel::<i32>(0); 71 drop(rx); 72 assert!(tx.send(1).is_err()); 73 } 74 75 #[test] 76 fn smoke_shared_port_gone2() { 77 let (tx, rx) = sync_channel::<i32>(0); 78 drop(rx); 79 let tx2 = tx.clone(); 80 drop(tx); 81 assert!(tx2.send(1).is_err()); 82 } 83 84 #[test] 85 fn port_gone_concurrent() { 86 let (tx, rx) = sync_channel::<i32>(0); 87 let _t = thread::spawn(move || { 88 rx.recv().unwrap(); 89 }); 
90 while tx.send(1).is_ok() {} 91 } 92 93 #[test] 94 fn port_gone_concurrent_shared() { 95 let (tx, rx) = sync_channel::<i32>(0); 96 let tx2 = tx.clone(); 97 let _t = thread::spawn(move || { 98 rx.recv().unwrap(); 99 }); 100 while tx.send(1).is_ok() && tx2.send(1).is_ok() {} 101 } 102 103 #[test] 104 fn smoke_chan_gone() { 105 let (tx, rx) = sync_channel::<i32>(0); 106 drop(tx); 107 assert!(rx.recv().is_err()); 108 } 109 110 #[test] 111 fn smoke_chan_gone_shared() { 112 let (tx, rx) = sync_channel::<()>(0); 113 let tx2 = tx.clone(); 114 drop(tx); 115 drop(tx2); 116 assert!(rx.recv().is_err()); 117 } 118 119 #[test] 120 fn chan_gone_concurrent() { 121 let (tx, rx) = sync_channel::<i32>(0); 122 thread::spawn(move || { 123 tx.send(1).unwrap(); 124 tx.send(1).unwrap(); 125 }); 126 while rx.recv().is_ok() {} 127 } 128 129 #[test] 130 fn stress() { 131 let count = if cfg!(miri) { 100 } else { 10000 }; 132 let (tx, rx) = sync_channel::<i32>(0); 133 thread::spawn(move || { 134 for _ in 0..count { 135 tx.send(1).unwrap(); 136 } 137 }); 138 for _ in 0..count { 139 assert_eq!(rx.recv().unwrap(), 1); 140 } 141 } 142 143 #[test] 144 fn stress_recv_timeout_two_threads() { 145 let count = if cfg!(miri) { 100 } else { 10000 }; 146 let (tx, rx) = sync_channel::<i32>(0); 147 148 thread::spawn(move || { 149 for _ in 0..count { 150 tx.send(1).unwrap(); 151 } 152 }); 153 154 let mut recv_count = 0; 155 loop { 156 match rx.recv_timeout(Duration::from_millis(1)) { 157 Ok(v) => { 158 assert_eq!(v, 1); 159 recv_count += 1; 160 } 161 Err(RecvTimeoutError::Timeout) => continue, 162 Err(RecvTimeoutError::Disconnected) => break, 163 } 164 } 165 166 assert_eq!(recv_count, count); 167 } 168 169 #[test] 170 fn stress_recv_timeout_shared() { 171 const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; 172 const NTHREADS: u32 = 8; 173 let (tx, rx) = sync_channel::<i32>(0); 174 let (dtx, drx) = sync_channel::<()>(0); 175 176 thread::spawn(move || { 177 let mut recv_count = 0; 178 loop { 179 match 
rx.recv_timeout(Duration::from_millis(10)) { 180 Ok(v) => { 181 assert_eq!(v, 1); 182 recv_count += 1; 183 } 184 Err(RecvTimeoutError::Timeout) => continue, 185 Err(RecvTimeoutError::Disconnected) => break, 186 } 187 } 188 189 assert_eq!(recv_count, AMT * NTHREADS); 190 assert!(rx.try_recv().is_err()); 191 192 dtx.send(()).unwrap(); 193 }); 194 195 for _ in 0..NTHREADS { 196 let tx = tx.clone(); 197 thread::spawn(move || { 198 for _ in 0..AMT { 199 tx.send(1).unwrap(); 200 } 201 }); 202 } 203 204 drop(tx); 205 206 drx.recv().unwrap(); 207 } 208 209 #[test] 210 fn stress_shared() { 211 const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; 212 const NTHREADS: u32 = 8; 213 let (tx, rx) = sync_channel::<i32>(0); 214 let (dtx, drx) = sync_channel::<()>(0); 215 216 thread::spawn(move || { 217 for _ in 0..AMT * NTHREADS { 218 assert_eq!(rx.recv().unwrap(), 1); 219 } 220 match rx.try_recv() { 221 Ok(..) => panic!(), 222 _ => {} 223 } 224 dtx.send(()).unwrap(); 225 }); 226 227 for _ in 0..NTHREADS { 228 let tx = tx.clone(); 229 thread::spawn(move || { 230 for _ in 0..AMT { 231 tx.send(1).unwrap(); 232 } 233 }); 234 } 235 drop(tx); 236 drx.recv().unwrap(); 237 } 238 239 #[test] 240 fn oneshot_single_thread_close_port_first() { 241 // Simple test of closing without sending 242 let (_tx, rx) = sync_channel::<i32>(0); 243 drop(rx); 244 } 245 246 #[test] 247 fn oneshot_single_thread_close_chan_first() { 248 // Simple test of closing without sending 249 let (tx, _rx) = sync_channel::<i32>(0); 250 drop(tx); 251 } 252 253 #[test] 254 fn oneshot_single_thread_send_port_close() { 255 // Testing that the sender cleans up the payload if receiver is closed 256 let (tx, rx) = sync_channel::<Box<i32>>(0); 257 drop(rx); 258 assert!(tx.send(Box::new(0)).is_err()); 259 } 260 261 #[test] 262 fn oneshot_single_thread_recv_chan_close() { 263 // Receiving on a closed chan will panic 264 let res = thread::spawn(move || { 265 let (tx, rx) = sync_channel::<i32>(0); 266 drop(tx); 267 
rx.recv().unwrap(); 268 }) 269 .join(); 270 // What is our res? 271 assert!(res.is_err()); 272 } 273 274 #[test] 275 fn oneshot_single_thread_send_then_recv() { 276 let (tx, rx) = sync_channel::<Box<i32>>(1); 277 tx.send(Box::new(10)).unwrap(); 278 assert!(*rx.recv().unwrap() == 10); 279 } 280 281 #[test] 282 fn oneshot_single_thread_try_send_open() { 283 let (tx, rx) = sync_channel::<i32>(1); 284 assert_eq!(tx.try_send(10), Ok(())); 285 assert!(rx.recv().unwrap() == 10); 286 } 287 288 #[test] 289 fn oneshot_single_thread_try_send_closed() { 290 let (tx, rx) = sync_channel::<i32>(0); 291 drop(rx); 292 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); 293 } 294 295 #[test] 296 fn oneshot_single_thread_try_send_closed2() { 297 let (tx, _rx) = sync_channel::<i32>(0); 298 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); 299 } 300 301 #[test] 302 fn oneshot_single_thread_try_recv_open() { 303 let (tx, rx) = sync_channel::<i32>(1); 304 tx.send(10).unwrap(); 305 assert!(rx.recv() == Ok(10)); 306 } 307 308 #[test] 309 fn oneshot_single_thread_try_recv_closed() { 310 let (tx, rx) = sync_channel::<i32>(0); 311 drop(tx); 312 assert!(rx.recv().is_err()); 313 } 314 315 #[test] 316 fn oneshot_single_thread_try_recv_closed_with_data() { 317 let (tx, rx) = sync_channel::<i32>(1); 318 tx.send(10).unwrap(); 319 drop(tx); 320 assert_eq!(rx.try_recv(), Ok(10)); 321 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); 322 } 323 324 #[test] 325 fn oneshot_single_thread_peek_data() { 326 let (tx, rx) = sync_channel::<i32>(1); 327 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 328 tx.send(10).unwrap(); 329 assert_eq!(rx.try_recv(), Ok(10)); 330 } 331 332 #[test] 333 fn oneshot_single_thread_peek_close() { 334 let (tx, rx) = sync_channel::<i32>(0); 335 drop(tx); 336 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); 337 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); 338 } 339 340 #[test] 341 fn oneshot_single_thread_peek_open() { 
342 let (_tx, rx) = sync_channel::<i32>(0); 343 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); 344 } 345 346 #[test] 347 fn oneshot_multi_task_recv_then_send() { 348 let (tx, rx) = sync_channel::<Box<i32>>(0); 349 let _t = thread::spawn(move || { 350 assert!(*rx.recv().unwrap() == 10); 351 }); 352 353 tx.send(Box::new(10)).unwrap(); 354 } 355 356 #[test] 357 fn oneshot_multi_task_recv_then_close() { 358 let (tx, rx) = sync_channel::<Box<i32>>(0); 359 let _t = thread::spawn(move || { 360 drop(tx); 361 }); 362 let res = thread::spawn(move || { 363 assert!(*rx.recv().unwrap() == 10); 364 }) 365 .join(); 366 assert!(res.is_err()); 367 } 368 369 #[test] 370 fn oneshot_multi_thread_close_stress() { 371 for _ in 0..stress_factor() { 372 let (tx, rx) = sync_channel::<i32>(0); 373 let _t = thread::spawn(move || { 374 drop(rx); 375 }); 376 drop(tx); 377 } 378 } 379 380 #[test] 381 fn oneshot_multi_thread_send_close_stress() { 382 for _ in 0..stress_factor() { 383 let (tx, rx) = sync_channel::<i32>(0); 384 let _t = thread::spawn(move || { 385 drop(rx); 386 }); 387 let _ = thread::spawn(move || { 388 tx.send(1).unwrap(); 389 }) 390 .join(); 391 } 392 } 393 394 #[test] 395 fn oneshot_multi_thread_recv_close_stress() { 396 for _ in 0..stress_factor() { 397 let (tx, rx) = sync_channel::<i32>(0); 398 let _t = thread::spawn(move || { 399 let res = thread::spawn(move || { 400 rx.recv().unwrap(); 401 }) 402 .join(); 403 assert!(res.is_err()); 404 }); 405 let _t = thread::spawn(move || { 406 thread::spawn(move || { 407 drop(tx); 408 }); 409 }); 410 } 411 } 412 413 #[test] 414 fn oneshot_multi_thread_send_recv_stress() { 415 for _ in 0..stress_factor() { 416 let (tx, rx) = sync_channel::<Box<i32>>(0); 417 let _t = thread::spawn(move || { 418 tx.send(Box::new(10)).unwrap(); 419 }); 420 assert!(*rx.recv().unwrap() == 10); 421 } 422 } 423 424 #[test] 425 fn stream_send_recv_stress() { 426 for _ in 0..stress_factor() { 427 let (tx, rx) = sync_channel::<Box<i32>>(0); 428 429 send(tx, 
0); 430 recv(rx, 0); 431 432 fn send(tx: SyncSender<Box<i32>>, i: i32) { 433 if i == 10 { 434 return; 435 } 436 437 thread::spawn(move || { 438 tx.send(Box::new(i)).unwrap(); 439 send(tx, i + 1); 440 }); 441 } 442 443 fn recv(rx: Receiver<Box<i32>>, i: i32) { 444 if i == 10 { 445 return; 446 } 447 448 thread::spawn(move || { 449 assert!(*rx.recv().unwrap() == i); 450 recv(rx, i + 1); 451 }); 452 } 453 } 454 } 455 456 #[test] 457 fn recv_a_lot() { 458 let count = if cfg!(miri) { 1000 } else { 10000 }; 459 // Regression test that we don't run out of stack in scheduler context 460 let (tx, rx) = sync_channel(count); 461 for _ in 0..count { 462 tx.send(()).unwrap(); 463 } 464 for _ in 0..count { 465 rx.recv().unwrap(); 466 } 467 } 468 469 #[test] 470 fn shared_chan_stress() { 471 let (tx, rx) = sync_channel(0); 472 let total = stress_factor() + 100; 473 for _ in 0..total { 474 let tx = tx.clone(); 475 thread::spawn(move || { 476 tx.send(()).unwrap(); 477 }); 478 } 479 480 for _ in 0..total { 481 rx.recv().unwrap(); 482 } 483 } 484 485 #[test] 486 fn test_nested_recv_iter() { 487 let (tx, rx) = sync_channel::<i32>(0); 488 let (total_tx, total_rx) = sync_channel::<i32>(0); 489 490 let _t = thread::spawn(move || { 491 let mut acc = 0; 492 for x in rx.iter() { 493 acc += x; 494 } 495 total_tx.send(acc).unwrap(); 496 }); 497 498 tx.send(3).unwrap(); 499 tx.send(1).unwrap(); 500 tx.send(2).unwrap(); 501 drop(tx); 502 assert_eq!(total_rx.recv().unwrap(), 6); 503 } 504 505 #[test] 506 fn test_recv_iter_break() { 507 let (tx, rx) = sync_channel::<i32>(0); 508 let (count_tx, count_rx) = sync_channel(0); 509 510 let _t = thread::spawn(move || { 511 let mut count = 0; 512 for x in rx.iter() { 513 if count >= 3 { 514 break; 515 } else { 516 count += x; 517 } 518 } 519 count_tx.send(count).unwrap(); 520 }); 521 522 tx.send(2).unwrap(); 523 tx.send(2).unwrap(); 524 tx.send(2).unwrap(); 525 let _ = tx.try_send(2); 526 drop(tx); 527 assert_eq!(count_rx.recv().unwrap(), 4); 528 } 529 
// Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected, using two
// auxiliary channels to step the helper thread in lock-step with the test.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    let _helper = thread::spawn(move || {
        // Step 1: publish a value once told to proceed.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Step 2: disconnect once told to proceed again.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}

// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (msg_tx, msg_rx) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);
    let _child = thread::spawn(move || {
        msg_rx.recv().unwrap(); // wait on a oneshot
        drop(msg_rx); // destroy a shared
        exit_tx.send(()).unwrap();
    });

    // make sure the other thread has gone to sleep
    (0..5000).for_each(|_| thread::yield_now());

    // upgrade to a shared chan and send a message
    let cloned_tx = msg_tx.clone();
    drop(msg_tx);
    cloned_tx.send(()).unwrap();

    // wait for the child thread to exit before we exit
    exit_rx.recv().unwrap();
}

// `send` on a zero-capacity channel succeeds when another thread receives.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    assert_eq!(sender.send(1), Ok(()));
}

// `send` on a zero-capacity channel fails when another thread drops the receiver.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _dropper = thread::spawn(move || {
        drop(receiver);
    });
    assert!(sender.send(1).is_err());
}

// With the single slot already occupied, a second `send` fails once another
// thread drops the receiver.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    assert_eq!(sender.send(1), Ok(()));
    let _dropper = thread::spawn(move || {
        drop(receiver);
    });
    assert!(sender.send(1).is_err());
}

// Two sender threads both fail their `send` after the receiver is dropped;
// each reports completion over a separate channel.
#[test]
fn send4() {
    let (first_tx, receiver) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let (finished_a, finished_rx) = channel();
    let finished_b = finished_a.clone();

    let _worker_a = thread::spawn(move || {
        assert!(first_tx.send(1).is_err());
        finished_a.send(()).unwrap();
    });
    let _worker_b = thread::spawn(move || {
        assert!(second_tx.send(2).is_err());
        finished_b.send(()).unwrap();
    });

    drop(receiver);

    // One completion notice per worker.
    for _ in 0..2 {
        finished_rx.recv().unwrap();
    }
}

// `try_send` on a zero-capacity channel with no receive in progress reports `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    assert_eq!(sender.try_send(1), Err(TrySendError::Full(1)));
}

// `try_send` fills the single slot, then reports `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);
    assert_eq!(sender.try_send(1), Ok(()));
    assert_eq!(sender.try_send(1), Err(TrySendError::Full(1)));
}

// After the receiver is dropped, `try_send` reports `Disconnected` rather than
// `Full`, even though the slot is occupied.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    assert_eq!(sender.try_send(1), Ok(()));
    drop(receiver);
    assert_eq!(sender.try_send(1), Err(TrySendError::Disconnected(1)));
}

// Repeats a two-channel ping-pong built on `try_send` 100 times.
#[test]
fn issue_15761() {
    fn repro() {
        let (ping_tx, ping_rx) = sync_channel::<()>(3);
        let (pong_tx, pong_rx) = sync_channel::<()>(3);

        let _echo = thread::spawn(move || {
            ping_rx.recv().unwrap();
            pong_tx.try_send(()).unwrap();
        });

        ping_tx.try_send(()).unwrap();
        pong_rx.recv().unwrap();
    }

    (0..100).for_each(|_| repro());
}

#[test]
fn drop_unreceived() {
    let (sender, receiver) = sync_channel::<Rc<()>>(1);
    let payload = Rc::new(());
    // Keep only a weak handle so we can observe when the payload is freed.
    let observer = Rc::downgrade(&payload);
    assert!(sender.send(payload).is_ok());
    drop(receiver);
    // Messages should be dropped immediately when the last receiver is destroyed.
    assert!(observer.upgrade().is_none());
    drop(sender);
}