Browse Source

docs: a bunch of internal implementation docs (#18)

Signed-off-by: Eliza Weisman <[email protected]>
Eliza Weisman 3 years ago
parent
commit
e8f7107ba0
7 changed files with 179 additions and 15 deletions
  1. 5 1
      Cargo.toml
  2. 14 0
      src/lib.rs
  3. 2 2
      src/mpsc.rs
  4. 1 0
      src/thingbuf.rs
  5. 40 1
      src/wait.rs
  6. 3 3
      src/wait/cell.rs
  7. 114 8
      src/wait/queue.rs

+ 5 - 1
Cargo.toml

@@ -37,4 +37,8 @@ lto = true
 opt-level = 3
 
 [patch.crates-io]
-loom = { git = "https://github.com/tokio-rs/loom", branch = "eliza/fix-double-panic-in-drop" }
+loom = { git = "https://github.com/tokio-rs/loom", branch = "eliza/fix-double-panic-in-drop" }
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]

+ 14 - 0
src/lib.rs

@@ -1,3 +1,4 @@
+#![cfg_attr(docsrs, doc = include_str!("../README.md"))]
 #![cfg_attr(not(feature = "std"), no_std)]
 #![cfg_attr(docsrs, feature(doc_cfg))]
 use core::{cmp, fmt, mem::MaybeUninit, ops};
@@ -41,6 +42,19 @@ pub struct Ref<'slot, T> {
 
 pub struct Full<T = ()>(T);
 
+/// State variables for the atomic ring buffer algorithm.
+///
+/// This is separated from the actual storage array used to implement the ring
+/// buffer, so that it can be used by both the dynamically-sized implementation
+/// (`Box<[Slot<T>>]`) and the statically-sized (`[T; const usize]`)
+/// implementation.
+///
+/// A `Core`, when provided with a reference to the storage array, knows how to
+/// actually perform the ring buffer operations on that array.
+///
+/// The atomic ring buffer is based on the [MPMC bounded queue from 1024cores][1].
+///
+/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 #[derive(Debug)]
 struct Core {
     head: CachePadded<AtomicUsize>,

+ 2 - 2
src/mpsc.rs

@@ -139,7 +139,7 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
             let pushed_waiter = self.tx_wait.push_waiter(&mut node, &mut register);
 
             match test_dbg!(pushed_waiter) {
-                WaitResult::TxClosed => {
+                WaitResult::Closed => {
                     // the channel closed while we were registering the waiter!
                     return Poll::Ready(Err(Closed(())));
                 }
@@ -189,7 +189,7 @@ impl<T: Default, N: Notify + Unpin> Inner<T, N> {
                     try_poll_recv!();
                     return Poll::Pending;
                 }
-                WaitResult::TxClosed => {
+                WaitResult::Closed => {
                     // the channel is closed (all the receivers are dropped).
                     // however, there may be messages left in the queue. try
                     // popping from the queue until it's empty.

+ 1 - 0
src/thingbuf.rs

@@ -6,6 +6,7 @@ use core::{fmt, ptr};
 #[cfg(all(loom, test))]
 mod tests;
 
+#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
 pub struct ThingBuf<T> {
     pub(crate) core: Core,
     pub(crate) slots: Box<[Slot<T>]>,

+ 40 - 1
src/wait.rs

@@ -1,3 +1,24 @@
+//! Waiter cells and queues to allow threads/tasks to wait for notifications.
+//!
+//! The MPSC channel enhance a ThingBuf --- which implements a non-blocking
+//! queue --- with the capacity to wait. A `ThingBuf` only has `try_send` and
+//! `try_recv`-like operations, which immediately return in the case where the
+//! queue is full or empty, respectively. In a MPSC channel, the sender is
+//! provided with the ability to *wait* until there is capacity in the queue to
+//! send its message. Similarly, a receiver can wait until the channel has
+//! messages to receive.
+//!
+//! This module implements two types of structure for waiting: a wait *cell*,
+//! which stores a *single* waiting thread or task, and a wait *queue*, which
+//! stores a queue of waiting tasks. Since the channel is a MPSC (multiple
+//! producer, single consumer) channel, the wait queue is used to store waiting
+//! senders, while the wait cell is used to store a waiting receiver (as there
+//! is only ever one thread/task waiting to receive from a channel).
+//!
+//! This module is generic over the _type_ of the waiter; they may either be
+//! [`core::task::Waker`]s, for the async MPSC, or [`std::thread::Thread`]s, for
+//! the blocking MPSC. In either case, the role played by these types is fairly
+//! analogous.
 use crate::util::panic::UnwindSafe;
 use core::{fmt, task::Waker};
 
@@ -8,10 +29,28 @@ pub(crate) use self::{cell::WaitCell, queue::WaitQueue};
 #[cfg(feature = "std")]
 use crate::loom::thread;
 
+/// What happened while trying to register to wait.
 #[derive(Debug, Eq, PartialEq)]
 pub(crate) enum WaitResult {
+    /// The waiter was registered, and the calling thread/task can now wait.
     Wait,
-    TxClosed,
+    /// The channel is closed.
+    ///
+    /// When registering a sender, this means the receiver was dropped; when
+    /// registering a receiver, this means that all senders have been dropped.
+    /// In this case, the waiting thread/task should *not* wait, because it will
+    /// never be woken back up.
+    ///
+    /// If this is returned, the waiter (`Thread`/`Waker`) was *not* registered.
+    Closed,
+    /// We were notified while trying to register a waiter.
+    ///
+    /// This means that, while we were trying to access the wait cell or wait
+    /// queue, the other side of the channel sent a notification. In this case,
+    /// we don't need to wait, and we can try the operation we were attempting
+    /// again, as it may now be ready.
+    ///
+    /// If this is returned, the waiter (`Thread`/`Waker`) was *not* registered.
     Notified,
 }
 

+ 3 - 3
src/wait/cell.rs

@@ -64,7 +64,7 @@ impl<T: Notify> WaitCell<T> {
         match test_dbg!(self.compare_exchange(State::WAITING, State::PARKING, Acquire)) {
             // someone else is notifying the receiver, so don't park!
             Err(actual) if test_dbg!(actual.contains(State::TX_CLOSED)) => {
-                return WaitResult::TxClosed;
+                return WaitResult::Closed;
             }
             Err(actual) if test_dbg!(actual.contains(State::NOTIFYING)) => {
                 f().notify();
@@ -125,7 +125,7 @@ impl<T: Notify> WaitCell<T> {
                 }
 
                 if test_dbg!(state.contains(State::TX_CLOSED)) {
-                    WaitResult::TxClosed
+                    WaitResult::Closed
                 } else {
                     WaitResult::Notified
                 }
@@ -288,7 +288,7 @@ mod tests {
                 return Poll::Ready(());
             }
 
-            if res == WaitResult::Notified || res == WaitResult::TxClosed {
+            if res == WaitResult::Notified || res == WaitResult::Closed {
                 return Poll::Ready(());
             }
 

+ 114 - 8
src/wait/queue.rs

@@ -9,12 +9,80 @@ use crate::{
 
 use core::{fmt, marker::PhantomPinned, pin::Pin, ptr::NonNull};
 
+/// A queue of waiters ([`core::task::Waker`]s or [`std;:thread::Thread`]s)
+/// implemented as a doubly-linked intrusive list.
+///
+/// The *[intrusive]* aspect of this list is important, as it means that it does
+/// not allocate memory. Instead, nodes in the linked list are stored in the
+/// futures of tasks trying to wait for capacity, or on the stack frames of
+/// threads blocking on channel capacity.
+///
+/// Using an intrusive list is critical if the MPSC is to *truly* have bounded
+/// memory use: if the channel has capacity for a bounded number of *messages*,
+/// but when it is full, must allocate memory to store the threads or tasks that
+/// are waiting to send messages, it does not actually bound the maximum amount
+/// of memory used by the channel! This is, potentially, quite bad, as
+/// increasing backpressure can cause out of memory conditions.
+///
+/// Also, not allocating can have a performance advantage, since tasks waiting
+/// for capacity will never hit `malloc` or `free`. This reduces the overhead of
+/// waiting, but (more importantly!) it also avoids allocator churn that can
+/// hurt `malloc` performance for *other* parts of the program that need to
+/// allocate memory.
+///
+/// Finally, this will allow using a `ThingBuf` MPSC channel with exclusively
+/// static allocations, making it much easier to use in embedded systems or on
+/// bare metal when an allocator may not always be available.
+///
+/// However, the intrusive linked list introduces one new danger: because
+/// futures can be *cancelled*, and the linked list nodes live within the
+/// futures trying to wait for channel capacity, we *must* ensure that the node
+/// is unlinked from the list before dropping a cancelled future. Failure to do
+/// so would result in the list containing dangling pointers. Therefore, we must
+/// use a *doubly-linked* list, so that nodes can edit both the previous and
+/// next node when they have to remove themselves. This is kind of a bummer, as
+/// it means we can't use something nice like this [intrusive queue by Dmitry
+/// Vyukov][2], and there are not really practical designs for lock-free
+/// doubly-linked lists that don't rely on some kind of deferred reclamation
+/// scheme such as hazard pointers or QSBR.
+///
+/// Instead, we just stick a mutex around the linked list, which must be
+/// acquired to pop nodes from it, or for nodes to remove themselves when
+/// futures are cancelled. This is a bit sad, but the critical sections for this
+/// mutex are short enough that we still get pretty good performance despite it.
+///
+/// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] is used when
+/// the standard library is available.
+/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
+/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 #[derive(Debug)]
 pub(crate) struct WaitQueue<T> {
+    /// The wait queue's state variable. The first bit indicates whether the
+    /// queue is closed; the remainder is a counter of notifications assigned to
+    /// the queue because no waiters were currently available to be woken.
+    ///
+    /// These stored notifications are "consumed" when new waiters are
+    /// registered; those waiters will be woken immediately rather than being
+    /// enqueued to wait.
     state: CachePadded<AtomicUsize>,
+    /// The linked list of waiters.
+    ///
+    /// # Safety
+    ///
+    /// This is protected by a mutex; the mutex *must* be acquired when
+    /// manipulating the linked list, OR when manipulating waiter nodes that may
+    /// be linked into the list. If a node is known to not be linked, it is safe
+    /// to modify that node (such as by setting or unsetting its
+    /// `Waker`/`Thread`) without holding the lock; otherwise, it may be
+    /// modified through the list, so the lock must be held when modifying the
+    /// node.
+    ///
+    /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] is used
+    /// when the standard library is available.
     list: Mutex<List<T>>,
 }
 
+/// A waiter node which may be linked into a wait queue.
 #[derive(Debug)]
 pub(crate) struct Waiter<T> {
     node: UnsafeCell<Node<T>>,
@@ -57,31 +125,59 @@ impl<T: Notify + Unpin> WaitQueue<T> {
         register: impl FnOnce(&mut Option<T>),
     ) -> WaitResult {
         test_println!("WaitQueue::push_waiter()");
+
         let mut state = test_dbg!(self.state.load(Acquire));
+
+        // First, go ahead and check if the queue has been closed. This is
+        // necessary even if `waiter` is `None`, as the waiter may already be
+        // queued, and just checking if the list was closed.
+        // TODO(eliza): that actually kind of sucks lol...
         if test_dbg!(state & CLOSED != 0) {
-            return WaitResult::TxClosed;
+            return WaitResult::Closed;
         }
 
+        // If we were actually called with a real waiter, try to queue the node.
         if test_dbg!(waiter.is_some()) {
-            while test_dbg!(state > CLOSED) {
+            // Is there at least one queued notification assigned to the wait
+            // queue? If so, try to consume that now, rather than waiting.
+            while test_dbg!(state >= ONE_QUEUED) {
                 match test_dbg!(self.state.compare_exchange_weak(
                     state,
+                    // Subtract one queued notification from the current state.
                     state.saturating_sub(ONE_QUEUED),
                     AcqRel,
                     Acquire
                 )) {
+                    // We consumed a queued notification! Return `Notified`
+                    // now, so that we'll try our operation again, instead
+                    // of waiting.
                     Ok(_) => return WaitResult::Notified,
+                    // Someone else was updating the state variable. Try again
+                    // --- but they may have closed the queue, or consumed the last
+                    // queued notification!
                     Err(actual) => state = test_dbg!(actual),
                 }
             }
 
+            // Okay, did the queue close while we were trying to consume a
+            // queued notification?
             if test_dbg!(state & CLOSED != 0) {
-                return WaitResult::TxClosed;
+                return WaitResult::Closed;
             }
 
+            // There are no queued notifications to consume, and the queue is
+            // still open. Therefore, it's time to actually push the waiter to
+            // the queue...finally lol :)
+
+            // Grab the lock...
             let mut list = self.list.lock();
-            // Reload the state inside the lock.
+
+            // Okay, we have the lock...but what if someone changed the state
+            // WHILE we were waiting to acquire the lock? isn't concurrent
+            // programming great? :) :) :) :) :)
             state = test_dbg!(self.state.load(Acquire));
+            // Try to consume a queued notification *again* in case any were
+            // assigned to the queue while we were waiting to acquire the lock.
             while test_dbg!(state >= ONE_QUEUED) {
                 match test_dbg!(self.state.compare_exchange(
                     state,
@@ -94,9 +190,14 @@ impl<T: Notify + Unpin> WaitQueue<T> {
                 }
             }
 
+            // We didn't consume a queued notification. it is now, finally, time
+            // to actually put the waiter in the linked list. wasn't that fun?
+
             if let Some(waiter) = waiter.take() {
                 test_println!("WaitQueue::push_waiter -> pushing {:p}", waiter);
 
+                // Now that we have the lock, register the `Waker` or `Thread`
+                // to
                 unsafe {
                     // Safety: the waker can only be registered while holding
                     // the wait queue lock. We are holding the lock, so no one
@@ -105,6 +206,9 @@ impl<T: Notify + Unpin> WaitQueue<T> {
                 }
                 list.push_front(waiter);
             } else {
+                // XXX(eliza): in practice we can't ever get here because of the
+                // `if` above. this should probably be `unreachable_unchecked`
+                // but i'm a coward...
                 unreachable!("this could be unchecked...")
             }
         }
@@ -112,6 +216,11 @@ impl<T: Notify + Unpin> WaitQueue<T> {
         WaitResult::Wait
     }
 
+    /// Notify one waiter from the queue. If there are no waiters in the linked
+    /// list, the notification is instead assigned to the queue itself.
+    ///
+    /// If a waiter was popped from the queue, returns `true`. Otherwise, if the
+    /// notification was assigned to the queue, returns `false`.
     pub(crate) fn notify(&self) -> bool {
         test_println!("WaitQueue::notify()");
         if let Some(node) = test_dbg!(self.list.lock().pop_back()) {
@@ -123,6 +232,7 @@ impl<T: Notify + Unpin> WaitQueue<T> {
         }
     }
 
+    /// Close the queue, notifying all waiting tasks.
     pub(crate) fn close(&self) {
         test_println!("WaitQueue::close()");
         test_dbg!(self.state.fetch_or(CLOSED, Release));
@@ -167,10 +277,6 @@ impl<T> Waiter<T> {
         self.node.with_mut(|node| (*node).prev = prev);
     }
 
-    // unsafe fn set_next(&mut self, next: Option<NonNull<Waiter<T>>>) {
-    //     self.node.with_mut(|node| (*node).next = next);
-    // }
-
     unsafe fn take_prev(&mut self) -> Option<NonNull<Waiter<T>>> {
         self.node.with_mut(|node| (*node).prev.take())
     }