xref: /drstd/src/std/sync/mpsc/sync_tests.rs (revision 0fe3ff0054d3aec7fbf9bddecfecb10bc7d23a51)
1 use super::*;
2 use crate::std::env;
3 use crate::std::rc::Rc;
4 use crate::std::sync::mpmc::SendTimeoutError;
5 use crate::std::thread;
6 use crate::std::time::Duration;
7 
8 pub fn stress_factor() -> usize {
9     match env::var("RUST_TEST_STRESS") {
10         Ok(val) => val.parse().unwrap(),
11         Err(..) => 1,
12     }
13 }
14 
15 #[test]
16 fn smoke() {
17     let (tx, rx) = sync_channel::<i32>(1);
18     tx.send(1).unwrap();
19     assert_eq!(rx.recv().unwrap(), 1);
20 }
21 
22 #[test]
23 fn drop_full() {
24     let (tx, _rx) = sync_channel::<Box<isize>>(1);
25     tx.send(Box::new(1)).unwrap();
26 }
27 
28 #[test]
29 fn smoke_shared() {
30     let (tx, rx) = sync_channel::<i32>(1);
31     tx.send(1).unwrap();
32     assert_eq!(rx.recv().unwrap(), 1);
33     let tx = tx.clone();
34     tx.send(1).unwrap();
35     assert_eq!(rx.recv().unwrap(), 1);
36 }
37 
38 #[test]
39 fn recv_timeout() {
40     let (tx, rx) = sync_channel::<i32>(1);
41     assert_eq!(
42         rx.recv_timeout(Duration::from_millis(1)),
43         Err(RecvTimeoutError::Timeout)
44     );
45     tx.send(1).unwrap();
46     assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
47 }
48 
49 #[test]
50 fn send_timeout() {
51     let (tx, _rx) = sync_channel::<i32>(1);
52     assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
53     assert_eq!(
54         tx.send_timeout(1, Duration::from_millis(1)),
55         Err(SendTimeoutError::Timeout(1))
56     );
57 }
58 
59 #[test]
60 fn smoke_threads() {
61     let (tx, rx) = sync_channel::<i32>(0);
62     let _t = thread::spawn(move || {
63         tx.send(1).unwrap();
64     });
65     assert_eq!(rx.recv().unwrap(), 1);
66 }
67 
68 #[test]
69 fn smoke_port_gone() {
70     let (tx, rx) = sync_channel::<i32>(0);
71     drop(rx);
72     assert!(tx.send(1).is_err());
73 }
74 
75 #[test]
76 fn smoke_shared_port_gone2() {
77     let (tx, rx) = sync_channel::<i32>(0);
78     drop(rx);
79     let tx2 = tx.clone();
80     drop(tx);
81     assert!(tx2.send(1).is_err());
82 }
83 
84 #[test]
85 fn port_gone_concurrent() {
86     let (tx, rx) = sync_channel::<i32>(0);
87     let _t = thread::spawn(move || {
88         rx.recv().unwrap();
89     });
90     while tx.send(1).is_ok() {}
91 }
92 
93 #[test]
94 fn port_gone_concurrent_shared() {
95     let (tx, rx) = sync_channel::<i32>(0);
96     let tx2 = tx.clone();
97     let _t = thread::spawn(move || {
98         rx.recv().unwrap();
99     });
100     while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
101 }
102 
103 #[test]
104 fn smoke_chan_gone() {
105     let (tx, rx) = sync_channel::<i32>(0);
106     drop(tx);
107     assert!(rx.recv().is_err());
108 }
109 
110 #[test]
111 fn smoke_chan_gone_shared() {
112     let (tx, rx) = sync_channel::<()>(0);
113     let tx2 = tx.clone();
114     drop(tx);
115     drop(tx2);
116     assert!(rx.recv().is_err());
117 }
118 
119 #[test]
120 fn chan_gone_concurrent() {
121     let (tx, rx) = sync_channel::<i32>(0);
122     thread::spawn(move || {
123         tx.send(1).unwrap();
124         tx.send(1).unwrap();
125     });
126     while rx.recv().is_ok() {}
127 }
128 
129 #[test]
130 fn stress() {
131     let count = if cfg!(miri) { 100 } else { 10000 };
132     let (tx, rx) = sync_channel::<i32>(0);
133     thread::spawn(move || {
134         for _ in 0..count {
135             tx.send(1).unwrap();
136         }
137     });
138     for _ in 0..count {
139         assert_eq!(rx.recv().unwrap(), 1);
140     }
141 }
142 
143 #[test]
144 fn stress_recv_timeout_two_threads() {
145     let count = if cfg!(miri) { 100 } else { 10000 };
146     let (tx, rx) = sync_channel::<i32>(0);
147 
148     thread::spawn(move || {
149         for _ in 0..count {
150             tx.send(1).unwrap();
151         }
152     });
153 
154     let mut recv_count = 0;
155     loop {
156         match rx.recv_timeout(Duration::from_millis(1)) {
157             Ok(v) => {
158                 assert_eq!(v, 1);
159                 recv_count += 1;
160             }
161             Err(RecvTimeoutError::Timeout) => continue,
162             Err(RecvTimeoutError::Disconnected) => break,
163         }
164     }
165 
166     assert_eq!(recv_count, count);
167 }
168 
169 #[test]
170 fn stress_recv_timeout_shared() {
171     const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
172     const NTHREADS: u32 = 8;
173     let (tx, rx) = sync_channel::<i32>(0);
174     let (dtx, drx) = sync_channel::<()>(0);
175 
176     thread::spawn(move || {
177         let mut recv_count = 0;
178         loop {
179             match rx.recv_timeout(Duration::from_millis(10)) {
180                 Ok(v) => {
181                     assert_eq!(v, 1);
182                     recv_count += 1;
183                 }
184                 Err(RecvTimeoutError::Timeout) => continue,
185                 Err(RecvTimeoutError::Disconnected) => break,
186             }
187         }
188 
189         assert_eq!(recv_count, AMT * NTHREADS);
190         assert!(rx.try_recv().is_err());
191 
192         dtx.send(()).unwrap();
193     });
194 
195     for _ in 0..NTHREADS {
196         let tx = tx.clone();
197         thread::spawn(move || {
198             for _ in 0..AMT {
199                 tx.send(1).unwrap();
200             }
201         });
202     }
203 
204     drop(tx);
205 
206     drx.recv().unwrap();
207 }
208 
209 #[test]
210 fn stress_shared() {
211     const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
212     const NTHREADS: u32 = 8;
213     let (tx, rx) = sync_channel::<i32>(0);
214     let (dtx, drx) = sync_channel::<()>(0);
215 
216     thread::spawn(move || {
217         for _ in 0..AMT * NTHREADS {
218             assert_eq!(rx.recv().unwrap(), 1);
219         }
220         match rx.try_recv() {
221             Ok(..) => panic!(),
222             _ => {}
223         }
224         dtx.send(()).unwrap();
225     });
226 
227     for _ in 0..NTHREADS {
228         let tx = tx.clone();
229         thread::spawn(move || {
230             for _ in 0..AMT {
231                 tx.send(1).unwrap();
232             }
233         });
234     }
235     drop(tx);
236     drx.recv().unwrap();
237 }
238 
239 #[test]
240 fn oneshot_single_thread_close_port_first() {
241     // Simple test of closing without sending
242     let (_tx, rx) = sync_channel::<i32>(0);
243     drop(rx);
244 }
245 
246 #[test]
247 fn oneshot_single_thread_close_chan_first() {
248     // Simple test of closing without sending
249     let (tx, _rx) = sync_channel::<i32>(0);
250     drop(tx);
251 }
252 
253 #[test]
254 fn oneshot_single_thread_send_port_close() {
255     // Testing that the sender cleans up the payload if receiver is closed
256     let (tx, rx) = sync_channel::<Box<i32>>(0);
257     drop(rx);
258     assert!(tx.send(Box::new(0)).is_err());
259 }
260 
261 #[test]
262 fn oneshot_single_thread_recv_chan_close() {
263     // Receiving on a closed chan will panic
264     let res = thread::spawn(move || {
265         let (tx, rx) = sync_channel::<i32>(0);
266         drop(tx);
267         rx.recv().unwrap();
268     })
269     .join();
270     // What is our res?
271     assert!(res.is_err());
272 }
273 
274 #[test]
275 fn oneshot_single_thread_send_then_recv() {
276     let (tx, rx) = sync_channel::<Box<i32>>(1);
277     tx.send(Box::new(10)).unwrap();
278     assert!(*rx.recv().unwrap() == 10);
279 }
280 
281 #[test]
282 fn oneshot_single_thread_try_send_open() {
283     let (tx, rx) = sync_channel::<i32>(1);
284     assert_eq!(tx.try_send(10), Ok(()));
285     assert!(rx.recv().unwrap() == 10);
286 }
287 
288 #[test]
289 fn oneshot_single_thread_try_send_closed() {
290     let (tx, rx) = sync_channel::<i32>(0);
291     drop(rx);
292     assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
293 }
294 
295 #[test]
296 fn oneshot_single_thread_try_send_closed2() {
297     let (tx, _rx) = sync_channel::<i32>(0);
298     assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
299 }
300 
301 #[test]
302 fn oneshot_single_thread_try_recv_open() {
303     let (tx, rx) = sync_channel::<i32>(1);
304     tx.send(10).unwrap();
305     assert!(rx.recv() == Ok(10));
306 }
307 
308 #[test]
309 fn oneshot_single_thread_try_recv_closed() {
310     let (tx, rx) = sync_channel::<i32>(0);
311     drop(tx);
312     assert!(rx.recv().is_err());
313 }
314 
315 #[test]
316 fn oneshot_single_thread_try_recv_closed_with_data() {
317     let (tx, rx) = sync_channel::<i32>(1);
318     tx.send(10).unwrap();
319     drop(tx);
320     assert_eq!(rx.try_recv(), Ok(10));
321     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
322 }
323 
324 #[test]
325 fn oneshot_single_thread_peek_data() {
326     let (tx, rx) = sync_channel::<i32>(1);
327     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
328     tx.send(10).unwrap();
329     assert_eq!(rx.try_recv(), Ok(10));
330 }
331 
332 #[test]
333 fn oneshot_single_thread_peek_close() {
334     let (tx, rx) = sync_channel::<i32>(0);
335     drop(tx);
336     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
337     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
338 }
339 
340 #[test]
341 fn oneshot_single_thread_peek_open() {
342     let (_tx, rx) = sync_channel::<i32>(0);
343     assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
344 }
345 
346 #[test]
347 fn oneshot_multi_task_recv_then_send() {
348     let (tx, rx) = sync_channel::<Box<i32>>(0);
349     let _t = thread::spawn(move || {
350         assert!(*rx.recv().unwrap() == 10);
351     });
352 
353     tx.send(Box::new(10)).unwrap();
354 }
355 
356 #[test]
357 fn oneshot_multi_task_recv_then_close() {
358     let (tx, rx) = sync_channel::<Box<i32>>(0);
359     let _t = thread::spawn(move || {
360         drop(tx);
361     });
362     let res = thread::spawn(move || {
363         assert!(*rx.recv().unwrap() == 10);
364     })
365     .join();
366     assert!(res.is_err());
367 }
368 
369 #[test]
370 fn oneshot_multi_thread_close_stress() {
371     for _ in 0..stress_factor() {
372         let (tx, rx) = sync_channel::<i32>(0);
373         let _t = thread::spawn(move || {
374             drop(rx);
375         });
376         drop(tx);
377     }
378 }
379 
380 #[test]
381 fn oneshot_multi_thread_send_close_stress() {
382     for _ in 0..stress_factor() {
383         let (tx, rx) = sync_channel::<i32>(0);
384         let _t = thread::spawn(move || {
385             drop(rx);
386         });
387         let _ = thread::spawn(move || {
388             tx.send(1).unwrap();
389         })
390         .join();
391     }
392 }
393 
394 #[test]
395 fn oneshot_multi_thread_recv_close_stress() {
396     for _ in 0..stress_factor() {
397         let (tx, rx) = sync_channel::<i32>(0);
398         let _t = thread::spawn(move || {
399             let res = thread::spawn(move || {
400                 rx.recv().unwrap();
401             })
402             .join();
403             assert!(res.is_err());
404         });
405         let _t = thread::spawn(move || {
406             thread::spawn(move || {
407                 drop(tx);
408             });
409         });
410     }
411 }
412 
413 #[test]
414 fn oneshot_multi_thread_send_recv_stress() {
415     for _ in 0..stress_factor() {
416         let (tx, rx) = sync_channel::<Box<i32>>(0);
417         let _t = thread::spawn(move || {
418             tx.send(Box::new(10)).unwrap();
419         });
420         assert!(*rx.recv().unwrap() == 10);
421     }
422 }
423 
424 #[test]
425 fn stream_send_recv_stress() {
426     for _ in 0..stress_factor() {
427         let (tx, rx) = sync_channel::<Box<i32>>(0);
428 
429         send(tx, 0);
430         recv(rx, 0);
431 
432         fn send(tx: SyncSender<Box<i32>>, i: i32) {
433             if i == 10 {
434                 return;
435             }
436 
437             thread::spawn(move || {
438                 tx.send(Box::new(i)).unwrap();
439                 send(tx, i + 1);
440             });
441         }
442 
443         fn recv(rx: Receiver<Box<i32>>, i: i32) {
444             if i == 10 {
445                 return;
446             }
447 
448             thread::spawn(move || {
449                 assert!(*rx.recv().unwrap() == i);
450                 recv(rx, i + 1);
451             });
452         }
453     }
454 }
455 
456 #[test]
457 fn recv_a_lot() {
458     let count = if cfg!(miri) { 1000 } else { 10000 };
459     // Regression test that we don't run out of stack in scheduler context
460     let (tx, rx) = sync_channel(count);
461     for _ in 0..count {
462         tx.send(()).unwrap();
463     }
464     for _ in 0..count {
465         rx.recv().unwrap();
466     }
467 }
468 
469 #[test]
470 fn shared_chan_stress() {
471     let (tx, rx) = sync_channel(0);
472     let total = stress_factor() + 100;
473     for _ in 0..total {
474         let tx = tx.clone();
475         thread::spawn(move || {
476             tx.send(()).unwrap();
477         });
478     }
479 
480     for _ in 0..total {
481         rx.recv().unwrap();
482     }
483 }
484 
485 #[test]
486 fn test_nested_recv_iter() {
487     let (tx, rx) = sync_channel::<i32>(0);
488     let (total_tx, total_rx) = sync_channel::<i32>(0);
489 
490     let _t = thread::spawn(move || {
491         let mut acc = 0;
492         for x in rx.iter() {
493             acc += x;
494         }
495         total_tx.send(acc).unwrap();
496     });
497 
498     tx.send(3).unwrap();
499     tx.send(1).unwrap();
500     tx.send(2).unwrap();
501     drop(tx);
502     assert_eq!(total_rx.recv().unwrap(), 6);
503 }
504 
505 #[test]
506 fn test_recv_iter_break() {
507     let (tx, rx) = sync_channel::<i32>(0);
508     let (count_tx, count_rx) = sync_channel(0);
509 
510     let _t = thread::spawn(move || {
511         let mut count = 0;
512         for x in rx.iter() {
513             if count >= 3 {
514                 break;
515             } else {
516                 count += x;
517             }
518         }
519         count_tx.send(count).unwrap();
520     });
521 
522     tx.send(2).unwrap();
523     tx.send(2).unwrap();
524     tx.send(2).unwrap();
525     let _ = tx.try_send(2);
526     drop(tx);
527     assert_eq!(count_rx.recv().unwrap(), 4);
528 }
529 
530 #[test]
531 fn try_recv_states() {
532     let (tx1, rx1) = sync_channel::<i32>(1);
533     let (tx2, rx2) = sync_channel::<()>(1);
534     let (tx3, rx3) = sync_channel::<()>(1);
535     let _t = thread::spawn(move || {
536         rx2.recv().unwrap();
537         tx1.send(1).unwrap();
538         tx3.send(()).unwrap();
539         rx2.recv().unwrap();
540         drop(tx1);
541         tx3.send(()).unwrap();
542     });
543 
544     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
545     tx2.send(()).unwrap();
546     rx3.recv().unwrap();
547     assert_eq!(rx1.try_recv(), Ok(1));
548     assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
549     tx2.send(()).unwrap();
550     rx3.recv().unwrap();
551     assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
552 }
553 
554 // This bug used to end up in a livelock inside of the Receiver destructor
555 // because the internal state of the Shared packet was corrupted
556 #[test]
557 fn destroy_upgraded_shared_port_when_sender_still_active() {
558     let (tx, rx) = sync_channel::<()>(0);
559     let (tx2, rx2) = sync_channel::<()>(0);
560     let _t = thread::spawn(move || {
561         rx.recv().unwrap(); // wait on a oneshot
562         drop(rx); // destroy a shared
563         tx2.send(()).unwrap();
564     });
565     // make sure the other thread has gone to sleep
566     for _ in 0..5000 {
567         thread::yield_now();
568     }
569 
570     // upgrade to a shared chan and send a message
571     let t = tx.clone();
572     drop(tx);
573     t.send(()).unwrap();
574 
575     // wait for the child thread to exit before we exit
576     rx2.recv().unwrap();
577 }
578 
579 #[test]
580 fn send1() {
581     let (tx, rx) = sync_channel::<i32>(0);
582     let _t = thread::spawn(move || {
583         rx.recv().unwrap();
584     });
585     assert_eq!(tx.send(1), Ok(()));
586 }
587 
588 #[test]
589 fn send2() {
590     let (tx, rx) = sync_channel::<i32>(0);
591     let _t = thread::spawn(move || {
592         drop(rx);
593     });
594     assert!(tx.send(1).is_err());
595 }
596 
597 #[test]
598 fn send3() {
599     let (tx, rx) = sync_channel::<i32>(1);
600     assert_eq!(tx.send(1), Ok(()));
601     let _t = thread::spawn(move || {
602         drop(rx);
603     });
604     assert!(tx.send(1).is_err());
605 }
606 
607 #[test]
608 fn send4() {
609     let (tx, rx) = sync_channel::<i32>(0);
610     let tx2 = tx.clone();
611     let (done, donerx) = channel();
612     let done2 = done.clone();
613     let _t = thread::spawn(move || {
614         assert!(tx.send(1).is_err());
615         done.send(()).unwrap();
616     });
617     let _t = thread::spawn(move || {
618         assert!(tx2.send(2).is_err());
619         done2.send(()).unwrap();
620     });
621     drop(rx);
622     donerx.recv().unwrap();
623     donerx.recv().unwrap();
624 }
625 
626 #[test]
627 fn try_send1() {
628     let (tx, _rx) = sync_channel::<i32>(0);
629     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
630 }
631 
632 #[test]
633 fn try_send2() {
634     let (tx, _rx) = sync_channel::<i32>(1);
635     assert_eq!(tx.try_send(1), Ok(()));
636     assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
637 }
638 
639 #[test]
640 fn try_send3() {
641     let (tx, rx) = sync_channel::<i32>(1);
642     assert_eq!(tx.try_send(1), Ok(()));
643     drop(rx);
644     assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
645 }
646 
647 #[test]
648 fn issue_15761() {
649     fn repro() {
650         let (tx1, rx1) = sync_channel::<()>(3);
651         let (tx2, rx2) = sync_channel::<()>(3);
652 
653         let _t = thread::spawn(move || {
654             rx1.recv().unwrap();
655             tx2.try_send(()).unwrap();
656         });
657 
658         tx1.try_send(()).unwrap();
659         rx2.recv().unwrap();
660     }
661 
662     for _ in 0..100 {
663         repro()
664     }
665 }
666 
667 #[test]
668 fn drop_unreceived() {
669     let (tx, rx) = sync_channel::<Rc<()>>(1);
670     let msg = Rc::new(());
671     let weak = Rc::downgrade(&msg);
672     assert!(tx.send(msg).is_ok());
673     drop(rx);
674     // Messages should be dropped immediately when the last receiver is destroyed.
675     assert!(weak.upgrade().is_none());
676     drop(tx);
677 }
678