#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};

#[macro_use]
mod macros;

mod loom;
pub mod mpsc;
pub mod recycling;
mod util;
mod wait;

pub use self::recycling::Recycle;

// TODO(eliza): finish writing this
// #[doc = include_str!("../mpsc_perf_comparison.md")]
// pub mod mpsc_perf_comparison {
//     // Empty module, used only for documentation.
// }

feature! {
    #![all(feature = "static", not(all(loom, test)))]

    mod static_thingbuf;
    pub use self::static_thingbuf::StaticThingBuf;
}

feature! {
    #![feature = "alloc"]

    extern crate alloc;

    mod thingbuf;
    pub use self::thingbuf::ThingBuf;
}

use crate::{
    loom::{
        atomic::{AtomicUsize, Ordering::*},
        cell::{MutPtr, UnsafeCell},
    },
    mpsc::errors::TrySendError,
    util::{Backoff, CachePadded},
};
/// A reference to an entry in a [`ThingBuf`].
///
/// A `Ref` represents the exclusive permission to mutate a given element in a
/// queue. A `Ref<T>` [implements `DerefMut<T>`] to allow writing to that
/// element.
///
/// `Ref`s are returned by the [`ThingBuf::push_ref`] and [`ThingBuf::pop_ref`]
/// methods. When the `Ref` is dropped, the exclusive write access to that
/// element is released, and the push or pop operation is completed &mdash;
/// calling `push_ref` or `pop_ref` *begins* a push or pop operation, which ends
/// when the returned `Ref` is dropped. When the `Ref` is dropped, the pushed
/// element will become available to a subsequent `pop_ref`, or the popped
/// element will be able to be written to by a `push_ref`, respectively.
///
/// [implements `DerefMut<T>`]: #impl-DerefMut
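///
/// # Examples
///
/// A minimal sketch of the `Ref` lifecycle, using [`ThingBuf`] (available with
/// the `alloc` feature): writing through a `Ref` completes a push once the
/// `Ref` is dropped, and a pop's `Ref` releases its slot for reuse when
/// dropped.
///
/// ```
/// use thingbuf::ThingBuf;
///
/// let q = ThingBuf::<i32>::new(4);
///
/// // `push_ref` begins a push; dropping the returned `Ref` completes it.
/// *q.push_ref().unwrap() = 42;
///
/// // `pop_ref` begins a pop; dropping the returned `Ref` releases the slot.
/// assert_eq!(q.pop_ref().map(|r| *r), Some(42));
/// ```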
pub struct Ref<'slot, T> {
    ptr: MutPtr<MaybeUninit<T>>,
    slot: &'slot Slot<T>,
    new_state: usize,
}

/// Error returned when sending a message fails because the channel is at capacity.
#[derive(Eq, PartialEq)]
pub struct Full<T = ()>(T);

/// State variables for the atomic ring buffer algorithm.
///
/// This is separated from the actual storage array used to implement the ring
/// buffer, so that it can be used by both the dynamically-sized implementation
/// (`Box<[Slot<T>]>`) and the statically-sized (`[T; const usize]`)
/// implementation.
///
/// A `Core`, when provided with a reference to the storage array, knows how to
/// actually perform the ring buffer operations on that array.
///
/// The atomic ring buffer is based on the [MPMC bounded queue from 1024cores][1].
///
/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
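///
/// # Head and tail layout
///
/// Roughly (the exact masks are computed in `Core::new`), the head and tail
/// counters pack a slot index into their low bits, a `closed` flag in the next
/// bit, and a lap ("generation") counter in the remaining high bits. For
/// example, with `capacity = 6`:
///
/// ```text
/// closed   = 0b0000_1000   // (capacity + 1).next_power_of_two()
/// idx_mask = 0b0000_0111   // closed - 1
/// gen      = 0b0001_0000   // closed << 1: one generation increment
/// gen_mask = 0b1111_0000   // !(closed | idx_mask), with all higher bits set
/// ```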
#[derive(Debug)]
struct Core {
    head: CachePadded<AtomicUsize>,
    tail: CachePadded<AtomicUsize>,
    gen: usize,
    gen_mask: usize,
    idx_mask: usize,
    closed: usize,
    capacity: usize,
    /// Set when dropping the slots in the ring buffer, to avoid potential double-frees.
    has_dropped_slots: bool,
}
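
/// A single slot in the ring buffer.
///
/// Each slot tracks its own `state`, which records which head or tail value
/// may claim it next: `push_ref` claims a slot whose state equals the current
/// tail, `pop_ref` claims a slot whose state equals `head + 1`, and dropping
/// the returned [`Ref`] advances the state (to `tail + 1` after a push, or to
/// `head` plus one generation after a pop).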
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    state: AtomicUsize,
}

impl Core {
    #[cfg(not(all(loom, test)))]
    const fn new(capacity: usize) -> Self {
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            closed,
            idx_mask,
            capacity,
            has_dropped_slots: false,
        }
    }

    #[cfg(all(loom, test))]
    fn new(capacity: usize) -> Self {
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            closed,
            gen_mask,
            idx_mask,
            capacity,
            // this field is unconditional in the struct definition, so it must
            // always be initialized here.
            has_dropped_slots: false,
        }
    }

    #[inline(always)]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }
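
    /// Returns the head or tail value that follows index `idx` in generation
    /// `gen`.
    ///
    /// For example, with `capacity = 6` (so one generation increment is
    /// `0b1_0000`): `next(3, 0b1_0000)` stays in the same lap and yields index
    /// 4, while `next(5, 0b1_0000)` wraps the index back to 0 and advances to
    /// generation `0b10_0000`.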
    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        // Are we still in the same generation?
        if idx + 1 < self.capacity() {
            // If we're still within the current generation, increment the index
            // by 1.
            (idx | gen) + 1
        } else {
            // We've reached the end of the current lap, wrap the index around
            // to 0.
            gen.wrapping_add(self.gen)
        }
    }

    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    fn close(&self) -> bool {
        test_println!("Core::close");
        // `std::thread::panicking` is only available when the `std` feature is
        // enabled; in `no_std` builds, assume we are not panicking.
        #[cfg(feature = "std")]
        if std::thread::panicking() {
            return false;
        }
        test_dbg!(self.tail.fetch_or(self.closed, SeqCst) & self.closed == 0)
    }

    #[inline(always)]
    fn push_ref<'slots, T, S, R>(
        &self,
        slots: &'slots S,
        recycle: &R,
    ) -> Result<Ref<'slots, T>, TrySendError<()>>
    where
        R: Recycle<T>,
        S: ops::Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("push_ref");
        let mut backoff = Backoff::new();
        let mut tail = self.tail.load(Relaxed);

        loop {
            if test_dbg!(tail & self.closed != 0) {
                return Err(TrySendError::Closed(()));
            }
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let state = test_dbg!(slot.state.load(Acquire));

            if test_dbg!(state == tail) {
                // Move the tail index forward by 1.
                let next_tail = self.next(idx, gen);
                match test_dbg!(self
                    .tail
                    .compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
                {
                    Ok(_) => {
                        // We got the slot! It's now okay to write to it
                        test_println!("claimed tail slot [{}]", idx);
                        // Claim exclusive ownership over the slot
                        let ptr = slot.value.get_mut();

                        // Initialize or recycle the element.
                        unsafe {
                            // Safety: we have just claimed exclusive ownership over
                            // this slot.
                            let ptr = ptr.deref();
                            if gen == 0 {
                                ptr.write(recycle.new_element());
                                test_println!("-> initialized");
                            } else {
                                // Safety: if the generation is > 0, then the
                                // slot has already been initialized.
                                recycle.recycle(ptr.assume_init_mut());
                                test_println!("-> recycled");
                            }
                        }

                        return Ok(Ref {
                            ptr,
                            new_state: tail + 1,
                            slot,
                        });
                    }
                    Err(actual) => {
                        // Someone else took this slot and advanced the tail
                        // index. Try to claim the new tail.
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if test_dbg!(state.wrapping_add(self.gen) == tail + 1) {
                // fake RMW op to placate loom. this should be equivalent to
                // doing a relaxed load after a SeqCst fence (per Godbolt
                // https://godbolt.org/z/zb15qfEa9), however, loom understands
                // this correctly, while it does not understand an explicit
                // SeqCst fence and a load.
                // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
                // load it gets reordered differently in the model checker lmao...
                let head = test_dbg!(self.head.fetch_or(0, SeqCst));
                if test_dbg!(head.wrapping_add(self.gen) == tail) {
                    test_println!("channel full");
                    return Err(TrySendError::Full(()));
                }

                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            tail = test_dbg!(self.tail.load(Acquire));
        }
    }

    #[inline(always)]
    fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, TrySendError>
    where
        S: ops::Index<usize, Output = Slot<T>> + ?Sized,
    {
        test_println!("pop_ref");
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Relaxed);

        loop {
            test_dbg!(head);
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            let slot = &slots[idx];
            let state = test_dbg!(slot.state.load(Acquire));

            // If the slot's state is ahead of the head index by one, we can pop
            // it.
            if test_dbg!(state == head + 1) {
                let next_head = self.next(idx, gen);
                match test_dbg!(self
                    .head
                    .compare_exchange_weak(head, next_head, SeqCst, Acquire))
                {
                    Ok(_) => {
                        test_println!("claimed head slot [{}]", idx);
                        return Ok(Ref {
                            new_state: head.wrapping_add(self.gen),
                            ptr: slot.value.get_mut(),
                            slot,
                        });
                    }
                    Err(actual) => {
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }

            if test_dbg!(state == head) {
                // fake RMW op to placate loom. this should be equivalent to
                // doing a relaxed load after a SeqCst fence (per Godbolt
                // https://godbolt.org/z/zb15qfEa9), however, loom understands
                // this correctly, while it does not understand an explicit
                // SeqCst fence and a load.
                // XXX(eliza): this makes me DEEPLY UNCOMFORTABLE but if it's a
                // load it gets reordered differently in the model checker lmao...
                let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));
                if test_dbg!(tail & !self.closed == head) {
                    return if test_dbg!(tail & self.closed != 0) {
                        Err(TrySendError::Closed(()))
                    } else {
                        test_println!("--> channel full!");
                        Err(TrySendError::Full(()))
                    };
                }

                if test_dbg!(backoff.done_spinning()) {
                    return Err(TrySendError::Full(()));
                }

                backoff.spin();
            } else {
                backoff.spin_yield();
            }

            head = test_dbg!(self.head.load(Acquire));
        }
    }
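
    /// Returns the number of elements currently in the queue.
    ///
    /// The loop re-reads `tail` after loading `head` and retries until the two
    /// `tail` reads agree, so the `(head, tail)` pair used for the computation
    /// is a consistent snapshot.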
    fn len(&self) -> usize {
        loop {
            let tail = self.tail.load(SeqCst);
            let head = self.head.load(SeqCst);

            if self.tail.load(SeqCst) == tail {
                let (head_idx, _) = self.idx_gen(head);
                let (tail_idx, _) = self.idx_gen(tail);
                return match head_idx.cmp(&tail_idx) {
                    cmp::Ordering::Less => tail_idx - head_idx,
                    cmp::Ordering::Greater => self.capacity - head_idx + tail_idx,
                    _ if tail == head => 0,
                    _ => self.capacity,
                };
            }
        }
    }

    fn drop_slots<T>(&mut self, slots: &mut [Slot<T>]) {
        debug_assert!(
            !self.has_dropped_slots,
            "tried to drop slots twice! core={:#?}",
            self
        );
        if self.has_dropped_slots {
            return;
        }

        let tail = self.tail.load(SeqCst);
        let (idx, gen) = self.idx_gen(tail);
        let num_initialized = if gen > 0 { self.capacity() } else { idx };
        for slot in &mut slots[..num_initialized] {
            unsafe {
                slot.value
                    .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr()));
            }
        }

        self.has_dropped_slots = true;
    }
}

impl Drop for Core {
    fn drop(&mut self) {
        debug_assert!(
            self.has_dropped_slots,
            "tried to drop Core without dropping slots! core={:#?}",
            self
        );
    }
}

// === impl Ref ===

impl<T> Ref<'_, T> {
    #[inline]
    fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            // TODO(eliza): use `MaybeUninit::assume_init_ref` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&*(&*value).as_ptr())
        })
    }

    #[inline]
    fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot.
            // TODO(eliza): use `MaybeUninit::assume_init_mut` here once it's
            // supported by `tracing-appender`'s MSRV.
            f(&mut *(&mut *value).as_mut_ptr())
        })
    }
}

impl<T> ops::Deref for Ref<'_, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            &*self.ptr.deref().as_ptr()
        }
    }
}

impl<T> ops::DerefMut for Ref<'_, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe {
            // Safety: if a `Ref` exists, we have exclusive ownership of the
            // slot. A `Ref` is only created if the slot has already been
            // initialized.
            &mut *self.ptr.deref().as_mut_ptr()
        }
    }
}

impl<T> Drop for Ref<'_, T> {
    #[inline]
    fn drop(&mut self) {
        test_println!("drop Ref<{}>", core::any::type_name::<T>());
        test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
    }
}

impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}

impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}

unsafe impl<T: Send> Send for Ref<'_, T> {}
unsafe impl<T: Send> Sync for Ref<'_, T> {}

// === impl Slot ===

impl<T> Slot<T> {
    #[cfg(feature = "alloc")]
    pub(crate) fn make_boxed_array(capacity: usize) -> Box<[Self]> {
        (0..capacity).map(|i| Slot::new(i)).collect()
    }

    feature! {
        #![all(feature = "static", not(all(loom, test)))]

        const EMPTY: Self = Self::new(usize::MAX);

        pub(crate) const fn make_static_array<const CAPACITY: usize>() -> [Self; CAPACITY] {
            let mut array = [Self::EMPTY; CAPACITY];
            let mut i = 0;
            while i < CAPACITY {
                array[i] = Self::new(i);
                i += 1;
            }
            array
        }
    }

    #[cfg(not(all(loom, test)))]
    const fn new(idx: usize) -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(idx),
        }
    }

    #[cfg(all(loom, test))]
    fn new(idx: usize) -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(idx),
        }
    }
}

unsafe impl<T: Sync> Sync for Slot<T> {}

impl<T> fmt::Debug for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Full(..)")
    }
}

impl<T> fmt::Display for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel full")
    }
}