浏览代码

refac(mpsc): reduce boilerplate a bit (#6)

Signed-off-by: Eliza Weisman <[email protected]>
Eliza Weisman 3 年之前
父节点
当前提交
74baf66c8a
共有 4 个文件被更改,包括 167 次插入179 次删除
  1. 152 6
      src/mpsc.rs
  2. 6 85
      src/mpsc/async_impl.rs
  3. 7 86
      src/mpsc/sync.rs
  4. 2 2
      src/util/wait.rs

+ 152 - 6
src/mpsc.rs

@@ -9,13 +9,13 @@
 //! instead wait for new messages by blocking the current thread. Naturally,
 //! this requires the Rust standard library. A synchronous channel
 //! can be constructed using the [`sync::channel`] function.
-mod async_impl;
-pub use self::async_impl::*;
 
-feature! {
-    #![feature = "std"]
-    pub mod sync;
-}
+use crate::{
+    loom::atomic::AtomicUsize,
+    util::wait::{Notify, WaitCell},
+    Ref, ThingBuf,
+};
+use core::fmt;
 
 #[derive(Debug)]
 #[non_exhaustive]
@@ -24,6 +24,20 @@ pub enum TrySendError<T = ()> {
     Closed(T),
 }
 
+#[derive(Debug)]
+struct Inner<T, N> {
+    thingbuf: ThingBuf<T>,
+    rx_wait: WaitCell<N>,
+    tx_count: AtomicUsize,
+}
+
+struct SendRefInner<'a, T, N: Notify> {
+    inner: &'a Inner<T, N>,
+    slot: Ref<'a, T>,
+}
+
+// ==== impl TrySendError ===
+
 impl TrySendError {
     fn with_value<T>(self, value: T) -> TrySendError<T> {
         match self {
@@ -33,5 +47,137 @@ impl TrySendError {
     }
 }
 
+// ==== impl Inner ====
+
+impl<T: Default, N: Notify> Inner<T, N> {
+    fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
+        self.thingbuf
+            .push_ref()
+            .map(|slot| SendRefInner { inner: self, slot })
+            .map_err(|_| {
+                if self.rx_wait.is_rx_closed() {
+                    TrySendError::Closed(())
+                } else {
+                    self.rx_wait.notify();
+                    TrySendError::Full(())
+                }
+            })
+    }
+
+    fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
+        match self.try_send_ref() {
+            Ok(mut slot) => {
+                slot.with_mut(|slot| *slot = val);
+                Ok(())
+            }
+            Err(e) => Err(e.with_value(val)),
+        }
+    }
+}
+
+impl<T, N: Notify> SendRefInner<'_, T, N> {
+    #[inline]
+    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
+        self.slot.with(f)
+    }
+
+    #[inline]
+    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
+        self.slot.with_mut(f)
+    }
+}
+
+impl<T, N: Notify> Drop for SendRefInner<'_, T, N> {
+    #[inline]
+    fn drop(&mut self) {
+        test_println!("drop SendRef<T, {}>", std::any::type_name::<N>());
+        self.inner.rx_wait.notify();
+    }
+}
+
+impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.with(|val| fmt::Debug::fmt(val, f))
+    }
+}
+
+impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.with(|val| fmt::Display::fmt(val, f))
+    }
+}
+
+impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
+    #[inline]
+    fn write_str(&mut self, s: &str) -> fmt::Result {
+        self.with_mut(|val| val.write_str(s))
+    }
+
+    #[inline]
+    fn write_char(&mut self, c: char) -> fmt::Result {
+        self.with_mut(|val| val.write_char(c))
+    }
+
+    #[inline]
+    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
+        self.with_mut(|val| val.write_fmt(f))
+    }
+}
+
+macro_rules! impl_send_ref {
+    (pub struct $name:ident<$notify:ty>;) => {
+        pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);
+
+        impl<T> $name<'_, T> {
+            #[inline]
+            pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
+                self.0.with(f)
+            }
+
+            #[inline]
+            pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
+                self.0.with_mut(f)
+            }
+        }
+
+        impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
+            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                self.0.fmt(f)
+            }
+        }
+
+        impl<T: fmt::Display> fmt::Display for $name<'_, T> {
+            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                self.0.fmt(f)
+            }
+        }
+
+        impl<T: fmt::Write> fmt::Write for $name<'_, T> {
+            #[inline]
+            fn write_str(&mut self, s: &str) -> fmt::Result {
+                self.0.write_str(s)
+            }
+
+            #[inline]
+            fn write_char(&mut self, c: char) -> fmt::Result {
+                self.0.write_char(c)
+            }
+
+            #[inline]
+            fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
+                self.0.write_fmt(f)
+            }
+        }
+    };
+}
+
+mod async_impl;
+pub use self::async_impl::*;
+
+feature! {
+    #![feature = "std"]
+    pub mod sync;
+}
+
 #[cfg(test)]
 mod tests;

+ 6 - 85
src/mpsc/async_impl.rs

@@ -31,17 +31,16 @@ pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
 
 #[derive(Debug)]
 pub struct Sender<T> {
-    inner: Arc<Inner<T>>,
+    inner: Arc<Inner<T, Waker>>,
 }
 
 #[derive(Debug)]
 pub struct Receiver<T> {
-    inner: Arc<Inner<T>>,
+    inner: Arc<Inner<T, Waker>>,
 }
 
-pub struct SendRef<'a, T> {
-    inner: &'a Inner<T>,
-    slot: Ref<'a, T>,
+impl_send_ref! {
+    pub struct SendRef<Waker>;
 }
 
 /// A [`Future`] that tries to receive a reference from a [`Receiver`].
@@ -64,42 +63,15 @@ pub struct RecvFuture<'a, T> {
     rx: &'a Receiver<T>,
 }
 
-#[derive(Debug)]
-struct Inner<T> {
-    thingbuf: ThingBuf<T>,
-    rx_wait: WaitCell<Waker>,
-    tx_count: AtomicUsize,
-}
-
 // === impl Sender ===
 
 impl<T: Default> Sender<T> {
     pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
-        self.inner
-            .thingbuf
-            .push_ref()
-            .map(|slot| SendRef {
-                inner: &*self.inner,
-                slot,
-            })
-            .map_err(|_| {
-                if self.inner.rx_wait.is_rx_closed() {
-                    TrySendError::Closed(())
-                } else {
-                    self.inner.rx_wait.notify();
-                    TrySendError::Full(())
-                }
-            })
+        self.inner.try_send_ref().map(SendRef)
     }
 
     pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
-        match self.try_send_ref() {
-            Ok(mut slot) => {
-                slot.with_mut(|slot| *slot = val);
-                Ok(())
-            }
-            Err(e) => Err(e.with_value(val)),
-        }
+        self.inner.try_send(val)
     }
 }
 
@@ -214,57 +186,6 @@ impl<T> Drop for Receiver<T> {
     }
 }
 
-// === impl SendRef ===
-
-impl<T> SendRef<'_, T> {
-    #[inline]
-    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
-        self.slot.with(f)
-    }
-
-    #[inline]
-    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
-        self.slot.with_mut(f)
-    }
-}
-
-impl<T> Drop for SendRef<'_, T> {
-    #[inline]
-    fn drop(&mut self) {
-        test_println!("drop async SendRef");
-        self.inner.rx_wait.notify();
-    }
-}
-
-impl<T: fmt::Debug> fmt::Debug for SendRef<'_, T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.with(|val| fmt::Debug::fmt(val, f))
-    }
-}
-
-impl<T: fmt::Display> fmt::Display for SendRef<'_, T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.with(|val| fmt::Display::fmt(val, f))
-    }
-}
-
-impl<T: fmt::Write> fmt::Write for SendRef<'_, T> {
-    #[inline]
-    fn write_str(&mut self, s: &str) -> fmt::Result {
-        self.with_mut(|val| val.write_str(s))
-    }
-
-    #[inline]
-    fn write_char(&mut self, c: char) -> fmt::Result {
-        self.with_mut(|val| val.write_char(c))
-    }
-
-    #[inline]
-    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
-        self.with_mut(|val| val.write_fmt(f))
-    }
-}
-
 // === impl RecvRefFuture ===
 
 impl<'a, T: Default> Future for RecvRefFuture<'a, T> {

+ 7 - 86
src/mpsc/sync.rs

@@ -3,7 +3,7 @@
 //! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the
 //! [`Receiver`] type in this module waits by blocking the current thread,
 //! rather than asynchronously yielding.
-use super::TrySendError;
+use super::*;
 use crate::{
     loom::{
         self,
@@ -32,55 +32,27 @@ pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
 
 #[derive(Debug)]
 pub struct Sender<T> {
-    inner: Arc<Inner<T>>,
+    inner: Arc<Inner<T, Thread>>,
 }
 
 #[derive(Debug)]
 pub struct Receiver<T> {
-    inner: Arc<Inner<T>>,
+    inner: Arc<Inner<T, Thread>>,
 }
 
-pub struct SendRef<'a, T> {
-    inner: &'a Inner<T>,
-    slot: Ref<'a, T>,
-}
-
-#[derive(Debug)]
-struct Inner<T> {
-    thingbuf: ThingBuf<T>,
-    rx_wait: WaitCell<Thread>,
-    tx_count: AtomicUsize,
+impl_send_ref! {
+    pub struct SendRef<Thread>;
 }
 
 // === impl Sender ===
 
 impl<T: Default> Sender<T> {
     pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
-        self.inner
-            .thingbuf
-            .push_ref()
-            .map(|slot| SendRef {
-                inner: &*self.inner,
-                slot,
-            })
-            .map_err(|_| {
-                if self.inner.rx_wait.is_rx_closed() {
-                    TrySendError::Closed(())
-                } else {
-                    self.inner.rx_wait.notify();
-                    TrySendError::Full(())
-                }
-            })
+        self.inner.try_send_ref().map(SendRef)
     }
 
     pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
-        match self.try_send_ref() {
-            Ok(mut slot) => {
-                slot.with_mut(|slot| *slot = val);
-                Ok(())
-            }
-            Err(e) => Err(e.with_value(val)),
-        }
+        self.inner.try_send(val)
     }
 }
 
@@ -169,54 +141,3 @@ impl<T> Drop for Receiver<T> {
         self.inner.rx_wait.close_rx();
     }
 }
-
-// === impl SendRef ===
-
-impl<T> SendRef<'_, T> {
-    #[inline]
-    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
-        self.slot.with(f)
-    }
-
-    #[inline]
-    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
-        self.slot.with_mut(f)
-    }
-}
-
-impl<T> Drop for SendRef<'_, T> {
-    #[inline]
-    fn drop(&mut self) {
-        test_println!("drop SendRef");
-        self.inner.rx_wait.notify();
-    }
-}
-
-impl<T: fmt::Debug> fmt::Debug for SendRef<'_, T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.with(|val| fmt::Debug::fmt(val, f))
-    }
-}
-
-impl<T: fmt::Display> fmt::Display for SendRef<'_, T> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.with(|val| fmt::Display::fmt(val, f))
-    }
-}
-
-impl<T: fmt::Write> fmt::Write for SendRef<'_, T> {
-    #[inline]
-    fn write_str(&mut self, s: &str) -> fmt::Result {
-        self.with_mut(|val| val.write_str(s))
-    }
-
-    #[inline]
-    fn write_char(&mut self, c: char) -> fmt::Result {
-        self.with_mut(|val| val.write_char(c))
-    }
-
-    #[inline]
-    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
-        self.with_mut(|val| val.write_fmt(f))
-    }
-}

+ 2 - 2
src/util/wait.rs

@@ -42,7 +42,7 @@ pub(crate) enum WaitResult {
     TxClosed,
 }
 
-pub(crate) trait Notify {
+pub(crate) trait Notify: UnwindSafe + fmt::Debug {
     fn notify(self);
 }
 
@@ -51,7 +51,7 @@ struct State(usize);
 
 // === impl WaitCell ===
 
-impl<T: Notify + UnwindSafe + fmt::Debug> WaitCell<T> {
+impl<T: Notify> WaitCell<T> {
     pub(crate) fn new() -> Self {
         Self {
             lock: AtomicUsize::new(State::WAITING.0),