mpsc_async.rs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. use super::*;
  2. use crate::{
  3. loom::{self, future, thread},
  4. ThingBuf,
  5. };
  /// Loom model: two producer threads fill the channel through `try_send_ref`
  /// while the model's own thread drains it with `recv_ref`.
  ///
  /// One producer writes `1`, the other writes `2` and then `3`. The receiver
  /// collects values until `recv_ref` yields `None`, and the sorted result must
  /// be exactly `[1, 2, 3]`.
  #[test]
  #[cfg_attr(ci_skip_slow_models, ignore)]
  fn mpsc_try_send_recv() {
      loom::model(|| {
          let (tx, rx) = channel(3);

          // The first producer gets its own clone so the original sender can be
          // moved into the second producer.
          let tx_clone = tx.clone();
          let first = thread::spawn(move || {
              *tx_clone.try_send_ref().unwrap() = 1;
          });
          let second = thread::spawn(move || {
              // Each slot guard is a temporary, so it is released before the
              // next `try_send_ref` call.
              *tx.try_send_ref().unwrap() = 2;
              *tx.try_send_ref().unwrap() = 3;
          });

          let mut received = future::block_on(async move {
              let mut seen = Vec::new();
              loop {
                  match rx.recv_ref().await {
                      Some(slot) => seen.push(*slot),
                      None => break seen,
                  }
              }
          });

          // Arrival order depends on the interleaving, so compare sorted.
          received.sort_unstable();
          assert_eq!(received, vec![1, 2, 3]);

          first.join().unwrap();
          second.join().unwrap();
      })
  }
  /// Loom model: the receiver stops after `ITERATIONS - 1` messages while the
  /// producer tries to send `ITERATIONS + 1` of them.
  ///
  /// The producer retries on `TrySendError::Full` (yielding to the scheduler)
  /// and gives up entirely on `TrySendError::Closed`. The receiver asserts that
  /// the values it gets arrive as `0, 1, 2, ...` in order.
  #[test]
  fn rx_closes() {
      const ITERATIONS: usize = 6;
      loom::model(|| {
          let (tx, rx) = channel(ITERATIONS / 2);

          let producer = thread::spawn(move || {
              for i in 0..=ITERATIONS {
                  test_println!("sending {}...", i);
                  // Spin until the value is written or the channel reports closed.
                  let closed = loop {
                      match tx.try_send_ref() {
                          Ok(mut slot) => {
                              *slot = i;
                              break false;
                          }
                          Err(TrySendError::Full(_)) => thread::yield_now(),
                          Err(TrySendError::Closed(_)) => break true,
                      }
                  };
                  if closed {
                      // Stop sending altogether; the "sent" line is skipped too.
                      break;
                  }
                  test_println!("sent {}\n", i);
              }
          });

          future::block_on(async move {
              for expected in 0..(ITERATIONS - 1) {
                  test_println!("receiving {}...", expected);
                  let n = rx.recv().await;
                  test_println!("recv {:?}\n", n);
                  assert_eq!(n, Some(expected));
              }
          });

          producer.join().unwrap();
      })
  }
  /// Loom model: a consumer thread awaits a single `recv` while the model's own
  /// thread sends `10` with `try_send`; the consumer must receive `10`.
  #[test]
  fn spsc_recv_then_send() {
      loom::model(|| {
          let (tx, rx) = channel::<i32>(4);

          let consumer = thread::spawn(move || {
              future::block_on(async move {
                  let received = rx.recv().await;
                  assert_eq!(received.unwrap(), 10);
              })
          });

          tx.try_send(10).unwrap();
          consumer.join().unwrap();
      })
  }
  /// Loom model: a spawned thread drops the only sender while the model's own
  /// thread awaits `recv`; the receive must resolve to `None`.
  #[test]
  fn spsc_recv_then_close() {
      loom::model(|| {
          let (tx, rx) = channel::<i32>(4);

          // The producer never sends anything; it only drops its sender.
          let producer = thread::spawn(move || drop(tx));

          future::block_on(async move {
              assert_eq!(rx.recv().await, None);
          });

          producer.join().unwrap();
      });
  }
  /// Loom model: the model's own thread sends `10` and `20` with `try_send` and
  /// then drops the sender; the consumer thread must receive `10`, `20`, and
  /// finally `None`, in that order.
  #[test]
  fn spsc_recv_then_send_then_close() {
      loom::model(|| {
          let (tx, rx) = channel::<i32>(2);

          let consumer = thread::spawn(move || {
              future::block_on(async move {
                  for &expected in &[10, 20] {
                      assert_eq!(rx.recv().await.unwrap(), expected);
                  }
                  assert_eq!(rx.recv().await, None);
              })
          });

          tx.try_send(10).unwrap();
          tx.try_send(20).unwrap();
          // Drop the sender before joining so the consumer's last `recv` can
          // finish with `None`.
          drop(tx);

          consumer.join().unwrap();
      })
  }
  /// Loom model: one async producer sends `1..=N_SENDS` through a channel
  /// created with `channel(N_SENDS)`, and one consumer thread must receive those
  /// values in the same order, followed by `None`.
  #[test]
  fn spsc_send_recv_in_order_no_wrap() {
      const N_SENDS: usize = 4;
      loom::model(|| {
          let (tx, rx) = channel::<usize>(N_SENDS);

          let consume = async move {
              for expected in 1..=N_SENDS {
                  assert_eq!(rx.recv().await, Some(expected));
              }
              // The sender is dropped once the producer future completes.
              assert_eq!(rx.recv().await, None);
          };
          let consumer = thread::spawn(move || future::block_on(consume));

          future::block_on(async move {
              for value in 1..=N_SENDS {
                  tx.send(value).await.unwrap();
              }
          });

          consumer.join().unwrap();
      })
  }
  /// Loom model: same ordering check as the "no wrap" variant, but the channel
  /// is created with `channel(N_SENDS / 2)` while `N_SENDS` values are sent.
  ///
  /// NOTE(review): presumably the argument is the capacity, so the producer has
  /// to reuse slots -- confirm against `channel`.
  #[test]
  fn spsc_send_recv_in_order_wrap() {
      const N_SENDS: usize = 2;
      loom::model(|| {
          let (tx, rx) = channel::<usize>(N_SENDS / 2);

          let consumer = thread::spawn(move || {
              future::block_on(async move {
                  let mut expected = 1;
                  while expected <= N_SENDS {
                      assert_eq!(rx.recv().await, Some(expected));
                      expected += 1;
                  }
                  // The sender is dropped once the producer future completes.
                  assert_eq!(rx.recv().await, None);
              })
          });

          future::block_on(async move {
              let mut next = 1;
              while next <= N_SENDS {
                  tx.send(next).await.unwrap();
                  next += 1;
              }
          });

          consumer.join().unwrap();
      })
  }
  /// Loom model: two producers (started via `do_producer`) each send one tagged
  /// value through a channel created with `channel(1)`; the receiver collects
  /// until `recv` yields `None` and must end up with exactly the tags `10` and
  /// `20`, in either order.
  #[test]
  #[cfg_attr(ci_skip_slow_models, ignore)]
  fn mpsc_send_recv_wrap() {
      loom::model(|| {
          let (tx, rx) = channel::<usize>(1);

          let first = do_producer(tx.clone(), 10);
          let second = do_producer(tx, 20);

          let results = future::block_on(async move {
              let mut collected = Vec::new();
              loop {
                  match rx.recv().await {
                      Some(val) => {
                          test_println!("RECEIVED {:?}", val);
                          collected.push(val);
                      }
                      None => break collected,
                  }
              }
          });

          first.join().expect("producer 1 panicked");
          second.join().expect("producer 2 panicked");

          assert_eq!(results.len(), 2);
          let has_first = results.contains(&10);
          assert!(
              has_first,
              "missing value from producer 1; results={:?}",
              results
          );
          let has_second = results.contains(&20);
          assert!(
              has_second,
              "missing value from producer 2; results={:?}",
              results
          );
      })
  }
  /// Loom model: two producers (started via `do_producer`) each send one tagged
  /// value through a channel created with `channel(2)`; the receiver collects
  /// until `recv` yields `None` and must end up with exactly the tags `10` and
  /// `20`, in either order.
  #[test]
  fn mpsc_send_recv_no_wrap() {
      loom::model(|| {
          let (tx, rx) = channel::<usize>(2);

          let first = do_producer(tx.clone(), 10);
          let second = do_producer(tx, 20);

          let results = future::block_on(async move {
              let mut collected = Vec::new();
              loop {
                  match rx.recv().await {
                      Some(val) => {
                          test_println!("RECEIVED {:?}", val);
                          collected.push(val);
                      }
                      None => break collected,
                  }
              }
          });

          first.join().expect("producer 1 panicked");
          second.join().expect("producer 2 panicked");

          assert_eq!(results.len(), 2);
          let has_first = results.contains(&10);
          assert!(
              has_first,
              "missing value from producer 1; results={:?}",
              results
          );
          let has_second = results.contains(&20);
          assert!(
              has_second,
              "missing value from producer 2; results={:?}",
              results
          );
      })
  }
  211. fn do_producer(tx: Sender<usize>, tag: usize) -> thread::JoinHandle<()> {
  212. thread::spawn(move || {
  213. future::block_on(async move {
  214. test_println!("SENDING {:?}", tag);
  215. tx.send(tag).await.unwrap();
  216. test_println!("SENT {:?}", tag);
  217. })
  218. })
  219. }
  /// Loom model: the model's own thread drops the only sender while a consumer
  /// thread awaits `recv`; the consumer's receive must resolve to `None`.
  #[test]
  fn tx_close_wakes() {
      loom::model(|| {
          let (tx, rx) = channel::<i32>(2);

          let consumer = thread::spawn(move || {
              future::block_on(async move {
                  let outcome = rx.recv().await;
                  assert_eq!(outcome, None);
              })
          });

          // Nothing is ever sent; closing is the only event the consumer sees.
          drop(tx);

          consumer.join().unwrap();
      });
  }
  /// Loom model: a producer thread sends `0..LEN` and then finishes (dropping
  /// its sender), while the model's own thread must still receive every one of
  /// those values, in order.
  #[test]
  fn tx_close_drains_queue() {
      const LEN: usize = 4;
      loom::model(|| {
          let (tx, rx) = channel(LEN);

          let produce = async move {
              for value in 0..LEN {
                  tx.send(value).await.unwrap();
              }
          };
          let producer = thread::spawn(move || future::block_on(produce));

          future::block_on(async move {
              let mut expected = 0;
              while expected < LEN {
                  assert_eq!(rx.recv().await, Some(expected));
                  expected += 1;
              }
          });

          producer.join().unwrap();
      });
  }