Browse Source

refactor: move `Backoff` and `CachePadded` utilities out of `src/lib.rs` into a new `src/util.rs` module

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Eliza Weisman 3 years ago
parent
commit
7efe99503f
2 changed files with 138 additions and 77 deletions
  1. src/lib.rs (+68 −77)
  2. src/util.rs (+70 −0)

+ 68 - 77
src/lib.rs

@@ -1,7 +1,4 @@
-use core::{
-    fmt,
-    ops::{Deref, DerefMut},
-};
+use core::{fmt, marker::PhantomData, mem::MaybeUninit};
 
 #[cfg(feature = "alloc")]
 extern crate alloc;
@@ -34,25 +31,29 @@ macro_rules! test_dbg {
 }
 
 mod loom;
-
 #[cfg(test)]
 mod tests;
+mod util;
 
 use crate::loom::{
     atomic::{AtomicUsize, Ordering},
     UnsafeCell,
 };
 
+use crate::util::{Backoff, CachePadded};
+
 #[cfg(feature = "alloc")]
 mod stringbuf;
 
-pub struct ThingBuf<T> {
+pub struct ThingBuf<T, S = Box<[Slot<T>]>> {
     head: CachePadded<AtomicUsize>,
     tail: CachePadded<AtomicUsize>,
     gen: usize,
     gen_mask: usize,
     idx_mask: usize,
-    slots: Box<[Slot<T>]>,
+    capacity: usize,
+    slots: S,
+    _t: PhantomData<T>,
 }
 
 pub struct Ref<'slot, T> {
@@ -63,22 +64,11 @@ pub struct Ref<'slot, T> {
 #[derive(Debug)]
 pub struct AtCapacity(usize);
 
-struct Slot<T> {
+pub struct Slot<T> {
     value: UnsafeCell<T>,
     state: AtomicUsize,
 }
 
-#[derive(Debug)]
-struct Backoff(u8);
-
-#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
-#[cfg_attr(
-    not(any(target_arch = "x86_64", target_arch = "aarch64")),
-    repr(align(64))
-)]
-#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
-struct CachePadded<T>(T);
-
 // === impl ThingBuf ===
 
 impl<T: Default> ThingBuf<T> {
@@ -105,10 +95,14 @@ impl<T> ThingBuf<T> {
             gen,
             gen_mask,
             idx_mask,
+            capacity,
             slots,
+            _t: PhantomData,
         }
     }
+}
 
+impl<T, S> ThingBuf<T, S> {
     #[inline]
     fn idx_gen(&self, val: usize) -> (usize, usize) {
         (val & self.idx_mask, val & self.gen_mask)
@@ -117,7 +111,7 @@ impl<T> ThingBuf<T> {
     #[inline]
     fn next(&self, idx: usize, gen: usize) -> usize {
         // Are we still in the same generation?
-        if idx + 1 < self.slots.len() {
+        if idx + 1 < self.capacity() {
             // If we're still within the current generation, increment the index
             // by 1.
             (idx | gen) + 1
@@ -128,6 +122,37 @@ impl<T> ThingBuf<T> {
         }
     }
 
+    pub fn capacity(&self) -> usize {
+        self.capacity
+    }
+}
+
+impl<T, S> ThingBuf<T, S>
+where
+    S: AsRef<[Slot<T>]>,
+{
+    pub fn from_array(slots: S) -> Self {
+        let capacity = slots.as_ref().len();
+        assert!(capacity > 0);
+        for (idx, slot) in slots.as_ref().iter().enumerate() {
+            // Relaxed is fine here, because the slot is not shared yet.
+            slot.state.store(idx, Ordering::Relaxed);
+        }
+        let gen = (capacity + 1).next_power_of_two();
+        let idx_mask = gen - 1;
+        let gen_mask = !(gen - 1);
+        Self {
+            head: CachePadded(AtomicUsize::new(0)),
+            tail: CachePadded(AtomicUsize::new(0)),
+            gen,
+            gen_mask,
+            idx_mask,
+            capacity,
+            slots,
+            _t: PhantomData,
+        }
+    }
+
     #[inline]
     pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> {
         self.push_ref().map(|mut r| r.with_mut(f))
@@ -136,18 +161,19 @@ impl<T> ThingBuf<T> {
     pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
         let mut backoff = Backoff::new();
         let mut tail = self.tail.load(Ordering::Relaxed);
+        let slots = self.slots.as_ref();
 
         loop {
             let (idx, gen) = self.idx_gen(tail);
             test_dbg!(idx);
             test_dbg!(gen);
-            let slot = &self.slots[idx];
+            let slot = &slots[idx];
             let state = slot.state.load(Ordering::Acquire);
 
             if state == tail {
                 // Move the tail index forward by 1.
                 let next_tail = self.next(idx, gen);
-                match self.tail.compare_exchange(
+                match self.tail.compare_exchange_weak(
                     tail,
                     next_tail,
                     Ordering::AcqRel,
@@ -168,8 +194,8 @@ impl<T> ThingBuf<T> {
             }
 
             if state.wrapping_add(self.gen) == tail + 1 {
-                if self.head.load(Ordering::Acquire).wrapping_add(self.gen) == tail {
-                    return Err(AtCapacity(self.slots.len()));
+                if self.head.load(Ordering::SeqCst).wrapping_add(self.gen) == tail {
+                    return Err(AtCapacity(self.capacity()));
                 }
 
                 backoff.spin();
@@ -189,13 +215,14 @@ impl<T> ThingBuf<T> {
     pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
         let mut backoff = Backoff::new();
         let mut head = self.head.load(Ordering::Relaxed);
+        let slots = self.slots.as_ref();
 
         loop {
             test_dbg!(head);
             let (idx, gen) = self.idx_gen(head);
             test_dbg!(idx);
             test_dbg!(gen);
-            let slot = &self.slots[idx];
+            let slot = &slots[idx];
             let state = slot.state.load(Ordering::Acquire);
             test_dbg!(state);
 
@@ -206,7 +233,7 @@ impl<T> ThingBuf<T> {
                 match self.head.compare_exchange(
                     head,
                     next_head,
-                    Ordering::AcqRel,
+                    Ordering::SeqCst,
                     Ordering::Relaxed,
                 ) {
                     Ok(_) => {
@@ -224,6 +251,7 @@ impl<T> ThingBuf<T> {
             }
 
             if test_dbg!(state == head) {
+                let tail = self.tail.load(Ordering::SeqCst);
 
                 if test_dbg!(tail == head) {
                     return None;
@@ -237,10 +265,6 @@ impl<T> ThingBuf<T> {
             head = self.head.load(Ordering::Relaxed);
         }
     }
-
-    pub fn capacity(&self) -> usize {
-        self.slots.len()
-    }
 }
 
 impl<T> fmt::Debug for ThingBuf<T> {
@@ -310,59 +334,26 @@ impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
     }
 }
 
-// === impl Backoff ===
+// === impl Slot ===
 
-impl Backoff {
-    const MAX_SPINS: u8 = 6;
-    const MAX_YIELDS: u8 = 10;
-    #[inline]
-    fn new() -> Self {
-        Self(0)
-    }
-
-    #[inline]
-    fn spin(&mut self) {
-        for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
-            loom::hint::spin_loop();
-
-            test_println!("spin_loop_hint");
-        }
-
-        if self.0 <= Self::MAX_SPINS {
-            self.0 += 1;
-        }
-    }
-
-    #[inline]
-    fn spin_yield(&mut self) {
-        if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
-            for _ in 0..1 << self.0 {
-                loom::hint::spin_loop();
-                test_println!("spin_loop_hint");
-            }
-        }
-
-        #[cfg(any(test, feature = "std"))]
-        loom::thread::yield_now();
-
-        if self.0 <= Self::MAX_YIELDS {
-            self.0 += 1;
-        }
+impl<T: Default> Default for Slot<T> {
+    fn default() -> Self {
+        Self::new(T::default())
     }
 }
 
-// === impl CachePadded ===
-
-impl<T> Deref for CachePadded<T> {
-    type Target = T;
-
-    fn deref(&self) -> &T {
-        &self.0
+impl<T> Slot<T> {
+    const UNINIT: usize = usize::MAX;
+    pub fn new(t: T) -> Self {
+        Self {
+            value: UnsafeCell::new(t),
+            state: AtomicUsize::new(Self::UNINIT),
+        }
     }
 }
 
-impl<T> DerefMut for CachePadded<T> {
-    fn deref_mut(&mut self) -> &mut T {
-        &mut self.0
+impl<T> Slot<MaybeUninit<T>> {
+    pub fn uninit() -> Self {
+        Self::new(MaybeUninit::uninit())
     }
 }

+ 70 - 0
src/util.rs

@@ -0,0 +1,70 @@
+use crate::loom;
+use core::ops::{Deref, DerefMut};
+
+#[derive(Debug)]
+pub(crate) struct Backoff(u8);
+
+#[cfg_attr(any(target_arch = "x86_64", target_arch = "aarch64"), repr(align(128)))]
+#[cfg_attr(
+    not(any(target_arch = "x86_64", target_arch = "aarch64")),
+    repr(align(64))
+)]
+#[derive(Clone, Copy, Default, Hash, PartialEq, Eq, Debug)]
+pub(crate) struct CachePadded<T>(pub(crate) T);
+
+// === impl Backoff ===
+
+impl Backoff {
+    const MAX_SPINS: u8 = 6;
+    const MAX_YIELDS: u8 = 10;
+    #[inline]
+    pub(crate) fn new() -> Self {
+        Self(0)
+    }
+
+    #[inline]
+    pub(crate) fn spin(&mut self) {
+        for _ in 0..test_dbg!(1 << self.0.min(Self::MAX_SPINS)) {
+            loom::hint::spin_loop();
+
+            test_println!("spin_loop_hint");
+        }
+
+        if self.0 <= Self::MAX_SPINS {
+            self.0 += 1;
+        }
+    }
+
+    #[inline]
+    pub(crate) fn spin_yield(&mut self) {
+        if self.0 <= Self::MAX_SPINS || cfg!(not(any(feature = "std", test))) {
+            for _ in 0..1 << self.0 {
+                loom::hint::spin_loop();
+                test_println!("spin_loop_hint");
+            }
+        }
+
+        #[cfg(any(test, feature = "std"))]
+        loom::thread::yield_now();
+
+        if self.0 <= Self::MAX_YIELDS {
+            self.0 += 1;
+        }
+    }
+}
+
+// === impl CachePadded ===
+
+impl<T> Deref for CachePadded<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.0
+    }
+}
+
+impl<T> DerefMut for CachePadded<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.0
+    }
+}