static_thingbuf.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. use crate::{
  2. recycling::{self, Recycle},
  3. Core, Full, Ref, Slot,
  4. };
  5. use core::fmt;
  6. /// A statically allocated, fixed-size lock-free multi-producer multi-consumer
  7. /// queue.
  8. ///
  9. /// This type is identical to the [`ThingBuf`] type, except that the queue's
  10. /// capacity is controlled by a const generic parameter, rather than dynamically
  11. /// allocated at runtime. This means that a `StaticThingBuf` can be used without
  12. /// requiring heap allocation, and can be stored in a `static`.
  13. ///
  14. /// `StaticThingBuf` will likely be particularly useful for embedded systems,
  15. /// bare-metal code like operating system kernels or device drivers, and other
  16. /// contexts where allocations may not be available. It can also be used to
  17. /// implement a global queue that lives for the entire lifespan of a program,
  18. /// without having to pass around reference-counted [`std::sync::Arc`] pointers.
  19. ///
  20. /// # Examples
  21. ///
  22. /// Storing a `StaticThingBuf` in a `static`:
  23. ///
  24. /// ```rust
  25. /// use thingbuf::StaticThingBuf;
  26. ///
  27. /// static MY_THINGBUF: StaticThingBuf<i32, 8> = StaticThingBuf::new();
  28. ///
  29. /// fn main() {
  30. /// assert_eq!(MY_THINGBUF.push(1), Ok(()));
  31. /// assert_eq!(MY_THINGBUF.pop(), Some(1));
  32. /// }
  33. /// ```
  34. ///
  35. /// Constructing a `StaticThingBuf` on the stack:
  36. ///
  37. /// ```rust
  38. /// use thingbuf::StaticThingBuf;
  39. ///
  40. /// let q = StaticThingBuf::<_, 2>::new();
  41. ///
  42. /// // Push some values to the queue.
  43. /// q.push(1).unwrap();
  44. /// q.push(2).unwrap();
  45. ///
  46. /// // Now, the queue is at capacity.
  47. /// assert!(q.push(3).is_err());
  48. /// ```
  49. ///
  50. /// # Allocation
  51. ///
  52. /// A `StaticThingBuf` is a fixed-size, array-based queue. Elements in the queue are
  53. /// stored in a single array whose capacity is specified at compile time as a
  54. /// `const` generic parameter. This means that a `StaticThingBuf` requires *no*
  55. /// heap allocations. Calling [`StaticThingBuf::new`], [`push`] or [`pop`] will
  56. /// never allocate or deallocate memory.
  57. ///
  58. /// If the size of the queue is dynamic and not known at compile-time, the [`ThingBuf`]
  59. /// type, which performs a single heap allocation, can be used instead (provided
  60. /// that the `alloc` feature flag is enabled).
  61. ///
  62. /// ## Reusing Allocations
  63. ///
  64. /// Of course, if the *elements* in the queue are themselves heap-allocated
  65. /// (such as `String`s or `Vec`s), heap allocations and deallocations may still
  66. /// occur when those types are created or dropped. However, `StaticThingBuf` also
  67. /// provides an API for enqueueing and dequeueing elements *by reference*. In
  68. /// some use cases, this API can be used to reduce allocations for queue elements.
  69. ///
  70. /// As an example, consider the case where multiple threads in a program format
  71. /// log messages and send them to a dedicated worker thread that writes those
  72. /// messages to a file. A naive implementation might look something like this:
  73. ///
  74. /// ```rust
  75. /// use thingbuf::StaticThingBuf;
  76. /// use std::{sync::Arc, fmt, thread, fs::File, error::Error, io::Write};
  77. ///
  78. /// static LOG_QUEUE: StaticThingBuf<String, 1048> = StaticThingBuf::new();
  79. ///
  80. /// // Called by application threads to log a message.
  81. /// fn log_event(message: &dyn fmt::Debug) {
  82. /// // Format the log line to a `String`.
  83. /// let line = format!("{:?}\n", message);
  84. /// // Send the string to the worker thread.
  85. /// let _ = LOG_QUEUE.push(line);
  86. /// // If the queue was full, ignore the error and drop the log line.
  87. /// }
  88. ///
  89. /// fn main() -> Result<(), Box<dyn Error>> {
  90. /// # // wrap the actual code in a function that's never called so that running
  91. /// # // the test never actually creates the log file.
  92. /// # fn docs() -> Result<(), Box<dyn Error>> {
  93. /// // Spawn the background worker thread.
  94. /// let mut file = File::create("myapp.log")?;
  95. /// thread::spawn(move || {
  96. /// use std::io::Write;
  97. /// loop {
  98. /// // Pop from the queue, and write each log line to the file.
  99. /// while let Some(line) = LOG_QUEUE.pop() {
  100. /// file.write_all(line.as_bytes()).unwrap();
  101. /// }
  102. ///
  103. /// // No more messages in the queue!
  104. /// file.flush().unwrap();
  105. /// thread::yield_now();
  106. /// }
  107. /// });
  108. ///
  109. /// // ...
  110. /// # Ok(())
  111. /// # }
  112. /// # Ok(())
  113. /// }
  114. /// ```
  115. ///
  116. /// With this design, however, new `String`s are allocated for every message
  117. /// that's logged, and then are immediately deallocated once they are written to
  118. /// the file. This can have a negative performance impact.
  119. ///
  120. /// Using `ThingBuf`'s [`push_ref`] and [`pop_ref`] methods, this code can be
  121. /// redesigned to _reuse_ `String` allocations in place. With these methods,
  122. /// rather than moving an element by-value into the queue when enqueueing, we
  123. /// instead reserve the rights to _mutate_ a slot in the queue in place,
  124. /// returning a [`Ref`]. Similarly, when dequeueing, we also recieve a [`Ref`]
  125. /// that allows reading from (and mutating) the dequeued element. This allows
  126. /// the queue to _own_ the set of `String`s used for formatting log messages,
  127. /// which are cleared in place and the existing allocation reused.
  128. ///
  129. /// The rewritten code might look like this:
  130. ///
  131. /// ```rust
  132. /// use thingbuf::StaticThingBuf;
  133. /// use std::{sync::Arc, fmt, thread, fs::File, error::Error};
  134. ///
  135. /// static LOG_QUEUE: StaticThingBuf<String, 1048> = StaticThingBuf::new();
  136. ///
  137. /// // Called by application threads to log a message.
  138. /// fn log_event( message: &dyn fmt::Debug) {
  139. /// use std::fmt::Write;
  140. ///
  141. /// // Reserve a slot in the queue to write to.
  142. /// if let Ok(mut slot) = LOG_QUEUE.push_ref() {
  143. /// // Clear the string in place, retaining the allocated capacity.
  144. /// slot.clear();
  145. /// // Write the log message to the string.
  146. /// write!(&mut *slot, "{:?}\n", message);
  147. /// }
  148. /// // Otherwise, if `push_ref` returns an error, the queue is full;
  149. /// // ignore this log line.
  150. /// }
  151. ///
  152. /// fn main() -> Result<(), Box<dyn Error>> {
  153. /// # // wrap the actual code in a function that's never called so that running
  154. /// # // the test never actually creates the log file.
  155. /// # fn docs() -> Result<(), Box<dyn Error>> {
  156. /// // Spawn the background worker thread.
  157. /// let mut file = File::create("myapp.log")?;
  158. /// thread::spawn(move || {
  159. /// use std::io::Write;
  160. /// loop {
  161. /// // Pop from the queue, and write each log line to the file.
  162. /// while let Some(line) = LOG_QUEUE.pop_ref() {
  163. /// file.write_all(line.as_bytes()).unwrap();
  164. /// }
  165. ///
  166. /// // No more messages in the queue!
  167. /// file.flush().unwrap();
  168. /// thread::yield_now();
  169. /// }
  170. /// });
  171. ///
  172. /// // ...
  173. /// # Ok(())
  174. /// # }
  175. /// # Ok(())
  176. /// }
  177. /// ```
  178. ///
  179. /// In this implementation, the strings will only be reallocated if their
  180. /// current capacity is not large enough to store the formatted representation
  181. /// of the log message.
  182. ///
  183. /// When using a `ThingBuf` in this manner, it can be thought of as a
  184. /// combination of a concurrent queue and an [object pool].
  185. ///
  186. /// [`push`]: Self::push
  187. /// [`pop`]: Self::pop
  188. /// [`push_ref`]: Self::push_ref
  189. /// [`pop_ref`]: Self::pop_ref
  190. /// [`ThingBuf`]: crate::ThingBuf
  191. /// [vyukov]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
  192. /// [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern
  193. #[cfg_attr(docsrs, doc(cfg(feature = "static")))]
  194. pub struct StaticThingBuf<T, const CAP: usize, R = recycling::DefaultRecycle> {
  195. core: Core,
  196. recycle: R,
  197. slots: [Slot<T>; CAP],
  198. }
  199. // === impl ThingBuf ===
  200. #[cfg(not(test))]
  201. impl<T, const CAP: usize> StaticThingBuf<T, CAP> {
  202. /// Returns a new `StaticThingBuf` with space for `capacity` elements.
  203. pub const fn new() -> Self {
  204. Self::with_recycle(recycling::DefaultRecycle::new())
  205. }
  206. }
  207. impl<T, const CAP: usize, R> StaticThingBuf<T, CAP, R> {
  208. pub const fn with_recycle(recycle: R) -> Self {
  209. StaticThingBuf {
  210. core: Core::new(CAP),
  211. recycle,
  212. slots: Slot::make_static_array::<CAP>(),
  213. }
  214. }
  215. }
  216. impl<T, const CAP: usize, R> StaticThingBuf<T, CAP, R> {
  217. /// Returns the *total* capacity of this queue. This includes both
  218. /// occupied and unoccupied entries.
  219. ///
  220. /// To determine the queue's remaining *unoccupied* capacity, use
  221. /// [`remaining`] instead.
  222. ///
  223. /// # Examples
  224. ///
  225. /// ```
  226. /// # use thingbuf::StaticThingBuf;
  227. /// static MY_THINGBUF: StaticThingBuf::<usize, 100> = StaticThingBuf::new();
  228. ///
  229. /// assert_eq!(MY_THINGBUF.capacity(), 100);
  230. /// ```
  231. ///
  232. /// Even after pushing several messages to the queue, the capacity remains
  233. /// the same:
  234. /// ```
  235. /// # use thingbuf::StaticThingBuf;
  236. /// static MY_THINGBUF: StaticThingBuf::<usize, 100> = StaticThingBuf::new();
  237. ///
  238. /// *MY_THINGBUF.push_ref().unwrap() = 1;
  239. /// *MY_THINGBUF.push_ref().unwrap() = 2;
  240. /// *MY_THINGBUF.push_ref().unwrap() = 3;
  241. ///
  242. /// assert_eq!(MY_THINGBUF.capacity(), 100);
  243. /// ```
  244. ///
  245. /// [`remaining`]: Self::remaining
  246. #[inline]
  247. pub fn capacity(&self) -> usize {
  248. CAP
  249. }
  250. /// Returns the unoccupied capacity of the queue (i.e., how many additional
  251. /// elements can be enqueued before the queue will be full).
  252. ///
  253. /// This is equivalent to subtracting the queue's [`len`] from its [`capacity`].
  254. ///
  255. /// [`len`]: Self::len
  256. /// [`capacity`]: Self::capacity
  257. pub fn remaining(&self) -> usize {
  258. self.capacity() - self.len()
  259. }
  260. /// Returns the number of elements in the queue
  261. ///
  262. /// To determine the queue's remaining *unoccupied* capacity, use
  263. /// [`remaining`] instead.
  264. ///
  265. /// # Examples
  266. ///
  267. /// ```
  268. /// # use thingbuf::StaticThingBuf;
  269. /// static MY_THINGBUF: StaticThingBuf::<usize, 100> = StaticThingBuf::new();
  270. ///
  271. /// assert_eq!(MY_THINGBUF.len(), 0);
  272. ///
  273. /// *MY_THINGBUF.push_ref().unwrap() = 1;
  274. /// *MY_THINGBUF.push_ref().unwrap() = 2;
  275. /// *MY_THINGBUF.push_ref().unwrap() = 3;
  276. /// assert_eq!(MY_THINGBUF.len(), 3);
  277. ///
  278. /// let _ = MY_THINGBUF.pop_ref();
  279. /// assert_eq!(MY_THINGBUF.len(), 2);
  280. /// ```
  281. ///
  282. /// [`remaining`]: Self::remaining
  283. #[inline]
  284. pub fn len(&self) -> usize {
  285. self.core.len()
  286. }
  287. /// Returns `true` if there are currently no elements in this `StaticThingBuf`.
  288. ///
  289. /// # Examples
  290. ///
  291. /// ```
  292. /// # use thingbuf::StaticThingBuf;
  293. /// static MY_THINGBUF: StaticThingBuf::<usize, 100> = StaticThingBuf::new();
  294. ///
  295. /// assert!(MY_THINGBUF.is_empty());
  296. ///
  297. /// *MY_THINGBUF.push_ref().unwrap() = 1;
  298. /// assert!(!MY_THINGBUF.is_empty());
  299. ///
  300. /// let _ = MY_THINGBUF.pop_ref();
  301. /// assert!(MY_THINGBUF.is_empty());
  302. /// ```
  303. #[inline]
  304. pub fn is_empty(&self) -> bool {
  305. self.len() == 0
  306. }
  307. }
  308. impl<T, const CAP: usize, R> StaticThingBuf<T, CAP, R>
  309. where
  310. R: Recycle<T>,
  311. {
  312. /// Reserves a slot to push an element into the queue, returning a [`Ref`] that
  313. /// can be used to write to that slot.
  314. ///
  315. /// This can be used to reuse allocations for queue elements in place,
  316. /// by clearing previous data prior to writing. In order to ensure
  317. /// allocations can be reused in place, elements should be dequeued using
  318. /// [`pop_ref`] rather than [`pop`]. If values are expensive to produce,
  319. /// `push_ref` can also be used to avoid producing a value if there is no
  320. /// capacity for it in the queue.
  321. ///
  322. /// For values that don't own heap allocations, or heap allocated values
  323. /// that cannot be reused in place, [`push`] can also be used.
  324. ///
  325. /// # Returns
  326. ///
  327. /// - `Ok(`[`Ref`]`)` if there is space for a new element
  328. /// - `Err(`[`Full`]`)`, if there is no capacity remaining in the queue
  329. ///
  330. /// # Examples
  331. ///
  332. /// ```rust
  333. /// use thingbuf::StaticThingBuf;
  334. ///
  335. /// static MY_THINGBUF: StaticThingBuf<i32, 1> = StaticThingBuf::new();
  336. ///
  337. /// // Reserve a `Ref` and enqueue an element.
  338. /// *MY_THINGBUF.push_ref().expect("queue should have capacity") = 10;
  339. ///
  340. /// // Now that the queue has one element in it, a subsequent `push_ref`
  341. /// // call will fail.
  342. /// assert!(MY_THINGBUF.push_ref().is_err());
  343. /// ```
  344. ///
  345. /// Avoiding producing an expensive element when the queue is at capacity:
  346. ///
  347. /// ```rust
  348. /// use thingbuf::StaticThingBuf;
  349. ///
  350. /// static MESSAGES: StaticThingBuf<Message, 16> = StaticThingBuf::new();
  351. ///
  352. /// #[derive(Clone, Default)]
  353. /// struct Message {
  354. /// // ...
  355. /// }
  356. ///
  357. /// fn make_expensive_message() -> Message {
  358. /// // Imagine this function performs some costly operation, or acquires
  359. /// // a limited resource...
  360. /// # Message::default()
  361. /// }
  362. ///
  363. /// fn enqueue_message() {
  364. /// loop {
  365. /// match MESSAGES.push_ref() {
  366. // // If `push_ref` returns `Ok`, we've reserved a slot in
  367. /// // the queue for our message, so it's okay to generate
  368. /// // the expensive message.
  369. /// Ok(mut slot) => {
  370. /// *slot = make_expensive_message();
  371. /// return;
  372. /// },
  373. ///
  374. /// // If there's no space in the queue, avoid generating
  375. /// // an expensive message that won't be sent.
  376. /// Err(_) => {
  377. /// // Wait until the queue has capacity...
  378. /// std::thread::yield_now();
  379. /// }
  380. /// }
  381. /// }
  382. /// }
  383. /// ```
  384. ///
  385. /// [`pop`]: Self::pop
  386. /// [`pop_ref`]: Self::pop_ref
  387. /// [`push`]: Self::push_ref
  388. pub fn push_ref(&self) -> Result<Ref<'_, T>, Full> {
  389. self.core
  390. .push_ref(&self.slots, &self.recycle)
  391. .map_err(|e| match e {
  392. crate::mpsc::TrySendError::Full(()) => Full(()),
  393. _ => unreachable!(),
  394. })
  395. }
  396. /// Attempt to enqueue an element by value.
  397. ///
  398. /// If the queue is full, the element is returned in the [`Full`] error.
  399. ///
  400. /// Unlike [`push_ref`], this method does not enable the reuse of previously
  401. /// allocated elements. If allocations are being reused by using
  402. /// [`push_ref`] and [`pop_ref`], this method should not be used, as it will
  403. /// drop pooled allocations.
  404. ///
  405. /// # Returns
  406. ///
  407. /// - `Ok(())` if the element was enqueued
  408. /// - `Err(`[`Full`]`)`, containing the value, if there is no capacity
  409. /// remaining in the queue
  410. ///
  411. /// [`push_ref`]: Self::push_ref
  412. /// [`pop_ref`]: Self::pop_ref
  413. #[inline]
  414. pub fn push(&self, val: T) -> Result<(), Full<T>> {
  415. match self.push_ref() {
  416. Err(_) => Err(Full(val)),
  417. Ok(mut slot) => {
  418. *slot = val;
  419. Ok(())
  420. }
  421. }
  422. }
  423. /// Reserves a slot to push an element into the queue, and invokes the
  424. /// provided function `f` with a mutable reference to that element.
  425. ///
  426. /// # Returns
  427. ///
  428. /// - `Ok(U)` containing the return value of the provided function, if the
  429. /// element was enqueued
  430. /// - `Err(`[`Full`]`)`, if there is no capacity remaining in the queue
  431. #[inline]
  432. pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, Full> {
  433. self.push_ref().map(|mut r| r.with_mut(f))
  434. }
  435. /// Dequeue the first element in the queue, returning a [`Ref`] that can be
  436. /// used to read from (or mutate) the element.
  437. ///
  438. /// **Note**: A `StaticThingBuf` is *not* a "broadcast"-style queue. Each element
  439. /// is dequeued a single time. Once a thread has dequeued a given element,
  440. /// it is no longer the head of the queue.
  441. ///
  442. /// This can be used to reuse allocations for queue elements in place,
  443. /// by clearing previous data prior to writing. In order to ensure
  444. /// allocations can be reused in place, elements should be enqueued using
  445. /// [`push_ref`] rather than [`push`].
  446. ///
  447. /// For values that don't own heap allocations, or heap allocated values
  448. /// that cannot be reused in place, [`pop`] can also be used.
  449. ///
  450. /// # Returns
  451. ///
  452. /// - `Some(`[`Ref<T>`](Ref)`)` if an element was dequeued
  453. /// - `None` if there are no elements in the queue
  454. ///
  455. /// [`push_ref`]: Self::push_ref
  456. /// [`push`]: Self::push
  457. /// [`pop`]: Self::pop
  458. pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
  459. self.core.pop_ref(&self.slots).ok()
  460. }
  461. /// Dequeue the first element in the queue *by value*, moving it out of the
  462. /// queue.
  463. ///
  464. /// **Note**: A `StaticThingBuf` is *not* a "broadcast"-style queue. Each element
  465. /// is dequeued a single time. Once a thread has dequeued a given element,
  466. /// it is no longer the head of the queue.
  467. ///
  468. /// # Returns
  469. ///
  470. /// - `Some(T)` if an element was dequeued
  471. /// - `None` if there are no elements in the queue
  472. #[inline]
  473. pub fn pop(&self) -> Option<T> {
  474. let mut slot = self.pop_ref()?;
  475. Some(recycling::take(&mut *slot, &self.recycle))
  476. }
  477. /// Dequeue the first element in the queue by reference, and invoke the
  478. /// provided function `f` with a mutable reference to the dequeued element.
  479. ///
  480. /// # Returns
  481. ///
  482. /// - `Some(U)` containing the return value of the provided function, if the
  483. /// element was dequeued
  484. /// - `None` if the queue is empty
  485. #[inline]
  486. pub fn pop_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Option<U> {
  487. self.pop_ref().map(|mut r| r.with_mut(f))
  488. }
  489. }
  490. impl<T, const CAP: usize, R: fmt::Debug> fmt::Debug for StaticThingBuf<T, CAP, R> {
  491. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  492. f.debug_struct("StaticThingBuf")
  493. .field("len", &self.len())
  494. .field("slots", &format_args!("[...]"))
  495. .field("core", &self.core)
  496. .field("recycle", &self.recycle)
  497. .finish()
  498. }
  499. }
  500. impl<T, const CAP: usize, R> Drop for StaticThingBuf<T, CAP, R> {
  501. fn drop(&mut self) {
  502. self.core.drop_slots(&mut self.slots[..]);
  503. }
  504. }