//! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf). //! //! The default MPSC channel returned by the [`channel`] function is //! _asynchronous_: receiving from the channel is an `async fn`, and the //! receiving task willwait when there are no messages in the channel. //! //! If the "std" feature flag is enabled, this module also provides a //! synchronous channel, in the [`sync`] module. The synchronous receiver will //! instead wait for new messages by blocking the current thread. Naturally, //! this requires the Rust standard library. A synchronous channel //! can be constructed using the [`sync::channel`] function. use crate::{ loom::atomic::AtomicUsize, util::wait::{Notify, WaitCell}, Ref, ThingBuf, }; use core::fmt; #[derive(Debug)] #[non_exhaustive] pub enum TrySendError { Full(T), Closed(T), } #[derive(Debug)] struct Inner { thingbuf: ThingBuf, rx_wait: WaitCell, tx_count: AtomicUsize, } struct SendRefInner<'a, T, N: Notify> { inner: &'a Inner, slot: Ref<'a, T>, } // ==== impl TrySendError === impl TrySendError { fn with_value(self, value: T) -> TrySendError { match self { Self::Full(()) => TrySendError::Full(value), Self::Closed(()) => TrySendError::Closed(value), } } } // ==== impl Inner ==== impl Inner { fn try_send_ref(&self) -> Result, TrySendError> { self.thingbuf .push_ref() .map(|slot| SendRefInner { inner: self, slot }) .map_err(|_| { if self.rx_wait.is_rx_closed() { TrySendError::Closed(()) } else { self.rx_wait.notify(); TrySendError::Full(()) } }) } fn try_send(&self, val: T) -> Result<(), TrySendError> { match self.try_send_ref() { Ok(mut slot) => { slot.with_mut(|slot| *slot = val); Ok(()) } Err(e) => Err(e.with_value(val)), } } } impl SendRefInner<'_, T, N> { #[inline] pub fn with(&self, f: impl FnOnce(&T) -> U) -> U { self.slot.with(f) } #[inline] pub fn with_mut(&mut self, f: impl FnOnce(&mut T) -> U) -> U { self.slot.with_mut(f) } } impl Drop for SendRefInner<'_, T, N> { #[inline] fn drop(&mut self) { test_println!("drop SendRef", std::any::type_name::()); self.inner.rx_wait.notify(); } } impl fmt::Debug for SendRefInner<'_, T, N> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.with(|val| fmt::Debug::fmt(val, f)) } } impl fmt::Display for SendRefInner<'_, T, N> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.with(|val| fmt::Display::fmt(val, f)) } } impl fmt::Write for SendRefInner<'_, T, N> { #[inline] fn write_str(&mut self, s: &str) -> fmt::Result { self.with_mut(|val| val.write_str(s)) } #[inline] fn write_char(&mut self, c: char) -> fmt::Result { self.with_mut(|val| val.write_char(c)) } #[inline] fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result { self.with_mut(|val| val.write_fmt(f)) } } macro_rules! impl_send_ref { (pub struct $name:ident<$notify:ty>;) => { pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>); impl $name<'_, T> { #[inline] pub fn with(&self, f: impl FnOnce(&T) -> U) -> U { self.0.with(f) } #[inline] pub fn with_mut(&mut self, f: impl FnOnce(&mut T) -> U) -> U { self.0.with_mut(f) } } impl fmt::Debug for $name<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } impl fmt::Display for $name<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } impl fmt::Write for $name<'_, T> { #[inline] fn write_str(&mut self, s: &str) -> fmt::Result { self.0.write_str(s) } #[inline] fn write_char(&mut self, c: char) -> fmt::Result { self.0.write_char(c) } #[inline] fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result { self.0.write_fmt(f) } } }; } mod async_impl; pub use self::async_impl::*; feature! { #![feature = "std"] pub mod sync; } #[cfg(test)] mod tests;