mpsc_async.rs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. use super::*;
  2. use crate::loom::{self, alloc::Track, future, thread};
  3. #[test]
  4. #[cfg_attr(ci_skip_slow_models, ignore)]
  5. fn mpsc_try_send_recv() {
  6. loom::model(|| {
  7. let (tx, rx) = channel(3);
  8. let p1 = {
  9. let tx = tx.clone();
  10. thread::spawn(move || {
  11. *tx.try_send_ref().unwrap() = Track::new(1);
  12. })
  13. };
  14. let p2 = thread::spawn(move || {
  15. *tx.try_send_ref().unwrap() = Track::new(2);
  16. *tx.try_send_ref().unwrap() = Track::new(3);
  17. });
  18. let mut vals = future::block_on(async move {
  19. let mut vals = Vec::new();
  20. while let Some(val) = rx.recv_ref().await {
  21. vals.push(*val.get_ref());
  22. }
  23. vals
  24. });
  25. vals.sort_unstable();
  26. assert_eq!(vals, vec![1, 2, 3]);
  27. p1.join().unwrap();
  28. p2.join().unwrap();
  29. })
  30. }
  31. #[test]
  32. fn rx_closes() {
  33. const ITERATIONS: usize = 6;
  34. loom::model(|| {
  35. let (tx, rx) = channel(ITERATIONS / 2);
  36. let producer = thread::spawn(move || {
  37. 'iters: for i in 0..=ITERATIONS {
  38. test_println!("sending {}...", i);
  39. 'send: loop {
  40. match tx.try_send_ref() {
  41. Ok(mut slot) => {
  42. *slot = i;
  43. break 'send;
  44. }
  45. Err(TrySendError::Full(_)) => thread::yield_now(),
  46. Err(TrySendError::Closed(_)) => break 'iters,
  47. }
  48. }
  49. test_println!("sent {}\n", i);
  50. }
  51. });
  52. future::block_on(async move {
  53. for i in 0..ITERATIONS - 1 {
  54. test_println!("receiving {}...", i);
  55. let n = rx.recv().await;
  56. test_println!("recv {:?}\n", n);
  57. assert_eq!(n, Some(i));
  58. }
  59. });
  60. producer.join().unwrap();
  61. })
  62. }
  63. #[test]
  64. fn rx_close_unconsumed_spsc() {
  65. // Tests that messages that have not been consumed by the receiver are
  66. // dropped when dropping the channel.
  67. const MESSAGES: usize = 4;
  68. loom::model(|| {
  69. let (tx, rx) = channel(MESSAGES);
  70. let consumer = thread::spawn(move || {
  71. future::block_on(async move {
  72. // recieve one message
  73. let msg = rx.recv().await;
  74. test_println!("recv {:?}", msg);
  75. assert!(msg.is_some());
  76. // drop the receiver...
  77. })
  78. });
  79. future::block_on(async move {
  80. let mut i = 1;
  81. while let Ok(mut slot) = tx.send_ref().await {
  82. test_println!("producer sending {}...", i);
  83. *slot = Track::new(i);
  84. i += 1;
  85. }
  86. });
  87. consumer.join().unwrap();
  88. })
  89. }
  90. #[test]
  91. #[ignore] // This is marked as `ignore` because it takes over an hour to run.
  92. fn rx_close_unconsumed_mpsc() {
  93. const MESSAGES: usize = 2;
  94. async fn do_producer(tx: Sender<Track<i32>>, n: usize) {
  95. let mut i = 1;
  96. while let Ok(mut slot) = tx.send_ref().await {
  97. test_println!("producer {} sending {}...", n, i);
  98. *slot = Track::new(i);
  99. i += 1;
  100. }
  101. }
  102. loom::model(|| {
  103. let (tx, rx) = channel(MESSAGES);
  104. let consumer = thread::spawn(move || {
  105. future::block_on(async move {
  106. // recieve one message
  107. let msg = rx.recv().await;
  108. test_println!("recv {:?}", msg);
  109. assert!(msg.is_some());
  110. // drop the receiver...
  111. })
  112. });
  113. let producer = {
  114. let tx = tx.clone();
  115. thread::spawn(move || future::block_on(do_producer(tx, 1)))
  116. };
  117. future::block_on(do_producer(tx, 2));
  118. producer.join().unwrap();
  119. consumer.join().unwrap();
  120. })
  121. }
  122. #[test]
  123. fn spsc_recv_then_send() {
  124. loom::model(|| {
  125. let (tx, rx) = channel::<i32>(4);
  126. let consumer = thread::spawn(move || {
  127. future::block_on(async move {
  128. assert_eq!(rx.recv().await.unwrap(), 10);
  129. })
  130. });
  131. tx.try_send(10).unwrap();
  132. consumer.join().unwrap();
  133. })
  134. }
  135. #[test]
  136. fn spsc_recv_then_close() {
  137. loom::model(|| {
  138. let (tx, rx) = channel::<i32>(4);
  139. let producer = thread::spawn(move || {
  140. drop(tx);
  141. });
  142. future::block_on(async move {
  143. let recv = rx.recv().await;
  144. assert_eq!(recv, None);
  145. });
  146. producer.join().unwrap();
  147. });
  148. }
  149. #[test]
  150. fn spsc_recv_then_send_then_close() {
  151. loom::model(|| {
  152. let (tx, rx) = channel::<i32>(2);
  153. let consumer = thread::spawn(move || {
  154. future::block_on(async move {
  155. assert_eq!(rx.recv().await.unwrap(), 10);
  156. assert_eq!(rx.recv().await.unwrap(), 20);
  157. assert_eq!(rx.recv().await, None);
  158. })
  159. });
  160. tx.try_send(10).unwrap();
  161. tx.try_send(20).unwrap();
  162. drop(tx);
  163. consumer.join().unwrap();
  164. })
  165. }
  166. #[test]
  167. fn spsc_send_recv_in_order_no_wrap() {
  168. const N_SENDS: usize = 4;
  169. loom::model(|| {
  170. let (tx, rx) = channel::<usize>(N_SENDS);
  171. let consumer = thread::spawn(move || {
  172. future::block_on(async move {
  173. for i in 1..=N_SENDS {
  174. assert_eq!(rx.recv().await, Some(i));
  175. }
  176. assert_eq!(rx.recv().await, None);
  177. })
  178. });
  179. future::block_on(async move {
  180. for i in 1..=N_SENDS {
  181. tx.send(i).await.unwrap()
  182. }
  183. });
  184. consumer.join().unwrap();
  185. })
  186. }
  187. #[test]
  188. fn spsc_send_recv_in_order_wrap() {
  189. const N_SENDS: usize = 2;
  190. loom::model(|| {
  191. let (tx, rx) = channel::<usize>(N_SENDS / 2);
  192. let consumer = thread::spawn(move || {
  193. future::block_on(async move {
  194. for i in 1..=N_SENDS {
  195. assert_eq!(rx.recv().await, Some(i));
  196. }
  197. assert_eq!(rx.recv().await, None);
  198. })
  199. });
  200. future::block_on(async move {
  201. for i in 1..=N_SENDS {
  202. tx.send(i).await.unwrap()
  203. }
  204. });
  205. consumer.join().unwrap();
  206. })
  207. }
  208. #[test]
  209. #[cfg_attr(ci_skip_slow_models, ignore)]
  210. fn mpsc_send_recv_wrap() {
  211. loom::model(|| {
  212. let (tx, rx) = channel::<usize>(1);
  213. let producer1 = do_producer(tx.clone(), 10);
  214. let producer2 = do_producer(tx, 20);
  215. let results = future::block_on(async move {
  216. let mut results = Vec::new();
  217. while let Some(val) = rx.recv().await {
  218. test_println!("RECEIVED {:?}", val);
  219. results.push(val);
  220. }
  221. results
  222. });
  223. producer1.join().expect("producer 1 panicked");
  224. producer2.join().expect("producer 2 panicked");
  225. assert_eq!(results.len(), 2);
  226. assert!(
  227. results.contains(&10),
  228. "missing value from producer 1; results={:?}",
  229. results
  230. );
  231. assert!(
  232. results.contains(&20),
  233. "missing value from producer 2; results={:?}",
  234. results
  235. );
  236. })
  237. }
  238. #[test]
  239. fn mpsc_send_recv_no_wrap() {
  240. loom::model(|| {
  241. let (tx, rx) = channel::<usize>(2);
  242. let producer1 = do_producer(tx.clone(), 10);
  243. let producer2 = do_producer(tx, 20);
  244. let results = future::block_on(async move {
  245. let mut results = Vec::new();
  246. while let Some(val) = rx.recv().await {
  247. test_println!("RECEIVED {:?}", val);
  248. results.push(val);
  249. }
  250. results
  251. });
  252. producer1.join().expect("producer 1 panicked");
  253. producer2.join().expect("producer 2 panicked");
  254. assert_eq!(results.len(), 2);
  255. assert!(
  256. results.contains(&10),
  257. "missing value from producer 1; results={:?}",
  258. results
  259. );
  260. assert!(
  261. results.contains(&20),
  262. "missing value from producer 2; results={:?}",
  263. results
  264. );
  265. })
  266. }
  267. fn do_producer(tx: Sender<usize>, tag: usize) -> thread::JoinHandle<()> {
  268. thread::spawn(move || {
  269. future::block_on(async move {
  270. test_println!("SENDING {:?}", tag);
  271. tx.send(tag).await.unwrap();
  272. test_println!("SENT {:?}", tag);
  273. })
  274. })
  275. }
  276. #[test]
  277. fn tx_close_wakes() {
  278. loom::model(|| {
  279. let (tx, rx) = channel::<i32>(2);
  280. let consumer = thread::spawn(move || {
  281. future::block_on(async move {
  282. assert_eq!(rx.recv().await, None);
  283. })
  284. });
  285. drop(tx);
  286. consumer.join().unwrap();
  287. });
  288. }
  289. #[test]
  290. fn tx_close_drains_queue() {
  291. const LEN: usize = 4;
  292. loom::model(|| {
  293. let (tx, rx) = channel(LEN);
  294. let producer = thread::spawn(move || {
  295. future::block_on(async move {
  296. for i in 0..LEN {
  297. tx.send(i).await.unwrap();
  298. }
  299. })
  300. });
  301. future::block_on(async move {
  302. for i in 0..LEN {
  303. assert_eq!(rx.recv().await, Some(i))
  304. }
  305. });
  306. producer.join().unwrap();
  307. });
  308. }