Selaa lähdekoodia

fix(mpsc): `try_recv_ref` should return `RecvRef` (#61)

`try_recv_ref` should return a `RecvRef` to notify the sender
threads to wake up.
Name1e5s 2 vuotta sitten
vanhempi
commit
47f16f5957
3 muutettua tiedostoa jossa 105 lisäystä ja 86 poistoa
  1. 84 70
      src/mpsc.rs
  2. 9 8
      src/mpsc/async_impl.rs
  3. 12 8
      src/mpsc/blocking.rs

+ 84 - 70
src/mpsc.rs

@@ -285,6 +285,26 @@ struct SendRefInner<'a, T, N: Notify> {
     _notify: NotifyRx<'a, N>,
 }
 
+struct RecvRefInner<'a, T, N: Notify + Unpin> {
+    // /!\ LOAD BEARING STRUCT DROP ORDER /!\
+    //
+    // The `Ref` field *must* be dropped before the `NotifyTx` field, or else
+    // loom tests will fail. This ensures that the mutable access to the slot is
+    // considered to have ended *before* the receiver thread/task is notified.
+    //
+    // The alternatives to a load-bearing drop order would be:
+    // (a) put one field inside an `Option` so it can be dropped before the
+    //     other (not great, as it adds a little extra overhead even outside
+    //     of Loom tests),
+    // (b) use `core::mem::ManuallyDrop` (also not great, requires additional
+    //     unsafe code that in this case we can avoid)
+    //
+    // So, given that, relying on struct field drop order seemed like the least
+    // bad option here. Just don't reorder these fields. :)
+    slot: Ref<'a, T>,
+    _notify: crate::mpsc::NotifyTx<'a, N>,
+}
+
 struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
 struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);
 
@@ -356,8 +376,14 @@ where
         }
     }
 
-    fn try_recv_ref<'a, T>(&'a self, slots: &'a [Slot<T>]) -> Result<Ref<'a, T>, TryRecvError> {
-        self.core.pop_ref(slots)
+    fn try_recv_ref<'a, T>(
+        &'a self,
+        slots: &'a [Slot<T>],
+    ) -> Result<RecvRefInner<'a, T, N>, TryRecvError> {
+        self.core.pop_ref(slots).map(|slot| RecvRefInner {
+            _notify: NotifyTx(&self.tx_wait),
+            slot,
+        })
     }
 
     fn try_recv<T, R>(&self, slots: &[Slot<T>], recycle: &R) -> Result<T, TryRecvError>
@@ -484,6 +510,52 @@ impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
     }
 }
 
+// === impl RecvRefInner ===
+
+impl<T, N: Notify + Unpin> core::ops::Deref for RecvRefInner<'_, T, N> {
+    type Target = T;
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        self.slot.deref()
+    }
+}
+
+impl<T, N: Notify + Unpin> core::ops::DerefMut for RecvRefInner<'_, T, N> {
+    #[inline]
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.slot.deref_mut()
+    }
+}
+
+impl<T: fmt::Debug, N: Notify + Unpin> fmt::Debug for RecvRefInner<'_, T, N> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.slot.fmt(f)
+    }
+}
+
+impl<T: fmt::Display, N: Notify + Unpin> fmt::Display for RecvRefInner<'_, T, N> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.slot.fmt(f)
+    }
+}
+
+impl<T: fmt::Write, N: Notify + Unpin> fmt::Write for RecvRefInner<'_, T, N> {
+    #[inline]
+    fn write_str(&mut self, s: &str) -> fmt::Result {
+        self.slot.write_str(s)
+    }
+
+    #[inline]
+    fn write_char(&mut self, c: char) -> fmt::Result {
+        self.slot.write_char(c)
+    }
+
+    #[inline]
+    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
+        self.slot.write_fmt(f)
+    }
+}
+
 impl<N: Notify> Drop for NotifyRx<'_, N> {
     #[inline]
     fn drop(&mut self) {
@@ -500,10 +572,10 @@ impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
     }
 }
 
-macro_rules! impl_send_ref {
-    ($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
+macro_rules! impl_ref_inner {
+    ($(#[$m:meta])*, $inner:ident, $name:ident, $notify:ty) => {
         $(#[$m])*
-        pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);
+        pub struct $name<'a, T>($inner<'a, T, $notify>);
 
         impl<T> core::ops::Deref for $name<'_, T> {
             type Target = T;
@@ -552,73 +624,15 @@ macro_rules! impl_send_ref {
     };
 }
 
-macro_rules! impl_recv_ref {
+macro_rules! impl_send_ref {
     ($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
-        $(#[$m])*
-        pub struct $name<'recv, T> {
-            // /!\ LOAD BEARING STRUCT DROP ORDER /!\
-            //
-            // The `Ref` field *must* be dropped before the `NotifyTx` field, or else
-            // loom tests will fail. This ensures that the mutable access to the slot is
-            // considered to have ended *before* the receiver thread/task is notified.
-            //
-            // The alternatives to a load-bearing drop order would be:
-            // (a) put one field inside an `Option` so it can be dropped before the
-            //     other (not great, as it adds a little extra overhead even outside
-            //     of Loom tests),
-            // (b) use `core::mem::ManuallyDrop` (also not great, requires additional
-            //     unsafe code that in this case we can avoid)
-            //
-            // So, given that, relying on struct field drop order seemed like the least
-            // bad option here. Just don't reorder these fields. :)
-            slot: Ref<'recv, T>,
-            _notify: crate::mpsc::NotifyTx<'recv, $notify>,
-        }
-
-        impl<T> core::ops::Deref for $name<'_, T> {
-            type Target = T;
-
-            #[inline]
-            fn deref(&self) -> &Self::Target {
-                self.slot.deref()
-            }
-        }
-
-        impl<T> core::ops::DerefMut for $name<'_, T> {
-            #[inline]
-            fn deref_mut(&mut self) -> &mut Self::Target {
-                self.slot.deref_mut()
-            }
-        }
-
-        impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
-            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-                self.slot.fmt(f)
-            }
-        }
-
-        impl<T: fmt::Display> fmt::Display for $name<'_, T> {
-            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-                self.slot.fmt(f)
-            }
-        }
-
-        impl<T: fmt::Write> fmt::Write for $name<'_, T> {
-            #[inline]
-            fn write_str(&mut self, s: &str) -> fmt::Result {
-                self.slot.write_str(s)
-            }
-
-            #[inline]
-            fn write_char(&mut self, c: char) -> fmt::Result {
-                self.slot.write_char(c)
-            }
+        impl_ref_inner!($(#[$m])*, SendRefInner, $name, $notify);
+    };
+}
 
-            #[inline]
-            fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
-                self.slot.write_fmt(f)
-            }
-        }
+macro_rules! impl_recv_ref {
+    ($(#[$m:meta])* pub struct $name:ident<$notify:ty>;) => {
+        impl_ref_inner!($(#[$m])*, RecvRefInner, $name, $notify);
     };
 }
 

+ 9 - 8
src/mpsc/async_impl.rs

@@ -3,7 +3,6 @@ use crate::{
     loom::atomic::{self, Ordering},
     recycling::{self, Recycle},
     wait::queue,
-    Ref,
 };
 use core::{
     fmt,
@@ -449,11 +448,11 @@ feature! {
         /// ```
         ///
         /// [`recv_ref`]: Self::recv_ref
-        pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
+        pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
         where
             R: Recycle<T>,
         {
-            self.inner.core.try_recv_ref(self.inner.slots.as_ref())
+            self.inner.core.try_recv_ref(self.inner.slots.as_ref()).map(RecvRef)
         }
 
         /// Attempts to receive the next message for this receiver by reference
@@ -1154,11 +1153,11 @@ feature! {
         /// ```
         ///
         /// [`recv_ref`]: Self::recv_ref
-        pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
+        pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
         where
             R: Recycle<T>,
         {
-            self.core.try_recv_ref(self.slots.as_ref())
+            self.core.try_recv_ref(self.slots.as_ref()).map(RecvRef)
         }
 
         /// Attempts to receive the next message for this receiver by reference
@@ -1394,9 +1393,11 @@ fn poll_recv_ref<'a, T>(
 ) -> Poll<Option<RecvRef<'a, T>>> {
     core.poll_recv_ref(slots, || cx.waker().clone())
         .map(|some| {
-            some.map(|slot| RecvRef {
-                _notify: super::NotifyTx(&core.tx_wait),
-                slot,
+            some.map(|slot| {
+                RecvRef(RecvRefInner {
+                    _notify: super::NotifyTx(&core.tx_wait),
+                    slot,
+                })
             })
         })
 }

+ 12 - 8
src/mpsc/blocking.rs

@@ -13,7 +13,6 @@ use crate::{
     recycling::{self, Recycle},
     util::Backoff,
     wait::queue,
-    Ref,
 };
 use core::{fmt, pin::Pin};
 use errors::*;
@@ -602,11 +601,11 @@ feature! {
         /// ```
         ///
         /// [`recv_ref`]: Self::recv_ref
-        pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError>
+        pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError>
         where
             R: Recycle<T>,
         {
-            self.core.try_recv_ref(self.slots.as_ref())
+            self.core.try_recv_ref(self.slots.as_ref()).map(RecvRef)
         }
 
         /// Attempts to receive the next message for this receiver by value
@@ -1061,8 +1060,11 @@ impl<T, R> Receiver<T, R> {
     /// ```
     ///
     /// [`recv_ref`]: Self::recv_ref
-    pub fn try_recv_ref(&self) -> Result<Ref<'_, T>, TryRecvError> {
-        self.inner.core.try_recv_ref(self.inner.slots.as_ref())
+    pub fn try_recv_ref(&self) -> Result<RecvRef<'_, T>, TryRecvError> {
+        self.inner
+            .core
+            .try_recv_ref(self.inner.slots.as_ref())
+            .map(RecvRef)
     }
 
     /// Attempts to receive the next message for this receiver by value
@@ -1148,9 +1150,11 @@ fn recv_ref<'a, T>(core: &'a ChannelCore<Thread>, slots: &'a [Slot<T>]) -> Optio
     loop {
         match core.poll_recv_ref(slots, thread::current) {
             Poll::Ready(r) => {
-                return r.map(|slot| RecvRef {
-                    _notify: super::NotifyTx(&core.tx_wait),
-                    slot,
+                return r.map(|slot| {
+                    RecvRef(RecvRefInner {
+                        _notify: super::NotifyTx(&core.tx_wait),
+                        slot,
+                    })
                 })
             }
             Poll::Pending => {