mpsc_blocking.rs 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. use std::thread;
  2. use thingbuf::mpsc::blocking;
  3. #[test]
  4. fn basically_works() {
  5. use std::collections::HashSet;
  6. const N_SENDS: usize = 10;
  7. const N_PRODUCERS: usize = 10;
  8. fn start_producer(tx: blocking::Sender<usize>, n: usize) -> thread::JoinHandle<()> {
  9. let tag = n * N_SENDS;
  10. thread::Builder::new()
  11. .name(format!("producer {}", n))
  12. .spawn(move || {
  13. for i in 0..N_SENDS {
  14. let msg = i + tag;
  15. println!("[producer {}] sending {}...", n, msg);
  16. tx.send(msg).unwrap();
  17. println!("[producer {}] sent {}!", n, msg);
  18. }
  19. println!("[producer {}] DONE!", n);
  20. })
  21. .expect("spawning threads should succeed")
  22. }
  23. let (tx, rx) = blocking::channel(N_SENDS / 2);
  24. for n in 0..N_PRODUCERS {
  25. start_producer(tx.clone(), n);
  26. }
  27. drop(tx);
  28. let mut results = HashSet::new();
  29. while let Some(val) = {
  30. println!("receiving...");
  31. rx.recv()
  32. } {
  33. println!("received {}!", val);
  34. results.insert(val);
  35. }
  36. let results = dbg!(results);
  37. for n in 0..N_PRODUCERS {
  38. let tag = n * N_SENDS;
  39. for i in 0..N_SENDS {
  40. let msg = i + tag;
  41. assert!(results.contains(&msg), "missing message {:?}", msg);
  42. }
  43. }
  44. }
  45. #[test]
  46. fn tx_close_drains_queue() {
  47. const LEN: usize = 4;
  48. for i in 0..10000 {
  49. println!("\n\n--- iteration {} ---\n\n", i);
  50. let (tx, rx) = blocking::channel(LEN);
  51. let producer = thread::spawn(move || {
  52. for i in 0..LEN {
  53. tx.send(i).unwrap();
  54. }
  55. });
  56. for i in 0..LEN {
  57. assert_eq!(rx.recv(), Some(i))
  58. }
  59. producer.join().unwrap();
  60. }
  61. }