static_storage.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #![feature(offset_of)]
  2. #![cfg(feature = "static")]
  3. use std::{
  4. fmt::Write,
  5. mem::size_of,
  6. sync::atomic::{AtomicBool, Ordering},
  7. thread,
  8. };
  9. use thingbuf::{
  10. recycling::{self, DefaultRecycle},
  11. StaticThingBuf,
  12. };
  /// Exercises a `StaticThingBuf<i32, 16>` held in a `static`, with one spawned
  /// producer thread and the test thread acting as the consumer.
  ///
  /// The producer pushes the integers `0..N_ITEMS` in order, yielding and
  /// retrying whenever `push_ref` does not return `Ok`. The consumer asserts
  /// that values come out in the same order and, after the producer has been
  /// joined and the queue drained, that every value was actually received.
  #[test]
  fn static_storage_thingbuf() {
      // Number of values sent. This is larger than the `16` in the buffer's
      // type, so the producer cannot finish without the consumer popping.
      const N_ITEMS: i32 = 32;

      static BUF: StaticThingBuf<i32, 16> = StaticThingBuf::new();
      static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);

      let producer = thread::spawn(move || {
          for i in 0..N_ITEMS {
              // Retry until a slot can be claimed for writing.
              let mut thing = 'write: loop {
                  match BUF.push_ref() {
                      Ok(thing) => break 'write thing,
                      _ => thread::yield_now(),
                  }
              };
              *thing = i;
          }
          // Signal the consumer that no further values will be pushed.
          PRODUCER_LIVE.store(false, Ordering::Release);
      });

      // The next value the consumer expects to pop.
      let mut i = 0;

      // While the producer is writing to the queue, pop each entry and check
      // that it arrives in the order it was pushed.
      while PRODUCER_LIVE.load(Ordering::Acquire) {
          match BUF.pop_ref() {
              Some(thing) => {
                  assert_eq!(*thing, i);
                  i += 1;
              }
              None => thread::yield_now(),
          }
      }

      producer.join().unwrap();

      // Drain whatever was still queued when the producer finished.
      while let Some(thing) = BUF.pop_ref() {
          assert_eq!(*thing, i);
          i += 1;
      }

      // Without this check the test would pass even if trailing values were
      // silently lost.
      assert_eq!(
          i, N_ITEMS,
          "consumer did not receive every value the producer sent"
      );
  }
  /// Checks the value reported by `offset_of_slots()` for a
  /// `StaticThingBuf<i32, 16>` against a hard-coded expectation.
  #[test]
  fn static_storage_thingbuf_offset_of_slot_field() {
      static BUF: StaticThingBuf<i32, 16> = StaticThingBuf::new();

      let actual_offset = BUF.offset_of_slots();
      // NOTE(review): 384 is a magic constant; presumably the combined size of
      // the fields laid out before the recycler and slots. Confirm against the
      // struct definition, since it is likely target-dependent.
      let expected_offset = 384 + size_of::<DefaultRecycle>();

      assert_eq!(actual_offset, expected_offset);
  }
  /// Exercises a `StaticThingBuf<String, 8, WithCapacity>` held in a `static`,
  /// with one spawned producer thread and the test thread as the consumer.
  ///
  /// The producer writes the decimal text of `0..N_STRINGS` into successive
  /// slots. The consumer appends every popped string to `results`, one per
  /// line, and finally asserts that exactly `N_STRINGS` lines were collected
  /// and that line `n` parses back to `n`.
  #[test]
  fn static_storage_stringbuf() {
      use recycling::WithCapacity;

      // Number of strings sent through the buffer.
      const N_STRINGS: usize = 16;

      static BUF: StaticThingBuf<String, 8, WithCapacity> =
          StaticThingBuf::with_recycle(WithCapacity::new().with_max_capacity(8));
      static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);

      let producer = thread::spawn(move || {
          for i in 0..N_STRINGS {
              // Retry until a slot can be claimed for writing.
              let mut string = 'write: loop {
                  match BUF.push_ref() {
                      Ok(string) => break 'write string,
                      _ => thread::yield_now(),
                  }
              };
              write!(&mut string, "{:?}", i).unwrap();
          }
          // Signal the consumer that no further strings will be pushed.
          PRODUCER_LIVE.store(false, Ordering::Release);
          println!("producer done");
      });

      let mut results = String::new();

      // While the producer is writing to the queue, push each entry to the
      // results string.
      while PRODUCER_LIVE.load(Ordering::Acquire) {
          if let Some(string) = BUF.pop_ref() {
              writeln!(results, "{}", string).unwrap();
          }
          thread::yield_now();
      }

      producer.join().unwrap();
      println!("producer done...");

      // Drain whatever was still queued when the producer finished.
      while let Some(string) = BUF.pop_ref() {
          writeln!(results, "{}", string).unwrap();
      }

      println!("results:\n{}", results);

      // The per-line check below passes trivially on an empty or truncated
      // result, so verify the number of received entries first.
      assert_eq!(
          results.lines().count(),
          N_STRINGS,
          "consumer did not receive every string the producer sent"
      );

      for (n, ln) in results.lines().enumerate() {
          assert_eq!(ln.parse::<usize>(), Ok(n));
      }
  }
  92. #[tokio::test]
  93. async fn static_async_channel() {
  94. use std::collections::HashSet;
  95. use thingbuf::mpsc;
  96. const N_PRODUCERS: usize = 8;
  97. const N_SENDS: usize = N_PRODUCERS * 2;
  98. static CHANNEL: mpsc::StaticChannel<usize, N_PRODUCERS> = mpsc::StaticChannel::new();
  99. async fn do_producer(tx: mpsc::StaticSender<usize>, n: usize) {
  100. let tag = n * N_SENDS;
  101. for i in 0..N_SENDS {
  102. let msg = i + tag;
  103. println!("sending {}...", msg);
  104. tx.send(msg).await.unwrap();
  105. println!("sent {}!", msg);
  106. }
  107. println!("PRODUCER {} DONE!", n);
  108. }
  109. let (tx, rx) = CHANNEL.split();
  110. for n in 0..N_PRODUCERS {
  111. tokio::spawn(do_producer(tx.clone(), n));
  112. }
  113. drop(tx);
  114. let mut results = HashSet::new();
  115. while let Some(val) = {
  116. println!("receiving...");
  117. rx.recv().await
  118. } {
  119. println!("received {}!", val);
  120. results.insert(val);
  121. }
  122. let results = dbg!(results);
  123. for n in 0..N_PRODUCERS {
  124. let tag = n * N_SENDS;
  125. for i in 0..N_SENDS {
  126. let msg = i + tag;
  127. assert!(results.contains(&msg), "missing message {:?}", msg);
  128. }
  129. }
  130. }
  /// Sends messages from several OS threads through a `static`
  /// `blocking::StaticChannel` and checks that the receiver saw every one.
  ///
  /// Producer `n` sends the `N_SENDS` consecutive integers starting at
  /// `n * N_SENDS`, so the expected messages are `0..N_PRODUCERS * N_SENDS`.
  /// The receive loop ends when `recv` yields `None`.
  #[test]
  fn static_blocking_channel() {
      use std::collections::HashSet;
      use thingbuf::mpsc::blocking;

      const N_PRODUCERS: usize = 8;
      const N_SENDS: usize = N_PRODUCERS * 2;

      static CHANNEL: blocking::StaticChannel<usize, N_PRODUCERS> = blocking::StaticChannel::new();

      let (tx, rx) = CHANNEL.split();
      for n in 0..N_PRODUCERS {
          // Each producer thread owns its own clone of the sender.
          let tx = tx.clone();
          std::thread::spawn(move || {
              let first = n * N_SENDS;
              for msg in first..first + N_SENDS {
                  println!("sending {}...", msg);
                  tx.send(msg).unwrap();
                  println!("sent {}!", msg);
              }
              println!("PRODUCER {} DONE!", n);
          });
      }
      // Drop the original sender so that only the producer threads hold one.
      drop(tx);

      let mut results = HashSet::new();
      loop {
          println!("receiving...");
          match rx.recv() {
              Some(val) => {
                  println!("received {}!", val);
                  results.insert(val);
              }
              None => break,
          }
      }

      let results = dbg!(results);

      // Every message from every producer must have been received.
      for msg in 0..N_PRODUCERS * N_SENDS {
          assert!(results.contains(&msg), "missing message {:?}", msg);
      }
  }