static_storage.rs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. use std::{
  2. fmt::Write,
  3. sync::atomic::{AtomicBool, Ordering},
  4. thread,
  5. };
  6. use thingbuf::{StaticStringBuf, StaticThingBuf};
  7. #[test]
  8. fn static_storage_thingbuf() {
  9. static BUF: StaticThingBuf<i32, 16> = StaticThingBuf::new();
  10. static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);
  11. let producer = thread::spawn(move || {
  12. for i in 0..32 {
  13. let mut thing = 'write: loop {
  14. match BUF.push_ref() {
  15. Ok(thing) => break 'write thing,
  16. _ => thread::yield_now(),
  17. }
  18. };
  19. thing.with_mut(|thing| *thing = i);
  20. }
  21. PRODUCER_LIVE.store(false, Ordering::Release);
  22. });
  23. let mut i = 0;
  24. // While the producer is writing to the queue, push each entry to the
  25. // results string.
  26. while PRODUCER_LIVE.load(Ordering::Acquire) {
  27. match BUF.pop_ref() {
  28. Some(thing) => thing.with(|thing| {
  29. assert_eq!(*thing, i);
  30. i += 1;
  31. }),
  32. None => thread::yield_now(),
  33. }
  34. }
  35. producer.join().unwrap();
  36. // drain the queue.
  37. while let Some(thing) = BUF.pop_ref() {
  38. thing.with(|thing| {
  39. assert_eq!(*thing, i);
  40. i += 1;
  41. })
  42. }
  43. }
  44. #[test]
  45. fn static_storage_stringbuf() {
  46. static BUF: StaticStringBuf<8> = StaticStringBuf::new();
  47. static PRODUCER_LIVE: AtomicBool = AtomicBool::new(true);
  48. let producer = thread::spawn(move || {
  49. for i in 0..16 {
  50. let mut string = 'write: loop {
  51. match BUF.write() {
  52. Ok(string) => break 'write string,
  53. _ => thread::yield_now(),
  54. }
  55. };
  56. write!(&mut string, "{:?}", i).unwrap();
  57. }
  58. PRODUCER_LIVE.store(false, Ordering::Release);
  59. println!("producer done");
  60. });
  61. let mut results = String::new();
  62. // While the producer is writing to the queue, push each entry to the
  63. // results string.
  64. while PRODUCER_LIVE.load(Ordering::Acquire) {
  65. if let Some(string) = BUF.pop_ref() {
  66. writeln!(results, "{}", string).unwrap();
  67. }
  68. thread::yield_now();
  69. }
  70. producer.join().unwrap();
  71. println!("producer done...");
  72. // drain the queue.
  73. while let Some(string) = BUF.pop_ref() {
  74. writeln!(results, "{}", string).unwrap();
  75. }
  76. println!("results:\n{}", results);
  77. for (n, ln) in results.lines().enumerate() {
  78. assert_eq!(ln.parse::<usize>(), Ok(n));
  79. }
  80. }
  81. #[tokio::test]
  82. async fn static_async_channel() {
  83. use std::collections::HashSet;
  84. use thingbuf::mpsc;
  85. const N_PRODUCERS: usize = 8;
  86. const N_SENDS: usize = N_PRODUCERS * 2;
  87. static CHANNEL: mpsc::StaticChannel<usize, N_PRODUCERS> = mpsc::StaticChannel::new();
  88. async fn do_producer(tx: mpsc::StaticSender<usize>, n: usize) {
  89. let tag = n * N_SENDS;
  90. for i in 0..N_SENDS {
  91. let msg = i + tag;
  92. println!("sending {}...", msg);
  93. tx.send(msg).await.unwrap();
  94. println!("sent {}!", msg);
  95. }
  96. println!("PRODUCER {} DONE!", n);
  97. }
  98. let (tx, rx) = CHANNEL.split();
  99. for n in 0..N_PRODUCERS {
  100. tokio::spawn(do_producer(tx.clone(), n));
  101. }
  102. drop(tx);
  103. let mut results = HashSet::new();
  104. while let Some(val) = {
  105. println!("receiving...");
  106. rx.recv().await
  107. } {
  108. println!("received {}!", val);
  109. results.insert(val);
  110. }
  111. let results = dbg!(results);
  112. for n in 0..N_PRODUCERS {
  113. let tag = n * N_SENDS;
  114. for i in 0..N_SENDS {
  115. let msg = i + tag;
  116. assert!(results.contains(&msg), "missing message {:?}", msg);
  117. }
  118. }
  119. }
  120. #[test]
  121. fn static_sync_channel() {
  122. use std::collections::HashSet;
  123. use thingbuf::mpsc::sync;
  124. const N_PRODUCERS: usize = 8;
  125. const N_SENDS: usize = N_PRODUCERS * 2;
  126. static CHANNEL: sync::StaticChannel<usize, N_PRODUCERS> = sync::StaticChannel::new();
  127. fn do_producer(tx: sync::StaticSender<usize>, n: usize) -> impl FnOnce() {
  128. move || {
  129. let tag = n * N_SENDS;
  130. for i in 0..N_SENDS {
  131. let msg = i + tag;
  132. println!("sending {}...", msg);
  133. tx.send(msg).unwrap();
  134. println!("sent {}!", msg);
  135. }
  136. println!("PRODUCER {} DONE!", n);
  137. }
  138. }
  139. let (tx, rx) = CHANNEL.split();
  140. for n in 0..N_PRODUCERS {
  141. std::thread::spawn(do_producer(tx.clone(), n));
  142. }
  143. drop(tx);
  144. let mut results = HashSet::new();
  145. while let Some(val) = {
  146. println!("receiving...");
  147. rx.recv()
  148. } {
  149. println!("received {}!", val);
  150. results.insert(val);
  151. }
  152. let results = dbg!(results);
  153. for n in 0..N_PRODUCERS {
  154. let tag = n * N_SENDS;
  155. for i in 0..N_SENDS {
  156. let msg = i + tag;
  157. assert!(results.contains(&msg), "missing message {:?}", msg);
  158. }
  159. }
  160. }