mpsc_async.rs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. use super::*;
  2. use crate::{
  3. loom::{self, future, thread},
  4. ThingBuf,
  5. };
  6. #[test]
  7. fn mpsc_try_send_recv() {
  8. loom::model(|| {
  9. let (tx, rx) = channel(ThingBuf::new(3));
  10. let p1 = {
  11. let tx = tx.clone();
  12. thread::spawn(move || {
  13. tx.try_send_ref().unwrap().with_mut(|val| *val = 1);
  14. })
  15. };
  16. let p2 = thread::spawn(move || {
  17. tx.try_send(2).unwrap();
  18. tx.try_send(3).unwrap();
  19. });
  20. let mut vals = future::block_on(async move {
  21. let mut vals = Vec::new();
  22. while let Some(val) = rx.recv_ref().await {
  23. val.with(|val| vals.push(*val));
  24. }
  25. vals
  26. });
  27. vals.sort_unstable();
  28. assert_eq!(vals, vec![1, 2, 3]);
  29. p1.join().unwrap();
  30. p2.join().unwrap();
  31. })
  32. }
  33. #[test]
  34. fn rx_closes() {
  35. const ITERATIONS: usize = 6;
  36. loom::model(|| {
  37. let (tx, rx) = channel(ThingBuf::new(ITERATIONS / 2));
  38. let producer = thread::spawn(move || {
  39. 'iters: for i in 0..=ITERATIONS {
  40. test_println!("sending {}...", i);
  41. 'send: loop {
  42. match tx.try_send(i) {
  43. Ok(_) => break 'send,
  44. Err(TrySendError::Full(_)) => thread::yield_now(),
  45. Err(TrySendError::Closed(_)) => break 'iters,
  46. }
  47. }
  48. test_println!("sent {}\n", i);
  49. }
  50. });
  51. future::block_on(async move {
  52. for i in 0..ITERATIONS - 1 {
  53. test_println!("receiving {}...", i);
  54. let n = rx.recv().await;
  55. test_println!("recv {:?}\n", n);
  56. assert_eq!(n, Some(i));
  57. }
  58. });
  59. producer.join().unwrap();
  60. })
  61. }
  62. #[test]
  63. fn spsc_recv_then_send() {
  64. loom::model(|| {
  65. let (tx, rx) = channel(ThingBuf::<i32>::new(4));
  66. let consumer = thread::spawn(move || {
  67. future::block_on(async move {
  68. assert_eq!(rx.recv().await.unwrap(), 10);
  69. })
  70. });
  71. tx.try_send(10).unwrap();
  72. consumer.join().unwrap();
  73. })
  74. }
  75. #[test]
  76. fn spsc_recv_then_close() {
  77. loom::model(|| {
  78. let (tx, rx) = channel(ThingBuf::<i32>::new(4));
  79. let producer = thread::spawn(move || {
  80. drop(tx);
  81. });
  82. future::block_on(async move {
  83. let recv = rx.recv().await;
  84. assert_eq!(recv, None);
  85. });
  86. producer.join().unwrap();
  87. });
  88. }
  89. #[test]
  90. fn spsc_recv_then_send_then_close() {
  91. loom::model(|| {
  92. let (tx, rx) = channel(ThingBuf::<i32>::new(2));
  93. let consumer = thread::spawn(move || {
  94. future::block_on(async move {
  95. assert_eq!(rx.recv().await.unwrap(), 10);
  96. assert_eq!(rx.recv().await.unwrap(), 20);
  97. assert_eq!(rx.recv().await, None);
  98. })
  99. });
  100. tx.try_send(10).unwrap();
  101. tx.try_send(20).unwrap();
  102. drop(tx);
  103. consumer.join().unwrap();
  104. })
  105. }
  106. #[test]
  107. fn spsc_send_recv_in_order_no_wrap() {
  108. const N_SENDS: usize = 4;
  109. loom::model(|| {
  110. let (tx, rx) = channel(ThingBuf::<usize>::new(N_SENDS));
  111. let consumer = thread::spawn(move || {
  112. future::block_on(async move {
  113. for i in 1..=N_SENDS {
  114. assert_eq!(rx.recv().await, Some(i));
  115. }
  116. assert_eq!(rx.recv().await, None);
  117. })
  118. });
  119. future::block_on(async move {
  120. for i in 1..=N_SENDS {
  121. tx.send(i).await.unwrap()
  122. }
  123. });
  124. consumer.join().unwrap();
  125. })
  126. }
  127. #[test]
  128. fn spsc_send_recv_in_order_wrap() {
  129. const N_SENDS: usize = 2;
  130. loom::model(|| {
  131. let (tx, rx) = channel(ThingBuf::<usize>::new(N_SENDS / 2));
  132. let consumer = thread::spawn(move || {
  133. future::block_on(async move {
  134. for i in 1..=N_SENDS {
  135. assert_eq!(rx.recv().await, Some(i));
  136. }
  137. assert_eq!(rx.recv().await, None);
  138. })
  139. });
  140. future::block_on(async move {
  141. for i in 1..=N_SENDS {
  142. tx.send(i).await.unwrap()
  143. }
  144. });
  145. consumer.join().unwrap();
  146. })
  147. }
  148. #[test]
  149. fn mpsc_send_recv_wrap() {
  150. loom::model(|| {
  151. let (tx, rx) = channel(ThingBuf::<usize>::new(1));
  152. let producer1 = do_producer(tx.clone(), 10);
  153. let producer2 = do_producer(tx, 20);
  154. let results = future::block_on(async move {
  155. let mut results = Vec::new();
  156. while let Some(val) = rx.recv().await {
  157. test_println!("RECEIVED {:?}", val);
  158. results.push(val);
  159. }
  160. results
  161. });
  162. producer1.join().expect("producer 1 panicked");
  163. producer2.join().expect("producer 2 panicked");
  164. assert_eq!(results.len(), 2);
  165. assert!(
  166. results.contains(&10),
  167. "missing value from producer 1; results={:?}",
  168. results
  169. );
  170. assert!(
  171. results.contains(&20),
  172. "missing value from producer 2; results={:?}",
  173. results
  174. );
  175. })
  176. }
  177. #[test]
  178. fn mpsc_send_recv_no_wrap() {
  179. loom::model(|| {
  180. let (tx, rx) = channel(ThingBuf::<usize>::new(2));
  181. let producer1 = do_producer(tx.clone(), 10);
  182. let producer2 = do_producer(tx, 20);
  183. let results = future::block_on(async move {
  184. let mut results = Vec::new();
  185. while let Some(val) = rx.recv().await {
  186. test_println!("RECEIVED {:?}", val);
  187. results.push(val);
  188. }
  189. results
  190. });
  191. producer1.join().expect("producer 1 panicked");
  192. producer2.join().expect("producer 2 panicked");
  193. assert_eq!(results.len(), 2);
  194. assert!(
  195. results.contains(&10),
  196. "missing value from producer 1; results={:?}",
  197. results
  198. );
  199. assert!(
  200. results.contains(&20),
  201. "missing value from producer 2; results={:?}",
  202. results
  203. );
  204. })
  205. }
  206. fn do_producer(tx: Sender<usize>, tag: usize) -> thread::JoinHandle<()> {
  207. thread::spawn(move || {
  208. future::block_on(async move {
  209. test_println!("SENDING {:?}", tag);
  210. tx.send(tag).await.unwrap();
  211. test_println!("SENT {:?}", tag);
  212. })
  213. })
  214. }
  215. #[test]
  216. fn tx_close_wakes() {
  217. loom::model(|| {
  218. let (tx, rx) = channel(ThingBuf::<i32>::new(2));
  219. let consumer = thread::spawn(move || {
  220. future::block_on(async move {
  221. assert_eq!(rx.recv().await, None);
  222. })
  223. });
  224. drop(tx);
  225. consumer.join().unwrap();
  226. });
  227. }