Browse code

initial commit

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Eliza Weisman, 3 years ago
Current commit:
84b9ce7057
8 files changed, 565 insertions(+), 0 deletions(-)
  1. .envrc       (+1 −0)
  2. .gitignore   (+2 −0)
  3. Cargo.toml   (+14 −0)
  4. default.nix  (+31 −0)
  5. shell.nix    (+17 −0)
  6. src/lib.rs   (+331 −0)
  7. src/loom.rs  (+136 −0)
  8. src/tests.rs (+33 −0)

+ 1 - 0
.envrc

@@ -0,0 +1 @@
+use nix;

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+/target
+Cargo.lock

+ 14 - 0
Cargo.toml

@@ -0,0 +1,14 @@
+[package]
+name = "thingbuf"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[features]
+std = []
+
+[dependencies]
+
+[dev-dependencies]
+loom = "0.5"

+ 31 - 0
default.nix

@@ -0,0 +1,31 @@
+scope@{ pkgs ? import <nixpkgs> { } }:
+
+pkgs.buildEnv {
+  name = "thingbuf";
+  paths = with pkgs; [
+    git
+    bash
+    direnv
+    binutils
+    stdenv
+    bashInteractive
+    cacert
+    glibc
+    gcc
+    cmake
+    rustup
+    pkg-config
+    (glibcLocales.override { locales = [ "en_US.UTF-8" ]; })
+    gnumake
+    autoconf
+  ];
+  passthru = {
+    LOCALE_ARCHIVE = "${pkgs.glibcLocales}/lib/locale/locale-archive";
+    LC_ALL = "en_US.UTF-8";
+    SSL_CERT_FILE = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
+    GIT_SSL_CAINFO = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
+    CURL_CA_BUNDLE = "${pkgs.cacert}/etc/ca-bundle.crt";
+    CARGO_TERM_COLOR = "always";
+    RUST_BACKTRACE = "1";
+  };
+}

+ 17 - 0
shell.nix

@@ -0,0 +1,17 @@
+scope@{ pkgs ? import <nixpkgs> { } }:
+
+let env = (import ./default.nix scope);
+
+in pkgs.mkShell {
+  name = "thingbuf";
+  LOCALE_ARCHIVE = "${pkgs.glibcLocales}/lib/locale/locale-archive";
+  LC_ALL = "en_US.UTF-8";
+  OPENSSL_DIR = "${pkgs.openssl.dev}";
+  OPENSSL_LIB_DIR = "${pkgs.openssl.out}/lib";
+  SSL_CERT_FILE = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
+  GIT_SSL_CAINFO = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
+  CURL_CA_BUNDLE = "${pkgs.cacert}/etc/ca-bundle.crt";
+  CARGO_TERM_COLOR = "always";
+  RUST_BACKTRACE = "1";
+  buildInputs = [ (import ./default.nix { inherit pkgs; }) ];
+}

+ 331 - 0
src/lib.rs

@@ -0,0 +1,331 @@
+use core::ops::{Deref, DerefMut};
+
+macro_rules! test_println {
+    ($($arg:tt)*) => {
+        if cfg!(test) {
+            if std::thread::panicking() {
+                // getting the thread ID while panicking doesn't seem to play super nicely with loom's
+                // mock lazy_static...
+                println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
+            } else {
+                #[cfg(test)]
+                println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
+            }
+        }
+    }
+}
+
+macro_rules! test_dbg {
+    ($e:expr) => {
+        match $e {
+            e => {
+                #[cfg(test)]
+                test_println!("{} = {:?}", stringify!($e), &e);
+                e
+            }
+        }
+    };
+}
+
+mod loom;
+
+#[cfg(test)]
+mod tests;
+
+use crate::loom::{
+    atomic::{AtomicUsize, Ordering},
+    UnsafeCell,
+};
+
+pub struct ThingBuf<T> {
+    head: CachePadded<AtomicUsize>,
+    tail: CachePadded<AtomicUsize>,
+    gen: usize,
+    gen_mask: usize,
+    idx_mask: usize,
+    slots: Box<[Slot<T>]>,
+}
+
+pub struct Ref<'slot, T> {
+    slot: &'slot Slot<T>,
+    new_state: usize,
+}
+
+#[derive(Debug)]
+pub struct AtCapacity(usize);
+
+struct Slot<T> {
+    value: UnsafeCell<T>,
+    state: AtomicUsize,
+}
+
+#[derive(Debug)]
+struct Backoff(u8);
+
+#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
+#[cfg_attr(
+    not(any(target_arch = "x86_64", target_arch = "aarch64")),
+    repr(align(64))
+)]
+#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
+struct CachePadded<T>(T);
+
+// === impl ThingBuf ===
+
+impl<T: Default> ThingBuf<T> {
+    pub fn new(capacity: usize) -> Self {
+        Self::new_with(capacity, T::default)
+    }
+}
+
+impl<T> ThingBuf<T> {
+    pub fn new_with(capacity: usize, mut initializer: impl FnMut() -> T) -> Self {
+        assert!(capacity > 0);
+        let slots = (0..capacity)
+            .map(|idx| Slot {
+                state: AtomicUsize::new(idx),
+                value: UnsafeCell::new(initializer()),
+            })
+            .collect();
+        let gen = (capacity + 1).next_power_of_two();
+        let idx_mask = gen - 1;
+        let gen_mask = !(gen - 1);
+        Self {
+            head: CachePadded(AtomicUsize::new(0)),
+            tail: CachePadded(AtomicUsize::new(0)),
+            gen,
+            gen_mask,
+            idx_mask,
+            slots,
+        }
+    }
+
+    #[inline]
+    fn idx_gen(&self, val: usize) -> (usize, usize) {
+        (val & self.idx_mask, val & self.gen_mask)
+    }
+
+    #[inline]
+    fn next(&self, idx: usize, gen: usize) -> usize {
+        // Are we still in the same generation?
+        if idx + 1 < self.slots.len() {
+            // If we're still within the current generation, increment the index
+            // by 1.
+            (idx | gen) + 1
+        } else {
+            // We've reached the end of the current lap, wrap the index around
+            // to 0.
+            gen.wrapping_add(self.gen)
+        }
+    }
+
+    pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
+        let mut backoff = Backoff::new();
+        let mut tail = self.tail.load(Ordering::Relaxed);
+
+        loop {
+            let (idx, gen) = self.idx_gen(tail);
+            test_dbg!(idx);
+            test_dbg!(gen);
+            let slot = &self.slots[idx];
+            let state = slot.state.load(Ordering::Acquire);
+
+            if state == tail {
+                // Move the tail index forward by 1.
+                let next_tail = self.next(idx, gen);
+                match self.tail.compare_exchange(
+                    tail,
+                    next_tail,
+                    Ordering::AcqRel,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => {
+                        return Ok(Ref {
+                            new_state: tail + 1,
+                            slot,
+                        })
+                    }
+                    Err(actual) => {
+                        tail = actual;
+                        backoff.spin();
+                        continue;
+                    }
+                }
+            }
+
+            if state.wrapping_add(self.gen) == tail + 1 {
+                if self.head.load(Ordering::Acquire).wrapping_add(self.gen) == tail {
+                    return Err(AtCapacity(self.slots.len()));
+                }
+
+                backoff.spin();
+            } else {
+                backoff.spin_yield();
+            }
+
+            tail = self.tail.load(Ordering::Relaxed)
+        }
+    }
+
+    pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
+        let mut backoff = Backoff::new();
+        let mut head = self.head.load(Ordering::Relaxed);
+
+        loop {
+            let (idx, gen) = self.idx_gen(head);
+            test_dbg!(idx);
+            test_dbg!(gen);
+            let slot = &self.slots[idx];
+            let state = slot.state.load(Ordering::Acquire);
+
+            // If the slot's state is ahead of the head index by one, we can pop
+            // it.
+            if state == head + 1 {
+                let next_head = self.next(idx, gen);
+                match self.head.compare_exchange(
+                    head,
+                    next_head,
+                    Ordering::AcqRel,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => {
+                        return Some(Ref {
+                            new_state: head + 1,
+                            slot,
+                        })
+                    }
+                    Err(actual) => {
+                        head = actual;
+                        backoff.spin();
+                        continue;
+                    }
+                }
+            }
+
+            if state == head {
+                let tail = self.tail.load(Ordering::Acquire);
+
+                if tail == head {
+                    return None;
+                }
+
+                backoff.spin();
+            } else {
+                backoff.spin_yield();
+            }
+
+            head = self.head.load(Ordering::Relaxed);
+        }
+    }
+
+    pub fn capacity(&self) -> usize {
+        self.slots.len()
+    }
+}
+
+// === impl Ref ===
+
+impl<T> Ref<'_, T> {
+    #[inline]
+    pub fn with<U>(&self, f: impl Fn(&T) -> U) -> U {
+        self.slot.value.with(|value| unsafe {
+            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
+            f(&*value)
+        })
+    }
+
+    #[inline]
+    pub fn with_mut<U>(&mut self, f: impl Fn(&mut T) -> U) -> U {
+        self.slot.value.with_mut(|value| unsafe {
+            // Safety: if a `Ref` exists, we have exclusive ownership of the slot.
+            f(&mut *value)
+        })
+    }
+}
+
+impl<T> Drop for Ref<'_, T> {
+    #[inline]
+    fn drop(&mut self) {
+        self.slot.state.store(self.new_state, Ordering::Release);
+    }
+}
+
+// impl<T> Deref for Writing<'_, T> {
+//     type Target = T;
+
+//     #[inline]
+//     fn deref(&self) -> &T {
+//         unsafe {
+//             // Safety: if a `Writing` exists, it must exclusively own the
+//             // slot it points at.
+//             self.slot.value.with()
+//         }
+//     }
+// }
+
+// impl<T> DerefMut for Writing<'_, T> {
+//     fn deref_mut(&mut self) -> &mut T {
+//         unsafe {
+//             // Safety: if a `Writing` exists, it must exclusively own the
+//             // slot it points at.
+//             self.slot.value.get_mut()
+//         }
+//     }
+// }
+
+// === impl Backoff ===
+
+impl Backoff {
+    const MAX_SPINS: u8 = 6;
+    const MAX_YIELDS: u8 = 10;
+    #[inline]
+    fn new() -> Self {
+        Self(0)
+    }
+
+    #[inline]
+    fn spin(&mut self) {
+        for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
+            loom::hint::spin_loop();
+
+            test_println!("spin_loop_hint");
+        }
+
+        if self.0 <= Self::MAX_SPINS {
+            self.0 += 1;
+        }
+    }
+
+    #[inline]
+    fn spin_yield(&mut self) {
+        if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
+            for _ in 0..1 << self.0 {
+                loom::hint::spin_loop();
+                test_println!("spin_loop_hint");
+            }
+        }
+
+        #[cfg(any(test, feature = "std"))]
+        loom::thread::yield_now();
+
+        if self.0 <= Self::MAX_YIELDS {
+            self.0 += 1;
+        }
+    }
+}
+
+// === impl CachePadded ===
+
+impl<T> Deref for CachePadded<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.0
+    }
+}
+
+impl<T> DerefMut for CachePadded<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.0
+    }
+}

+ 136 - 0
src/loom.rs

@@ -0,0 +1,136 @@
+pub(crate) use self::inner::*;
+
+#[cfg(test)]
+mod inner {
+    pub(crate) mod atomic {
+        pub use loom::sync::atomic::*;
+        pub use std::sync::atomic::Ordering;
+    }
+    pub(crate) use loom::{cell::UnsafeCell, hint, model, sync, thread};
+
+    pub(crate) mod alloc {
+        #![allow(dead_code)]
+        use loom::alloc;
+        use std::fmt;
+        /// Track allocations, detecting leaks
+        ///
+        /// This is a version of `loom::alloc::Track` that adds a missing
+        /// `Default` impl.
+        pub struct Track<T>(alloc::Track<T>);
+
+        impl<T> Track<T> {
+            /// Track a value for leaks
+            #[inline(always)]
+            pub fn new(value: T) -> Track<T> {
+                Track(alloc::Track::new(value))
+            }
+
+            /// Get a reference to the value
+            #[inline(always)]
+            pub fn get_ref(&self) -> &T {
+                self.0.get_ref()
+            }
+
+            /// Get a mutable reference to the value
+            #[inline(always)]
+            pub fn get_mut(&mut self) -> &mut T {
+                self.0.get_mut()
+            }
+
+            /// Stop tracking the value for leaks
+            #[inline(always)]
+            pub fn into_inner(self) -> T {
+                self.0.into_inner()
+            }
+        }
+
+        impl<T: fmt::Debug> fmt::Debug for Track<T> {
+            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                self.0.fmt(f)
+            }
+        }
+
+        impl<T: Default> Default for Track<T> {
+            fn default() -> Self {
+                Self::new(T::default())
+            }
+        }
+    }
+}
+
+#[cfg(not(test))]
+mod inner {
+    #![allow(dead_code)]
+    pub(crate) use core::sync::atomic;
+
+    #[cfg(feature = "std")]
+    use std::{sync, thread};
+
+    pub(crate) mod hint {
+        #[inline(always)]
+        pub(crate) fn spin_loop() {
+            // MSRV: std::hint::spin_loop() stabilized in 1.49.0
+            #[allow(deprecated)]
+            super::atomic::spin_loop_hint()
+        }
+    }
+
+    #[derive(Debug)]
+    pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);
+
+    impl<T> UnsafeCell<T> {
+        pub fn new(data: T) -> UnsafeCell<T> {
+            UnsafeCell(core::cell::UnsafeCell::new(data))
+        }
+
+        #[inline(always)]
+        pub fn with<F, R>(&self, f: F) -> R
+        where
+            F: FnOnce(*const T) -> R,
+        {
+            f(self.0.get())
+        }
+
+        #[inline(always)]
+        pub fn with_mut<F, R>(&self, f: F) -> R
+        where
+            F: FnOnce(*mut T) -> R,
+        {
+            f(self.0.get())
+        }
+    }
+
+    pub(crate) mod alloc {
+        /// Track allocations, detecting leaks
+        #[derive(Debug, Default)]
+        pub struct Track<T> {
+            value: T,
+        }
+
+        impl<T> Track<T> {
+            /// Track a value for leaks
+            #[inline(always)]
+            pub fn new(value: T) -> Track<T> {
+                Track { value }
+            }
+
+            /// Get a reference to the value
+            #[inline(always)]
+            pub fn get_ref(&self) -> &T {
+                &self.value
+            }
+
+            /// Get a mutable reference to the value
+            #[inline(always)]
+            pub fn get_mut(&mut self) -> &mut T {
+                &mut self.value
+            }
+
+            /// Stop tracking the value for leaks
+            #[inline(always)]
+            pub fn into_inner(self) -> T {
+                self.value
+            }
+        }
+    }
+}

+ 33 - 0
src/tests.rs

@@ -0,0 +1,33 @@
+use super::ThingBuf;
+use crate::loom::{self, thread};
+use std::sync::Arc;
+
+#[test]
+fn linearizable() {
+    const THREADS: usize = 4;
+
+    fn thread(i: usize, q: Arc<ThingBuf<usize>>) {
+        while q
+            .push_ref()
+            .map(|mut r| r.with_mut(|val| *val = i))
+            .is_err()
+        {}
+
+        if let Some(mut r) = q.pop_ref() {
+            r.with_mut(|val| *val = 0);
+        }
+    }
+    loom::model(|| {
+        let q = Arc::new(ThingBuf::new(THREADS));
+        let q1 = q.clone();
+        let q2 = q.clone();
+        let q3 = q.clone();
+        let t1 = thread::spawn(move || thread(1, q1));
+        let t2 = thread::spawn(move || thread(2, q2));
+        let t3 = thread::spawn(move || thread(3, q3));
+        thread(4, q);
+        t1.join().expect("thread 1 panicked!");
+        t2.join().expect("thread 2 panicked!");
+        t3.join().expect("thread 3 panicked!");
+    })
+}