static_storage.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #![cfg(feature = "static")]
  2. use std::{
  3. fmt::Write,
  4. sync::atomic::{AtomicBool, Ordering},
  5. thread,
  6. };
  7. use thingbuf::{recycling, StaticThingBuf};
  8. #[test]
  9. fn static_storage_thingbuf() {
  10. static BUF: StaticThingBuf<i32, 16> = StaticThingBuf::new();
  11. static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);
  12. let producer = thread::spawn(move || {
  13. for i in 0..32 {
  14. let mut thing = 'write: loop {
  15. match BUF.push_ref() {
  16. Ok(thing) => break 'write thing,
  17. _ => thread::yield_now(),
  18. }
  19. };
  20. thing.with_mut(|thing| *thing = i);
  21. }
  22. PRODUCER_LIVE.store(false, Ordering::Release);
  23. });
  24. let mut i = 0;
  25. // While the producer is writing to the queue, push each entry to the
  26. // results string.
  27. while PRODUCER_LIVE.load(Ordering::Acquire) {
  28. match BUF.pop_ref() {
  29. Some(thing) => thing.with(|thing| {
  30. assert_eq!(*thing, i);
  31. i += 1;
  32. }),
  33. None => thread::yield_now(),
  34. }
  35. }
  36. producer.join().unwrap();
  37. // drain the queue.
  38. while let Some(thing) = BUF.pop_ref() {
  39. thing.with(|thing| {
  40. assert_eq!(*thing, i);
  41. i += 1;
  42. })
  43. }
  44. }
  45. #[test]
  46. fn static_storage_stringbuf() {
  47. use recycling::WithCapacity;
  48. static BUF: StaticThingBuf<String, 8, WithCapacity> =
  49. StaticThingBuf::with_recycle(WithCapacity::new().with_max_capacity(8));
  50. static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);
  51. let producer = thread::spawn(move || {
  52. for i in 0..16 {
  53. let mut string = 'write: loop {
  54. match BUF.push_ref() {
  55. Ok(string) => break 'write string,
  56. _ => thread::yield_now(),
  57. }
  58. };
  59. write!(&mut string, "{:?}", i).unwrap();
  60. }
  61. PRODUCER_LIVE.store(false, Ordering::Release);
  62. println!("producer done");
  63. });
  64. let mut results = String::new();
  65. // While the producer is writing to the queue, push each entry to the
  66. // results string.
  67. while PRODUCER_LIVE.load(Ordering::Acquire) {
  68. if let Some(string) = BUF.pop_ref() {
  69. writeln!(results, "{}", string).unwrap();
  70. }
  71. thread::yield_now();
  72. }
  73. producer.join().unwrap();
  74. println!("producer done...");
  75. // drain the queue.
  76. while let Some(string) = BUF.pop_ref() {
  77. writeln!(results, "{}", string).unwrap();
  78. }
  79. println!("results:\n{}", results);
  80. for (n, ln) in results.lines().enumerate() {
  81. assert_eq!(ln.parse::<usize>(), Ok(n));
  82. }
  83. }
  84. #[tokio::test]
  85. async fn static_async_channel() {
  86. use std::collections::HashSet;
  87. use thingbuf::mpsc;
  88. const N_PRODUCERS: usize = 8;
  89. const N_SENDS: usize = N_PRODUCERS * 2;
  90. static CHANNEL: mpsc::StaticChannel<usize, N_PRODUCERS> = mpsc::StaticChannel::new();
  91. async fn do_producer(tx: mpsc::StaticSender<usize>, n: usize) {
  92. let tag = n * N_SENDS;
  93. for i in 0..N_SENDS {
  94. let msg = i + tag;
  95. println!("sending {}...", msg);
  96. tx.send(msg).await.unwrap();
  97. println!("sent {}!", msg);
  98. }
  99. println!("PRODUCER {} DONE!", n);
  100. }
  101. let (tx, rx) = CHANNEL.split();
  102. for n in 0..N_PRODUCERS {
  103. tokio::spawn(do_producer(tx.clone(), n));
  104. }
  105. drop(tx);
  106. let mut results = HashSet::new();
  107. while let Some(val) = {
  108. println!("receiving...");
  109. rx.recv().await
  110. } {
  111. println!("received {}!", val);
  112. results.insert(val);
  113. }
  114. let results = dbg!(results);
  115. for n in 0..N_PRODUCERS {
  116. let tag = n * N_SENDS;
  117. for i in 0..N_SENDS {
  118. let msg = i + tag;
  119. assert!(results.contains(&msg), "missing message {:?}", msg);
  120. }
  121. }
  122. }
  123. #[test]
  124. fn static_sync_channel() {
  125. use std::collections::HashSet;
  126. use thingbuf::mpsc::sync;
  127. const N_PRODUCERS: usize = 8;
  128. const N_SENDS: usize = N_PRODUCERS * 2;
  129. static CHANNEL: sync::StaticChannel<usize, N_PRODUCERS> = sync::StaticChannel::new();
  130. fn do_producer(tx: sync::StaticSender<usize>, n: usize) -> impl FnOnce() {
  131. move || {
  132. let tag = n * N_SENDS;
  133. for i in 0..N_SENDS {
  134. let msg = i + tag;
  135. println!("sending {}...", msg);
  136. tx.send(msg).unwrap();
  137. println!("sent {}!", msg);
  138. }
  139. println!("PRODUCER {} DONE!", n);
  140. }
  141. }
  142. let (tx, rx) = CHANNEL.split();
  143. for n in 0..N_PRODUCERS {
  144. std::thread::spawn(do_producer(tx.clone(), n));
  145. }
  146. drop(tx);
  147. let mut results = HashSet::new();
  148. while let Some(val) = {
  149. println!("receiving...");
  150. rx.recv()
  151. } {
  152. println!("received {}!", val);
  153. results.insert(val);
  154. }
  155. let results = dbg!(results);
  156. for n in 0..N_PRODUCERS {
  157. let tag = n * N_SENDS;
  158. for i in 0..N_SENDS {
  159. let msg = i + tag;
  160. assert!(results.contains(&msg), "missing message {:?}", msg);
  161. }
  162. }
  163. }