mpsc_async.rs 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. use thingbuf::mpsc;
  2. #[tokio::test(flavor = "multi_thread")]
  3. async fn basically_works() {
  4. use std::collections::HashSet;
  5. const N_SENDS: usize = 10;
  6. const N_PRODUCERS: usize = 10;
  7. async fn do_producer(tx: mpsc::Sender<usize>, n: usize) {
  8. let tag = n * N_SENDS;
  9. for i in 0..N_SENDS {
  10. let msg = i + tag;
  11. println!("sending {}...", msg);
  12. tx.send(msg).await.unwrap();
  13. println!("sent {}!", msg);
  14. }
  15. println!("PRODUCER {} DONE!", n);
  16. }
  17. let (tx, rx) = mpsc::channel(N_SENDS / 2);
  18. for n in 0..N_PRODUCERS {
  19. tokio::spawn(do_producer(tx.clone(), n));
  20. }
  21. drop(tx);
  22. let mut results = HashSet::new();
  23. while let Some(val) = {
  24. println!("receiving...");
  25. rx.recv().await
  26. } {
  27. println!("received {}!", val);
  28. results.insert(val);
  29. }
  30. let results = dbg!(results);
  31. for n in 0..N_PRODUCERS {
  32. let tag = n * N_SENDS;
  33. for i in 0..N_SENDS {
  34. let msg = i + tag;
  35. assert!(results.contains(&msg), "missing message {:?}", msg);
  36. }
  37. }
  38. }