lib.rs

use core::{
    fmt,
    ops::{Deref, DerefMut},
};

#[cfg(feature = "alloc")]
extern crate alloc;
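
// Test-only diagnostics: `test_println!` logs a message tagged with the
// current (loom) thread ID, file, and line, and `test_dbg!` wraps an
// expression, printing its value before yielding it. Both are no-ops
// outside of `cfg(test)`.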
macro_rules! test_println {
    ($($arg:tt)*) => {
        if cfg!(test) {
            if std::thread::panicking() {
                // getting the thread ID while panicking doesn't seem to play super nicely with loom's
                // mock lazy_static...
                println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
            } else {
                #[cfg(test)]
                println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
            }
        }
    }
}

macro_rules! test_dbg {
    ($e:expr) => {
        match $e {
            e => {
                #[cfg(test)]
                test_println!("{} = {:?}", stringify!($e), &e);
                e
            }
        }
    };
}
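
// The `loom` module presumably re-exports either the real atomics/cells or
// loom's mocked versions, so the same code can run normally and under loom's
// concurrency model checker in tests.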
mod loom;

#[cfg(test)]
mod tests;

use crate::loom::{
    atomic::{AtomicUsize, Ordering},
    UnsafeCell,
};

#[cfg(feature = "alloc")]
mod stringbuf;
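
/// A fixed-capacity, lock-free ring buffer of reusable slots.
///
/// `head` and `tail` each pack a slot index (low bits, `idx_mask`) together
/// with a lap/generation counter (high bits, `gen_mask`), so a slot's `state`
/// can record which lap last wrote or drained it.
///
/// A minimal single-threaded sketch of the API below (the crate name
/// `thingbuf` is assumed; it is not visible in this file):
///
/// ```
/// # use thingbuf::ThingBuf;
/// let buf = ThingBuf::<String>::new(4);
///
/// // Write into a pooled slot in place instead of moving a value in.
/// buf.push_with(|s| {
///     s.clear(); // slots are reused, so clear any previous contents
///     s.push_str("hello");
/// }).expect("buffer is not full");
///
/// // Read it back out, again by reference to the slot.
/// let len = buf.pop_with(|s| s.len()).expect("buffer is not empty");
/// assert_eq!(len, 5);
/// ```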
pub struct ThingBuf<T> {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
    gen: usize,
    gen_mask: usize,
    idx_mask: usize,
    slots: Box<[Slot<T>]>,
}
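
/// Exclusive access to a claimed slot.
///
/// While a `Ref` is alive the holder may read and write the slot's value;
/// dropping it stores `new_state` into the slot's `state`, publishing the
/// slot as either filled (after a push) or free for the next lap (after a
/// pop).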
pub struct Ref<'slot, T> {
    slot: &'slot Slot<T>,
    new_state: usize,
}
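
/// Error returned by push operations when the buffer is full; carries the
/// buffer's capacity.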
#[derive(Debug)]
pub struct AtCapacity(usize);
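
/// One element of the ring: the stored value plus a `state` word recording
/// which lap last touched it.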
struct Slot<T> {
    value: UnsafeCell<T>,
    state: AtomicUsize,
}
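
/// Exponential backoff used while spinning on a contended slot: each call
/// spins for `2^n` iterations (capped), and `spin_yield` additionally yields
/// the thread when `std` is available.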
#[derive(Debug)]
struct Backoff(u8);
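
/// Aligns its contents to a cache line (128 bytes on x86_64/aarch64, 64 bytes
/// elsewhere) so that `head` and `tail` don't share a line, avoiding false
/// sharing between producers and consumers.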
#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
#[cfg_attr(
    not(any(target_arch = "x86_64", target_arch = "aarch64")),
    repr(align(64))
)]
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
struct CachePadded<T>(T);
// === impl ThingBuf ===

impl<T: Default> ThingBuf<T> {
    pub fn new(capacity: usize) -> Self {
        Self::new_with(capacity, T::default)
    }
}

impl<T> ThingBuf<T> {
    pub fn new_with(capacity: usize, mut initializer: impl FnMut() -> T) -> Self {
        assert!(capacity > 0);
        let slots = (0..capacity)
            .map(|idx| Slot {
                state: AtomicUsize::new(idx),
                value: UnsafeCell::new(initializer()),
            })
            .collect();
        let gen = (capacity + 1).next_power_of_two();
        let idx_mask = gen - 1;
        let gen_mask = !(gen - 1);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            idx_mask,
            slots,
        }
    }
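
    // How values are packed, with capacity = 3 as a worked example:
    // `gen` = (3 + 1).next_power_of_two() = 4, so `idx_mask` = 0b011 and
    // `gen_mask` = !0b011. A head/tail value of 6 (0b110) then means slot
    // index 2 on lap 1, and `next(2, 4)` wraps it to 8 (0b1000), i.e. slot 0
    // on lap 2. `idx_gen` splits a packed value back into its two parts.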
    #[inline]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }

    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        // Are we still in the same generation?
        if idx + 1 < self.slots.len() {
            // If we're still within the current generation, increment the index
            // by 1.
            (idx | gen) + 1
        } else {
            // We've reached the end of the current lap, wrap the index around
            // to 0.
            gen.wrapping_add(self.gen)
        }
    }
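
    // Slot state protocol, as used by `push_ref`/`pop_ref` below:
    // - `state == tail` (packed idx | gen): the slot is free this lap; a
    //   producer claims it by advancing `tail`, and dropping the returned
    //   `Ref` stores `tail + 1`, marking the slot as written.
    // - `state == head + 1`: the slot holds a value; a consumer claims it by
    //   advancing `head`, and dropping that `Ref` stores `head + gen`,
    //   handing the slot back to producers for the next lap.
    // - `state + gen == tail + 1`: the slot was written a full lap ago and
    //   hasn't been popped; if `head` is also a full lap behind `tail`, the
    //   buffer is at capacity.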
    #[inline]
    pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> {
        self.push_ref().map(|mut r| r.with_mut(f))
    }

    pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
        let mut backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);
        loop {
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &self.slots[idx];
            let state = slot.state.load(Ordering::Acquire);

            if state == tail {
                // Move the tail index forward by 1.
                let next_tail = self.next(idx, gen);
                match self.tail.compare_exchange(
                    tail,
                    next_tail,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        return Ok(Ref {
                            new_state: tail + 1,
                            slot,
                        })
                    }
                    Err(actual) => {
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if state.wrapping_add(self.gen) == tail + 1 {
                if self.head.load(Ordering::Acquire).wrapping_add(self.gen) == tail {
                    return Err(AtCapacity(self.slots.len()));
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            tail = self.tail.load(Ordering::Relaxed)
        }
    }
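
    // A slot whose `state` still equals `head` hasn't been written this lap;
    // if `tail` hasn't moved past `head` either, the buffer is empty.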
    #[inline]
    pub fn pop_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Option<U> {
        self.pop_ref().map(|mut r| r.with_mut(f))
    }

    pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &self.slots[idx];
            let state = slot.state.load(Ordering::Acquire);

            // If the slot's state is ahead of the head index by one, we can pop
            // it.
            if state == head + 1 {
                let next_head = self.next(idx, gen);
                match self.head.compare_exchange(
                    head,
                    next_head,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        return Some(Ref {
                            new_state: head.wrapping_add(self.gen),
                            slot,
                        })
                    }
                    Err(actual) => {
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if state == head {
                let tail = self.tail.load(Ordering::Acquire);
                if tail == head {
                    return None;
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            head = self.head.load(Ordering::Relaxed);
        }
    }

    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

impl<T> fmt::Debug for ThingBuf<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThingBuf")
            .field("capacity", &self.capacity())
            .finish()
    }
}
// === impl Ref ===

impl<T> Ref<'_, T> {
    #[inline]
    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.slot.value.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
            f(&*value)
        })
    }

    #[inline]
    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.slot.value.with_mut(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
            f(&mut *value)
        })
    }
}
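
// Dropping a `Ref` releases the slot: the `Release` store of `new_state`
// pairs with the `Acquire` loads in `push_ref`/`pop_ref`, making the slot's
// contents visible to whichever side claims it next.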
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        self.slot.state.store(self.new_state, Ordering::Release);
    }
}

impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}
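
// When `T` itself implements `fmt::Write` (e.g. `String`), callers can format
// directly into the pooled value through the `Ref`; this is presumably what
// the `alloc`-gated `stringbuf` module builds on.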
impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}
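
// A hedged sketch of formatting through the impl above (assumes the crate is
// named `thingbuf` and holds `String` values; note that slots are reused, so
// `write!` appends to whatever the slot held last):
//
//     use std::fmt::Write;
//
//     let buf = thingbuf::ThingBuf::<String>::new(8);
//     let mut slot = buf.push_ref().expect("buffer is not full");
//     write!(slot, "request #{}", 42).expect("writing to a String never fails");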
// === impl Backoff ===

impl Backoff {
    const MAX_SPINS: u8 = 6;
    const MAX_YIELDS: u8 = 10;

    #[inline]
    fn new() -> Self {
        Self(0)
    }

    #[inline]
    fn spin(&mut self) {
        for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
            loom::hint::spin_loop();
            test_println!("spin_loop_hint");
        }
        if self.0 <= Self::MAX_SPINS {
            self.0 += 1;
        }
    }

    #[inline]
    fn spin_yield(&mut self) {
        if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
            for _ in 0..1 << self.0 {
                loom::hint::spin_loop();
                test_println!("spin_loop_hint");
            }
        }
        #[cfg(any(test, feature = "std"))]
        loom::thread::yield_now();
        if self.0 <= Self::MAX_YIELDS {
            self.0 += 1;
        }
    }
}

// === impl CachePadded ===

impl<T> Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}