lib.rs

#![cfg_attr(docsrs, doc = include_str!("../README.md"))]
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use core::{cmp, fmt, mem::MaybeUninit, ops};

#[macro_use]
mod macros;

mod loom;
mod util;
mod wait;

feature! {
    #![feature = "alloc"]
    extern crate alloc;

    mod thingbuf;
    pub use self::thingbuf::ThingBuf;

    mod stringbuf;
    pub use stringbuf::{StaticStringBuf, StringBuf};

    pub mod mpsc;
}

mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;

use crate::{
    loom::{
        atomic::{AtomicUsize, Ordering::*},
        cell::{MutPtr, UnsafeCell},
    },
    util::{Backoff, CachePadded},
};
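
/// An exclusive reference to a slot in the ring buffer.
///
/// While a `Ref` exists, no other producer or consumer can touch the slot;
/// dropping it stores `new_state` into the slot's state field, releasing the
/// slot back to the buffer (see the `Drop` impl below).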
pub struct Ref<'slot, T> {
    ptr: MutPtr<MaybeUninit<T>>,
    slot: &'slot Slot<T>,
    new_state: usize,
}
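
/// Error returned when the ring buffer is at capacity.
///
/// The payload (`()` by default) carries the value that could not be written,
/// when there is one, back to the caller.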
pub struct Full<T = ()>(T);

/// State variables for the atomic ring buffer algorithm.
///
/// This is separated from the actual storage array used to implement the ring
/// buffer, so that it can be used by both the dynamically-sized implementation
/// (`Box<[Slot<T>]>`) and the statically-sized (`[T; const usize]`)
/// implementation.
///
/// A `Core`, when provided with a reference to the storage array, knows how to
/// actually perform the ring buffer operations on that array.
///
/// The atomic ring buffer is based on the [MPMC bounded queue from 1024cores][1].
///
/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
#[derive(Debug)]
struct Core {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
    gen: usize,
    gen_mask: usize,
    idx_mask: usize,
    closed: usize,
    capacity: usize,
}
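
/// A single storage cell in the ring buffer.
///
/// `state` encodes where the slot is in its lifecycle relative to the packed
/// head/tail counters maintained by `Core`:
///
/// - `EMPTY_STATE` (`usize::MAX`): the slot has never been written, so `value`
///   is still uninitialized.
/// - equal to a packed tail value (`idx | gen`): the slot is free to be
///   written in that generation.
/// - equal to a packed tail value plus one: the slot holds a value produced in
///   that generation and may be popped once the head reaches it.
///
/// Transitions happen in `Ref::drop`, which stores `new_state` with `Release`
/// ordering.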
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    state: AtomicUsize,
}
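
// The head and tail counters pack three fields into a single `usize`, using
// the masks computed in `Core::new` below:
//
//   - the low bits (`idx_mask`) hold the slot index into the storage array;
//   - the next bit (`closed`) is set once the channel has been closed;
//   - the remaining high bits (`gen_mask`) count "laps" around the buffer, so
//     that a slot written in one generation is not confused with the same
//     index in a later one.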
impl Core {
    #[cfg(not(all(loom, test)))]
    const fn new(capacity: usize) -> Self {
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            closed,
            idx_mask,
            capacity,
        }
    }

    #[cfg(all(loom, test))]
    fn new(capacity: usize) -> Self {
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            closed,
            gen_mask,
            idx_mask,
            capacity,
        }
    }
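
    // The non-loom constructor is a `const fn` so that statically-sized
    // buffers can be built in a `const` context; loom's `AtomicUsize::new` is
    // not `const`, so the loom version above is a plain `fn` with the same
    // body.
    //
    // For example, with `capacity = 6`:
    //   closed   = (6 + 1).next_power_of_two() = 0b0_1000
    //   idx_mask = 0b0_0111
    //   gen      = 0b1_0000 (added to a counter on each full lap)
    //   gen_mask = !0b0_1111
    // so a tail value of 0b1_0101 decodes to index 5 in generation 1, with the
    // closed bit unset.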
    #[inline]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }

    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        // Are we still in the same generation?
        if idx + 1 < self.capacity() {
            // If we're still within the current generation, increment the index
            // by 1.
            (idx | gen) + 1
        } else {
            // We've reached the end of the current lap, wrap the index around
            // to 0.
            gen.wrapping_add(self.gen)
        }
    }

    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }
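
    // Closing sets the `closed` bit on the tail counter. The `fetch_or`
    // returns the previous tail, so the expression below is `true` only for
    // the call that actually transitions the channel from open to closed; if
    // the current thread is panicking, we skip closing entirely.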
    fn close(&self) -> bool {
        test_println!("Core::close");
        if std::thread::panicking() {
            return false;
        }
        test_dbg!(self.tail.fetch_or(self.closed, SeqCst) & self.closed == 0)
    }
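
    // Reserve a slot for writing:
    //
    // 1. If the closed bit is set on the tail, fail with `Closed`.
    // 2. Decode the tail into an index and generation and read that slot's
    //    state. A state equal to the tail (or a never-written slot on the
    //    first lap) means the slot is free.
    // 3. Try to advance the tail with a CAS; on success, take exclusive
    //    ownership of the slot, lazily writing `T::default()` into it on the
    //    first lap, and hand back a `Ref` whose drop will mark the slot
    //    readable (`tail + 1`).
    // 4. If the slot still holds the value from the previous lap, check
    //    whether the head is a full generation behind the tail; if so, the
    //    buffer is full. Otherwise another thread is mid-operation, so spin
    //    and retry with a freshly loaded tail.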
    fn push_ref<'slots, T, S>(
        &self,
        slots: &'slots S,
    ) -> Result<Ref<'slots, T>, mpsc::TrySendError<()>>
    where
        T: Default,
        S: ops::Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("push_ref");
        let mut backoff = Backoff::new();
        let mut tail = self.tail.load(Relaxed);

        loop {
            if test_dbg!(tail & self.closed != 0) {
                return Err(mpsc::TrySendError::Closed(()));
            }
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let actual_state = test_dbg!(slot.state.load(Acquire));
            let state = if actual_state == EMPTY_STATE {
                idx
            } else {
                actual_state
            };

            if test_dbg!(state == tail) || test_dbg!(actual_state == EMPTY_STATE && gen == 0) {
                // Move the tail index forward by 1.
                let next_tail = self.next(idx, gen);
                match test_dbg!(self
                    .tail
                    .compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
                {
                    Ok(_) => {
                        // We got the slot! It's now okay to write to it
                        test_println!("claimed tail slot [{}]", idx);

                        // Claim exclusive ownership over the slot
                        let ptr = slot.value.get_mut();

                        if gen == 0 {
                            unsafe {
                                // Safety: we have just claimed exclusive ownership over
                                // this slot.
                                ptr.deref().write(T::default());
                            };
                            test_println!("-> initialized");
                        }

                        return Ok(Ref {
                            ptr,
                            new_state: tail + 1,
                            slot,
                        });
                    }
                    Err(actual) => {
                        // Someone else took this slot and advanced the tail
                        // index. Try to claim the new tail.
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
                // fake RMW op to placate loom. this should be equivalent to
                // doing a relaxed load after a SeqCst fence (per Godbolt
                // https://godbolt.org/z/zb15qfEa9), however, loom understands
                // this correctly, while it does not understand an explicit
                // SeqCst fence and a load.
                // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
                // load it gets reordered differently in the model checker lmao...
                let head = test_dbg!(self.head.fetch_or(0, SeqCst));
                if test_dbg!(head.wrapping_add(self.gen) == tail) {
                    test_println!("channel full");
                    return Err(mpsc::TrySendError::Full(()));
                }

                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            tail = test_dbg!(self.tail.load(Acquire));
        }
    }
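
    // Reserve a slot for reading. This mirrors `push_ref`: decode the head
    // into an index and generation, and claim the slot with a CAS on the head
    // if its state is `head + 1` (i.e. a value was written there in the
    // current generation). The returned `Ref`'s drop stores `head + self.gen`,
    // marking the slot writable again on the *next* lap. If the slot's state
    // still equals `head`, nothing has been written there yet: report `Closed`
    // if the closed bit is set on the tail, and otherwise return
    // `TrySendError::Full(())` to signal that no element is ready, once the
    // head has caught up to the tail or the backoff budget is exhausted.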
    fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, mpsc::TrySendError>
    where
        S: ops::Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("pop_ref");
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Relaxed);

        loop {
            test_dbg!(head);
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let state = test_dbg!(slot.state.load(Acquire));
            let state = if state == EMPTY_STATE { idx } else { state };

            // If the slot's state is ahead of the head index by one, we can pop
            // it.
            if test_dbg!(state == head + 1) {
                let next_head = self.next(idx, gen);
                match test_dbg!(self
                    .head
                    .compare_exchange_weak(head, next_head, SeqCst, Acquire))
                {
                    Ok(_) => {
                        test_println!("claimed head slot [{}]", idx);
                        return Ok(Ref {
                            new_state: head.wrapping_add(self.gen),
                            ptr: slot.value.get_mut(),
                            slot,
                        });
                    }
                    Err(actual) => {
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if test_dbg!(state == head) {
                // fake RMW op to placate loom. this should be equivalent to
                // doing a relaxed load after a SeqCst fence (per Godbolt
                // https://godbolt.org/z/zb15qfEa9), however, loom understands
                // this correctly, while it does not understand an explicit
                // SeqCst fence and a load.
                // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
                // load it gets reordered differently in the model checker lmao...
                let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));

                if test_dbg!(tail & !self.closed == head) {
                    return if test_dbg!(tail & self.closed != 0) {
                        Err(mpsc::TrySendError::Closed(()))
                    } else {
                        test_println!("--> channel full!");
                        Err(mpsc::TrySendError::Full(()))
                    };
                }

                if test_dbg!(backoff.done_spinning()) {
                    return Err(mpsc::TrySendError::Full(()));
                }

                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            head = test_dbg!(self.head.load(Acquire));
        }
    }
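
    // A best-effort length calculation: re-read the tail until it does not
    // change between the two loads, then count the occupied slots from the
    // head and tail indices alone (the generation bits are masked off).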
    fn len(&self) -> usize {
        loop {
            let tail = self.tail.load(SeqCst);
            let head = self.head.load(SeqCst);

            if self.tail.load(SeqCst) == tail {
                let (head_idx, _) = self.idx_gen(head);
                let (tail_idx, _) = self.idx_gen(tail);
                return match head_idx.cmp(&tail_idx) {
                    cmp::Ordering::Less => tail_idx - head_idx,
                    cmp::Ordering::Greater => self.capacity - head_idx + tail_idx,
                    _ if tail == head => 0,
                    _ => self.capacity,
                };
            }
        }
    }
}

// === impl Ref ===
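
// `with` and `with_mut` go through the `MutPtr::with` closure API from the
// crate's `loom` shim so that loom can observe each access during
// model-checking tests; in non-loom builds the shim provides plain stand-ins
// for these types.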
impl<T> Ref<'_, T> {
    #[inline]
    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            // TODO(eliza): use `MaybeUninit::assume_init_ref` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&*(&*value).as_ptr())
        })
    }

    #[inline]
    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot.
            // TODO(eliza): use `MaybeUninit::assume_init_mut` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&mut *(&mut *value).as_mut_ptr())
        })
    }
}

impl<T> ops::Deref for Ref<'_, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            &*self.ptr.deref().as_ptr()
        }
    }
}

impl<T> ops::DerefMut for Ref<'_, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            &mut *self.ptr.deref().as_mut_ptr()
        }
    }
}
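
// Dropping the `Ref` publishes the slot's new state with `Release` ordering,
// handing the slot back to the ring buffer: after a push this marks it
// readable, and after a pop it marks it writable in the next generation.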
impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        test_println!("drop Ref<{}>", core::any::type_name::<T>());
        test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
    }
}

impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}

impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}

unsafe impl<T: Send> Send for Ref<'_, T> {}
unsafe impl<T: Send> Sync for Ref<'_, T> {}

// === impl Slot ===
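
// `usize::MAX` is used as a sentinel state meaning the slot's value has never
// been initialized; `push_ref` treats such a slot as writable on the first
// lap and writes `T::default()` into it before handing out a `Ref`.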
const EMPTY_STATE: usize = usize::MAX;

impl<T> Slot<T> {
    #[cfg(not(all(loom, test)))]
    const fn empty() -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(EMPTY_STATE),
        }
    }

    #[cfg(all(loom, test))]
    fn empty() -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(EMPTY_STATE),
        }
    }
}

unsafe impl<T: Sync> Sync for Slot<T> {}

impl<T> fmt::Debug for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Full(..)")
    }
}

impl<T> fmt::Display for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel full")
    }
}