// lib.rs
#![cfg_attr(not(feature = "std"), no_std)]

use core::{fmt, mem::MaybeUninit, ops::Index};

#[cfg(feature = "alloc")]
extern crate alloc;

macro_rules! test_println {
    ($($arg:tt)*) => {
        if cfg!(test) {
            if crate::util::panicking() {
                // getting the thread ID while panicking doesn't seem to play
                // super nicely with loom's mock lazy_static...
                println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
            } else {
                println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
            }
        }
    }
}

macro_rules! test_dbg {
    ($e:expr) => {
        match $e {
            e => {
                #[cfg(test)]
                test_println!("{} = {:?}", stringify!($e), &e);
                e
            }
        }
    };
}
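
// Note: both macros do nothing outside of `cfg(test)`. `test_dbg!` logs an
// expression and yields its value, so it can wrap an operand in place, e.g.
// `let head = test_dbg!(self.head.load(Ordering::Relaxed));` prints
// `self.head.load(Ordering::Relaxed) = 0` and binds `head` to 0.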

mod loom;
mod util;

#[cfg(feature = "alloc")]
mod thingbuf;
#[cfg(feature = "alloc")]
pub use self::thingbuf::ThingBuf;

#[cfg(feature = "alloc")]
mod stringbuf;
#[cfg(feature = "alloc")]
pub use stringbuf::{StaticStringBuf, StringBuf};

mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;

use crate::{
    loom::{
        atomic::{AtomicUsize, Ordering},
        UnsafeCell,
    },
    util::{Backoff, CachePadded},
};
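
// `Core` holds the shared state of the queue: a `head` counter for
// consumers, a `tail` counter for producers, and precomputed masks that
// split a counter value into a slot index (low bits) and a generation, or
// "lap", number (high bits). Padding the two counters to separate cache
// lines keeps producers and consumers from false-sharing.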
#[derive(Debug)]
struct Core {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
    gen: usize,
    gen_mask: usize,
    idx_mask: usize,
    capacity: usize,
}

pub struct Ref<'slot, T> {
    slot: &'slot Slot<T>,
    new_state: usize,
}
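
// A `Ref` is a RAII guard for a claimed slot: while it exists, its holder
// has exclusive access to the slot's value, and dropping it stores
// `new_state` into the slot's state, handing the slot to the other side
// (to consumers after a push, back to producers after a pop).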
#[derive(Debug)]
pub struct AtCapacity(usize);

pub struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    state: AtomicUsize,
}
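
// Each slot's `state` records the counter value at which it last changed
// hands: a slot is writable when `state` equals the current tail, and
// readable when `state` is one past the current head. The value starts
// uninitialized and is lazily written with `T::default()` on the slot's
// first use (see `push_ref`).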
impl Core {
    #[cfg(not(test))]
    const fn new(capacity: usize) -> Self {
        let gen = (capacity + 1).next_power_of_two();
        let idx_mask = gen - 1;
        let gen_mask = !(gen - 1);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            idx_mask,
            capacity,
        }
    }

    // Identical to the `const` version above; duplicated because the mock
    // atomics used under `cfg(test)` cannot be constructed in a `const fn`.
    #[cfg(test)]
    fn new(capacity: usize) -> Self {
        let gen = (capacity + 1).next_power_of_two();
        let idx_mask = gen - 1;
        let gen_mask = !(gen - 1);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            idx_mask,
            capacity,
        }
    }

    #[inline]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }
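
    // Worked example: with `capacity = 3`, `gen` is 4 (the next power of
    // two greater than the capacity), so `idx_mask` is 0b011 and `gen_mask`
    // is !0b011. A counter value of 6 (0b110) then decomposes into index 2
    // within generation 4: the third slot, on the queue's second lap.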
    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        // Are we still in the same generation?
        if idx + 1 < self.capacity() {
            // If we're still within the current generation, increment the
            // index by 1.
            (idx | gen) + 1
        } else {
            // We've reached the end of the current lap, so wrap the index
            // around to 0 and advance to the next generation.
            gen.wrapping_add(self.gen)
        }
    }
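
    // Continuing the `capacity = 3` example: from index 2 in generation 0,
    // `next` returns `0usize.wrapping_add(4)` = 4, which `idx_gen` reads
    // back as index 0 of generation 4. Index 3 is representable under the
    // mask but never produced, since the wrap happens at `capacity`.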
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, AtCapacity>
    where
        T: Default,
        S: Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("push_ref");
        let mut backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);
        loop {
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let state = slot.state.load(Ordering::Acquire);
            if state == tail || (state == 0 && gen == 0) {
                // Move the tail index forward by 1.
                let next_tail = self.next(idx, gen);
                match self.tail.compare_exchange_weak(
                    tail,
                    next_tail,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We got the slot! It's now okay to write to it.
                        test_println!("claimed tail slot");
                        if gen == 0 {
                            slot.value.with_mut(|value| unsafe {
                                // Safety: we have just claimed exclusive
                                // ownership over this slot.
                                (*value).write(T::default());
                            });
                            test_println!("-> initialized");
                        }
                        return Ok(Ref {
                            new_state: tail + 1,
                            slot,
                        });
                    }
                    Err(actual) => {
                        // Someone else took this slot and advanced the tail
                        // index. Try to claim the new tail.
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if state.wrapping_add(self.gen) == tail + 1 {
                // The slot still holds a value written on the previous lap
                // that hasn't been popped; if the head is also a full
                // generation behind the tail, the queue is at capacity.
                if self.head.load(Ordering::SeqCst).wrapping_add(self.gen) == tail {
                    return Err(AtCapacity(self.capacity()));
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            tail = self.tail.load(Ordering::Relaxed);
        }
    }
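
    // Push in brief: load the tail, check that the slot it indexes has been
    // released by any prior reader, CAS the tail forward, and hand back a
    // `Ref` whose drop stores `tail + 1` to publish the write. On the first
    // lap the slot is also initialized with `T::default()`, so
    // `Ref::with_mut` always sees a valid `T`.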
    fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Option<Ref<'slots, T>>
    where
        S: Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("pop_ref");
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            test_dbg!(head);
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let state = slot.state.load(Ordering::Acquire);
            test_dbg!(state);

            // If the slot's state is ahead of the head index by one, we can
            // pop it.
            if test_dbg!(state == head + 1) {
                let next_head = self.next(idx, gen);
                match self.head.compare_exchange(
                    head,
                    next_head,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        test_println!("claimed head slot");
                        return Some(Ref {
                            new_state: head.wrapping_add(self.gen),
                            slot,
                        });
                    }
                    Err(actual) => {
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if test_dbg!(state == head) {
                // The slot hasn't been written on this lap; if the tail is
                // also still at `head`, the queue is empty. Otherwise, a
                // writer has claimed the slot but not yet finished with it.
                let tail = self.tail.load(Ordering::SeqCst);
                if test_dbg!(tail == head) {
                    return None;
                }
                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            head = self.head.load(Ordering::Relaxed);
        }
    }
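
    // Pop in brief: a slot is readable once its state is one past the head
    // counter that selects it. Claiming it CASes the head forward, and
    // dropping the returned `Ref` stores `head + gen`, which marks the slot
    // writable again on the queue's next lap.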
    fn len(&self) -> usize {
        use core::cmp;
        loop {
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);
            // Only use this snapshot if the tail didn't move while the head
            // was being loaded; otherwise, retry.
            if self.tail.load(Ordering::SeqCst) == tail {
                let (head_idx, _) = self.idx_gen(head);
                let (tail_idx, _) = self.idx_gen(tail);
                return match head_idx.cmp(&tail_idx) {
                    cmp::Ordering::Less => tail_idx - head_idx,
                    cmp::Ordering::Greater => self.capacity - head_idx + tail_idx,
                    _ if tail == head => 0,
                    _ => self.capacity,
                };
            }
        }
    }
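
    // Example: with `capacity = 3`, after two pushes and one pop the
    // counters decompose to `head_idx = 1` and `tail_idx = 2`, so `len`
    // returns `2 - 1 = 1`.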
}

// === impl Ref ===

impl<T> Ref<'_, T> {
    #[inline]
    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.slot.value.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            // TODO(eliza): use `MaybeUninit::assume_init_ref` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&*(&*value).as_ptr())
        })
    }

    #[inline]
    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.slot.value.with_mut(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot.
            // TODO(eliza): use `MaybeUninit::assume_init_mut` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&mut *(&mut *value).as_mut_ptr())
        })
    }
}
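
// Usage sketch (hypothetical caller; the public queue types above wrap
// `push_ref`/`pop_ref` and hand out these `Ref`s):
//
//     let mut slot = buf.push_ref()?;   // claim a slot for writing
//     slot.with_mut(|s: &mut String| {  // mutate the pooled value in place
//         s.clear();
//         s.push_str("hello");
//     });
//     drop(slot); // publishes the slot to consumers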
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        test_println!("drop_ref");
        self.slot
            .state
            .store(test_dbg!(self.new_state), Ordering::Release);
    }
}

impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}

impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}
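
// Forwarding `fmt::Write` means a `Ref` to any writable value (such as a
// `Ref<'_, String>`) can be the target of `write!` directly, which appears
// to be the pattern the `StringBuf` types are built around, e.g. (sketch):
//
//     use core::fmt::Write;
//     write!(slot, "event {}", n)?;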
// === impl Slot ===

impl<T> Slot<T> {
    #[cfg(not(test))]
    const fn empty() -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(0),
        }
    }

    // As with `Core::new`, duplicated without `const` because the mock
    // types used under `cfg(test)` have non-`const` constructors.
    #[cfg(test)]
    fn empty() -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(0),
        }
    }
}

// Safety: a slot's value is only accessed through a claimed `Ref`, so a
// shared `&Slot` never permits unsynchronized access to the `UnsafeCell`.
unsafe impl<T: Sync> Sync for Slot<T> {}
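
// Illustrative sketch (added for exposition): a single-threaded model of the
// slot state protocol above, using plain integers in place of the atomics,
// so the lifecycle of one slot can be checked end to end. `CAP`, `GEN`, and
// the test itself are hypothetical and not part of the queue's API.
#[cfg(all(test, not(loom)))]
mod protocol_sketch {
    const CAP: usize = 3;
    const GEN: usize = 4; // (CAP + 1).next_power_of_two()
    const IDX_MASK: usize = GEN - 1;

    #[test]
    fn slot_state_round_trip() {
        // Per-slot states and the shared counters, as plain values.
        let mut state = [0usize; CAP];
        let (mut head, mut tail) = (0usize, 0usize);

        // Push: the slot at `tail`'s index is writable when its state
        // equals the tail counter (or is 0 on the first lap).
        let idx = tail & IDX_MASK;
        assert_eq!(state[idx], tail);
        let claimed_at = tail;
        tail += 1; // the CAS in `push_ref` advances the tail
        state[idx] = claimed_at + 1; // dropping the write `Ref` publishes

        // Pop: the slot at `head`'s index is readable when its state is
        // one past the head counter.
        let idx = head & IDX_MASK;
        assert_eq!(state[idx], head + 1);
        let claimed_at = head;
        head += 1; // the CAS in `pop_ref` advances the head
        state[idx] = claimed_at.wrapping_add(GEN); // read `Ref` drop releases

        // The slot is writable again, but only for the next lap's tail
        // value (`GEN`), not the current one.
        assert_eq!(state[0], GEN);
        assert_eq!((head, tail), (1, 1)); // the queue is empty again
    }
}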