lib.rs

use core::{
    fmt,
    ops::{Deref, DerefMut},
};

#[cfg(feature = "alloc")]
extern crate alloc;
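
// Test-only logging helpers: these macros compile to nothing outside of
// `cfg(test)` builds, and are used to trace the lock-free operations below.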
macro_rules! test_println {
    ($($arg:tt)*) => {
        if cfg!(test) {
            if std::thread::panicking() {
                // getting the thread ID while panicking doesn't seem to play super nicely with loom's
                // mock lazy_static...
                println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
            } else {
                #[cfg(test)]
                println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
            }
        }
    }
}

macro_rules! test_dbg {
    ($e:expr) => {
        match $e {
            e => {
                #[cfg(test)]
                test_println!("{} = {:?}", stringify!($e), &e);
                e
            }
        }
    };
}

mod loom;

#[cfg(test)]
mod tests;

use crate::loom::{
    atomic::{AtomicUsize, Ordering},
    UnsafeCell,
};

#[cfg(feature = "alloc")]
mod stringbuf;
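
/// A fixed-capacity, lock-free queue of reusable slots.
///
/// Each slot is initialized up front and handed out by reference: `push_ref`
/// reserves the next free slot for writing and `pop_ref` takes the next
/// written slot for reading, so element storage is recycled rather than moved
/// in and out of the buffer.
///
/// A minimal usage sketch (marked `ignore`, since the crate path and enabled
/// features are assumed rather than taken from this file):
///
/// ```ignore
/// let buf = ThingBuf::<usize>::new(4);
///
/// // Reserve the next free slot and write into it.
/// let mut slot = buf.push_ref().expect("buffer is not full");
/// slot.with_mut(|val| *val = 42);
/// drop(slot); // dropping the `Ref` publishes the write
///
/// // Take the slot back out and read the value.
/// let slot = buf.pop_ref().expect("buffer is not empty");
/// slot.with(|val| assert_eq!(*val, 42));
/// ```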
pub struct ThingBuf<T> {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
    gen: usize,
    gen_mask: usize,
    idx_mask: usize,
    slots: Box<[Slot<T>]>,
}
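
/// An exclusive reference to a slot in a [`ThingBuf`].
///
/// While a `Ref` exists, the referenced slot can be read or written through
/// [`Ref::with`] and [`Ref::with_mut`]. Dropping the `Ref` stores the slot's
/// next state, which is what hands the slot back to the queue (to `pop_ref`
/// after a push, or to a later `push_ref` after a pop).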
pub struct Ref<'slot, T> {
    slot: &'slot Slot<T>,
    new_state: usize,
}
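
/// Error returned by [`ThingBuf::push_ref`] when the buffer is full; the
/// wrapped value is the buffer's capacity.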
#[derive(Debug)]
pub struct AtCapacity(usize);
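
/// A single cell of the ring: the stored value plus an atomic `state` word
/// tracking which generation last wrote or read it.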
struct Slot<T> {
    value: UnsafeCell<T>,
    state: AtomicUsize,
}
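
/// Exponential backoff helper used while retrying the CAS loops in
/// `push_ref` and `pop_ref`.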
#[derive(Debug)]
struct Backoff(u8);
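
/// Aligns the wrapped value to a cache line (128 bytes on x86_64/aarch64, 64
/// bytes elsewhere) so the `head` and `tail` counters don't share a line and
/// cause false sharing.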
#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
#[cfg_attr(
    not(any(target_arch = "x86_64", target_arch = "aarch64")),
    repr(align(64))
)]
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
struct CachePadded<T>(T);

// === impl ThingBuf ===

impl<T: Default> ThingBuf<T> {
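    /// Returns a new `ThingBuf` with space for `capacity` elements, each
    /// initialized with `T::default()`.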
    pub fn new(capacity: usize) -> Self {
        Self::new_with(capacity, T::default)
    }
}

impl<T> ThingBuf<T> {
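    /// Returns a new `ThingBuf` with space for `capacity` elements, using
    /// `initializer` to construct each slot's initial value.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.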
    pub fn new_with(capacity: usize, mut initializer: impl FnMut() -> T) -> Self {
        assert!(capacity > 0);
        let slots = (0..capacity)
            .map(|idx| Slot {
                state: AtomicUsize::new(idx),
                value: UnsafeCell::new(initializer()),
            })
            .collect();
        let gen = (capacity + 1).next_power_of_two();
        let idx_mask = gen - 1;
        let gen_mask = !(gen - 1);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            idx_mask,
            slots,
        }
    }
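
    // The `head` and `tail` counters pack two values into one `usize`: the
    // low bits (`idx_mask`) hold the slot index, and the high bits
    // (`gen_mask`) hold the generation, i.e. how many times that counter has
    // lapped the ring. `gen` is the amount added to a counter each time it
    // wraps back around to index 0.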
    #[inline]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }

    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        // Are we still in the same generation?
        if idx + 1 < self.slots.len() {
            // If we're still within the current generation, increment the index
            // by 1.
            (idx | gen) + 1
        } else {
            // We've reached the end of the current lap, wrap the index around
            // to 0.
            gen.wrapping_add(self.gen)
        }
    }
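
    /// Reserves the slot at the tail of the queue for writing, returning a
    /// [`Ref`] to it.
    ///
    /// Dropping the returned `Ref` makes the written value available to
    /// `pop_ref`. Returns `Err(AtCapacity)` if the buffer is full; spins or
    /// yields and retries if another thread is mid-operation on this slot.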
    pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
        let mut backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &self.slots[idx];
            let state = slot.state.load(Ordering::Acquire);

            if state == tail {
                // Move the tail index forward by 1.
                let next_tail = self.next(idx, gen);
                match self.tail.compare_exchange(
                    tail,
                    next_tail,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        return Ok(Ref {
                            new_state: tail + 1,
                            slot,
                        })
                    }
                    Err(actual) => {
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if state.wrapping_add(self.gen) == tail + 1 {
                if self.head.load(Ordering::Acquire).wrapping_add(self.gen) == tail {
                    return Err(AtCapacity(self.slots.len()));
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            tail = self.tail.load(Ordering::Relaxed);
        }
    }
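
    /// Takes a [`Ref`] to the slot at the head of the queue, if that slot has
    /// been written.
    ///
    /// Returns `None` if the buffer is empty. Dropping the returned `Ref`
    /// releases the slot so a later `push_ref` can reuse it.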
    pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &self.slots[idx];
            let state = slot.state.load(Ordering::Acquire);

            // If the slot's state is ahead of the head index by one, we can pop
            // it.
            if state == head + 1 {
                let next_head = self.next(idx, gen);
                match self.head.compare_exchange(
                    head,
                    next_head,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        return Some(Ref {
                            new_state: head.wrapping_add(self.gen),
                            slot,
                        })
                    }
                    Err(actual) => {
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if state == head {
                let tail = self.tail.load(Ordering::Acquire);
                if tail == head {
                    return None;
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            head = self.head.load(Ordering::Relaxed);
        }
    }
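
    /// Returns the number of slots in the buffer.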
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

impl<T> fmt::Debug for ThingBuf<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThingBuf")
            .field("capacity", &self.capacity())
            .finish()
    }
}

// === impl Ref ===

impl<T> Ref<'_, T> {
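    /// Calls `f` with a shared reference to the value in this slot, returning
    /// whatever `f` returns.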
    #[inline]
    pub fn with<U>(&self, f: impl Fn(&T) -> U) -> U {
        self.slot.value.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
            f(&*value)
        })
    }
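
    /// Calls `f` with a mutable reference to the value in this slot, returning
    /// whatever `f` returns.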
    #[inline]
    pub fn with_mut<U>(&mut self, f: impl Fn(&mut T) -> U) -> U {
        self.slot.value.with_mut(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
            f(&mut *value)
        })
    }
}
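
// Dropping a `Ref` releases its slot: the precomputed `new_state` is stored
// with `Release` ordering so the value written (or consumed) through the
// `Ref` is visible to whichever side acquires the slot next.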
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        self.slot.state.store(self.new_state, Ordering::Release);
    }
}

impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}

impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}

// === impl Backoff ===

impl Backoff {
    const MAX_SPINS: u8 = 6;
    const MAX_YIELDS: u8 = 10;

    #[inline]
    fn new() -> Self {
        Self(0)
    }

    #[inline]
    fn spin(&mut self) {
        for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
            loom::hint::spin_loop();
            test_println!("spin_loop_hint");
        }

        if self.0 <= Self::MAX_SPINS {
            self.0 += 1;
        }
    }

    #[inline]
    fn spin_yield(&mut self) {
        if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
            for _ in 0..1 << self.0 {
                loom::hint::spin_loop();
                test_println!("spin_loop_hint");
            }
        }

        #[cfg(any(test, feature = "std"))]
        loom::thread::yield_now();

        if self.0 <= Self::MAX_YIELDS {
            self.0 += 1;
        }
    }
}

// === impl CachePadded ===

impl<T> Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}