queue.rs

use crate::{
    loom::{
        atomic::{AtomicUsize, Ordering::*},
        cell::UnsafeCell,
    },
    util::{mutex::Mutex, CachePadded},
    wait::{Notify, WaitResult},
};
use core::{fmt, marker::PhantomPinned, pin::Pin, ptr::NonNull};

/// A queue of waiters ([`core::task::Waker`]s or [`std::thread::Thread`]s)
/// implemented as a doubly-linked intrusive list.
///
/// The *[intrusive]* aspect of this list is important, as it means that it does
/// not allocate memory. Instead, nodes in the linked list are stored in the
/// futures of tasks trying to wait for capacity, or on the stack frames of
/// threads blocking on channel capacity.
///
/// Using an intrusive list is critical if the MPSC is to *truly* have bounded
/// memory use: if the channel has capacity for a bounded number of *messages*,
/// but when it is full, must allocate memory to store the threads or tasks that
/// are waiting to send messages, it does not actually bound the maximum amount
/// of memory used by the channel! This is, potentially, quite bad, as
/// increasing backpressure can cause out of memory conditions.
///
/// Also, not allocating can have a performance advantage, since tasks waiting
/// for capacity will never hit `malloc` or `free`. This reduces the overhead of
/// waiting, but (more importantly!) it also avoids allocator churn that can
/// hurt `malloc` performance for *other* parts of the program that need to
/// allocate memory.
///
/// Finally, this will allow using a `ThingBuf` MPSC channel with exclusively
/// static allocations, making it much easier to use in embedded systems or on
/// bare metal when an allocator may not always be available.
///
/// However, the intrusive linked list introduces one new danger: because
/// futures can be *cancelled*, and the linked list nodes live within the
/// futures trying to wait for channel capacity, we *must* ensure that the node
/// is unlinked from the list before dropping a cancelled future. Failure to do
/// so would result in the list containing dangling pointers. Therefore, we must
/// use a *doubly-linked* list, so that nodes can edit both the previous and
/// next node when they have to remove themselves. This is kind of a bummer, as
/// it means we can't use something nice like this [intrusive queue by Dmitry
/// Vyukov][2], and there are not really practical designs for lock-free
/// doubly-linked lists that don't rely on some kind of deferred reclamation
/// scheme such as hazard pointers or QSBR.
///
/// Instead, we just stick a mutex around the linked list, which must be
/// acquired to pop nodes from it, or for nodes to remove themselves when
/// futures are cancelled. This is a bit sad, but the critical sections for this
/// mutex are short enough that we still get pretty good performance despite it.
///
/// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] is used when
/// the standard library is available.
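///
/// A rough sketch of the intended flow (illustrative only; pinning the
/// `Waiter` and the concrete `Notify` implementation are elided, and `waker`
/// stands in for whatever `T: Notify` is in use):
///
/// ```ignore
/// let queue = WaitQueue::new();
/// // A task that finds the channel full registers itself to be woken.
/// // `waiter` is a `Pin<&mut Waiter<T>>` living inside the task's future.
/// match queue.push_waiter(&mut Some(waiter), |slot| *slot = Some(waker)) {
///     WaitResult::Wait => { /* linked; park until notified */ }
///     WaitResult::Notified => { /* retry the operation immediately */ }
///     WaitResult::Closed => { /* the channel has closed */ }
/// }
/// // Meanwhile, the consuming side wakes one waiter (or banks the
/// // notification if no one is waiting yet):
/// queue.notify();
/// ```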
///
/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
#[derive(Debug)]
pub(crate) struct WaitQueue<T> {
    /// The wait queue's state variable. The first bit indicates whether the
    /// queue is closed; the remainder is a counter of notifications assigned to
    /// the queue because no waiters were currently available to be woken.
    ///
    /// These stored notifications are "consumed" when new waiters are
    /// registered; those waiters will be woken immediately rather than being
    /// enqueued to wait.
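    ///
    /// A rough sketch of the encoding (illustrative only), in terms of the
    /// `CLOSED` and `ONE_QUEUED` constants defined below:
    ///
    /// ```ignore
    /// // A closed queue holding two unclaimed notifications:
    /// let state = CLOSED | (2 * ONE_QUEUED);
    /// assert_eq!(state & CLOSED, CLOSED); // the closed bit is set...
    /// assert_eq!(state >> 1, 2); // ...and two notifications remain.
    /// ```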
    state: CachePadded<AtomicUsize>,

    /// The linked list of waiters.
    ///
    /// # Safety
    ///
    /// This is protected by a mutex; the mutex *must* be acquired when
    /// manipulating the linked list, OR when manipulating waiter nodes that may
    /// be linked into the list. If a node is known to not be linked, it is safe
    /// to modify that node (such as by setting or unsetting its
    /// `Waker`/`Thread`) without holding the lock; otherwise, it may be
    /// modified through the list, so the lock must be held when modifying the
    /// node.
    ///
    /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] is used
    /// when the standard library is available.
    list: Mutex<List<T>>,
}

/// A waiter node which may be linked into a wait queue.
#[derive(Debug)]
pub(crate) struct Waiter<T> {
    node: UnsafeCell<Node<T>>,
}

#[derive(Debug)]
#[pin_project::pin_project]
struct Node<T> {
    next: Link<Waiter<T>>,
    prev: Link<Waiter<T>>,
    waiter: Option<T>,

    // This type is !Unpin due to the heuristic from:
    // <https://github.com/rust-lang/rust/pull/82834>
    #[pin]
    _pin: PhantomPinned,
}

type Link<T> = Option<NonNull<T>>;

struct List<T> {
    head: Link<Waiter<T>>,
    tail: Link<Waiter<T>>,
}

const CLOSED: usize = 1 << 0;
const ONE_QUEUED: usize = 1 << 1;

impl<T: Notify + Unpin> WaitQueue<T> {
    pub(crate) fn new() -> Self {
        Self {
            state: CachePadded(AtomicUsize::new(0)),
            list: Mutex::new(List::new()),
        }
    }

    pub(crate) fn push_waiter(
        &self,
        waiter: &mut Option<Pin<&mut Waiter<T>>>,
        register: impl FnOnce(&mut Option<T>),
    ) -> WaitResult {
        test_println!("WaitQueue::push_waiter()");
        let mut state = test_dbg!(self.state.load(Acquire));

        // First, go ahead and check if the queue has been closed. This is
        // necessary even if `waiter` is `None`, as the waiter may already be
        // queued, and the caller may just be checking if the list was closed.
        // TODO(eliza): that actually kind of sucks lol...
        if test_dbg!(state & CLOSED != 0) {
            return WaitResult::Closed;
        }

        // If we were actually called with a real waiter, try to queue the node.
        if test_dbg!(waiter.is_some()) {
            // Is there at least one queued notification assigned to the wait
            // queue? If so, try to consume that now, rather than waiting.
            while test_dbg!(state >= ONE_QUEUED) {
                match test_dbg!(self.state.compare_exchange_weak(
                    state,
                    // Subtract one queued notification from the current state.
                    state.saturating_sub(ONE_QUEUED),
                    AcqRel,
                    Acquire
                )) {
                    // We consumed a queued notification! Return `Notified`
                    // now, so that we'll try our operation again, instead
                    // of waiting.
                    Ok(_) => return WaitResult::Notified,
                    // Someone else was updating the state variable. Try again
                    // --- but they may have closed the queue, or consumed the last
                    // queued notification!
                    Err(actual) => state = test_dbg!(actual),
                }
            }

            // Okay, did the queue close while we were trying to consume a
            // queued notification?
            if test_dbg!(state & CLOSED != 0) {
                return WaitResult::Closed;
            }

            // There are no queued notifications to consume, and the queue is
            // still open. Therefore, it's time to actually push the waiter to
            // the queue...finally lol :)

            // Grab the lock...
            let mut list = self.list.lock();

            // Okay, we have the lock...but what if someone changed the state
            // WHILE we were waiting to acquire the lock? isn't concurrent
            // programming great? :) :) :) :) :)
            state = test_dbg!(self.state.load(Acquire));

            // Try to consume a queued notification *again* in case any were
            // assigned to the queue while we were waiting to acquire the lock.
            while test_dbg!(state >= ONE_QUEUED) {
                match test_dbg!(self.state.compare_exchange(
                    state,
                    state.saturating_sub(ONE_QUEUED),
                    AcqRel,
                    Acquire
                )) {
                    Ok(_) => return WaitResult::Notified,
                    Err(actual) => state = actual,
                }
            }

            // We didn't consume a queued notification. it is now, finally, time
            // to actually put the waiter in the linked list. wasn't that fun?
            if let Some(waiter) = waiter.take() {
                test_println!("WaitQueue::push_waiter -> pushing {:p}", waiter);
                // Now that we have the lock, register the `Waker` or `Thread`
                // with the waiter's node.
                unsafe {
                    // Safety: the waker can only be registered while holding
                    // the wait queue lock. We are holding the lock, so no one
                    // else will try to touch the waker until we're done.
                    waiter.with_node(|node| register(&mut node.waiter));
                }
                list.push_front(waiter);
            } else {
                // XXX(eliza): in practice we can't ever get here because of the
                // `if` above. this should probably be `unreachable_unchecked`
                // but i'm a coward...
                unreachable!("this could be unchecked...")
            }
        }

        WaitResult::Wait
    }

    /// Notify one waiter from the queue. If there are no waiters in the linked
    /// list, the notification is instead assigned to the queue itself.
    ///
    /// If a waiter was popped from the queue, returns `true`. Otherwise, if the
    /// notification was assigned to the queue, returns `false`.
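    ///
    /// A rough sketch of how a caller might use the return value (illustrative
    /// only):
    ///
    /// ```ignore
    /// if !queue.notify() {
    ///     // No waiter was popped; the notification was banked in `state` and
    ///     // will be consumed by the next call to `push_waiter`.
    /// }
    /// ```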
    pub(crate) fn notify(&self) -> bool {
        test_println!("WaitQueue::notify()");
        if let Some(node) = test_dbg!(self.list.lock().pop_back()) {
            node.notify();
            true
        } else {
            test_dbg!(self.state.fetch_add(ONE_QUEUED, Release));
            false
        }
    }

    /// Close the queue, notifying all waiting tasks.
    pub(crate) fn close(&self) {
        test_println!("WaitQueue::close()");
        test_dbg!(self.state.fetch_or(CLOSED, Release));
        let mut list = self.list.lock();
        while let Some(node) = list.pop_back() {
            node.notify();
        }
    }
}

// === impl Waiter ===

impl<T: Notify> Waiter<T> {
    pub(crate) fn new() -> Self {
        Self {
            node: UnsafeCell::new(Node {
                next: None,
                prev: None,
                waiter: None,
                _pin: PhantomPinned,
            }),
        }
    }

    #[inline]
    fn notify(self: Pin<&mut Self>) -> bool {
        let waker = unsafe { self.with_node(|node| node.waiter.take()) };
        if let Some(waker) = waker {
            waker.notify();
            return true;
        }
        false
    }
}

impl<T> Waiter<T> {
    unsafe fn with_node<U>(&self, f: impl FnOnce(&mut Node<T>) -> U) -> U {
        self.node.with_mut(|node| f(&mut *node))
    }

    unsafe fn set_prev(&mut self, prev: Option<NonNull<Waiter<T>>>) {
        self.node.with_mut(|node| (*node).prev = prev);
    }

    unsafe fn take_prev(&mut self) -> Option<NonNull<Waiter<T>>> {
        self.node.with_mut(|node| (*node).prev.take())
    }

    unsafe fn take_next(&mut self) -> Option<NonNull<Waiter<T>>> {
        self.node.with_mut(|node| (*node).next.take())
    }
}

impl<T: Notify> Waiter<T> {
    pub(crate) fn remove(self: Pin<&mut Self>, q: &WaitQueue<T>) {
        test_println!("Waiter::remove({:p})", self);
        unsafe {
            // Safety: removing a node is unsafe even when the list is locked,
            // because there's no way to guarantee that the node is part of
            // *this* list. However, the potential callers of this method will
            // never have access to any other linked lists, so we can just kind
            // of assume that this is safe.
            q.list.lock().remove(self);
        }
    }
}

unsafe impl<T: Send> Send for Waiter<T> {}
unsafe impl<T: Send> Sync for Waiter<T> {}

// === impl List ===

impl<T> List<T> {
    fn new() -> Self {
        Self {
            head: None,
            tail: None,
        }
    }

    fn push_front(&mut self, waiter: Pin<&mut Waiter<T>>) {
        unsafe {
            waiter.with_node(|node| {
                node.next = self.head;
                node.prev = None;
            })
        }

        let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(waiter)) };
        debug_assert_ne!(self.head, Some(ptr), "tried to push the same waiter twice!");
        if let Some(mut head) = self.head.replace(ptr) {
            unsafe {
                head.as_mut().set_prev(Some(ptr));
            }
        }

        if self.tail.is_none() {
            self.tail = Some(ptr);
        }
    }

    fn pop_back(&mut self) -> Option<Pin<&mut Waiter<T>>> {
        let mut last = self.tail?;
        test_println!("List::pop_back() -> {:p}", last);

        unsafe {
            let last = last.as_mut();
            let prev = last.take_prev();

            match prev {
                Some(mut prev) => {
                    let _ = prev.as_mut().take_next();
                }
                None => self.head = None,
            }

            self.tail = prev;
            last.take_next();

            Some(Pin::new_unchecked(last))
        }
    }

    unsafe fn remove(&mut self, node: Pin<&mut Waiter<T>>) {
        let node_ref = node.get_unchecked_mut();
        let prev = node_ref.take_prev();
        let next = node_ref.take_next();
        let ptr = NonNull::from(node_ref);

        match prev {
            Some(mut prev) => prev.as_mut().with_node(|prev| {
                debug_assert_eq!(prev.next, Some(ptr));
                prev.next = next;
            }),
            None => {
                debug_assert_eq!(self.head, Some(ptr));
                self.head = next;
            }
        }

        match next {
            Some(mut next) => next.as_mut().with_node(|next| {
                debug_assert_eq!(next.prev, Some(ptr));
                next.prev = prev;
            }),
            None => {
                debug_assert_eq!(self.tail, Some(ptr));
                self.tail = prev;
            }
        }
    }
}

impl<T> fmt::Debug for List<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("List")
            .field("head", &self.head)
            .field("tail", &self.tail)
            .finish()
    }
}

unsafe impl<T: Send> Send for List<T> {}