123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- //! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
- //!
- //! The default MPSC channel returned by the [`channel`] function is
- //! _asynchronous_: receiving from the channel is an `async fn`, and the
- //! receiving task willwait when there are no messages in the channel.
- //!
- //! If the "std" feature flag is enabled, this module also provides a
- //! synchronous channel, in the [`sync`] module. The synchronous receiver will
- //! instead wait for new messages by blocking the current thread. Naturally,
- //! this requires the Rust standard library. A synchronous channel
- //! can be constructed using the [`sync::channel`] function.
- use crate::{
- loom::atomic::AtomicUsize,
- util::wait::{Notify, WaitCell},
- Ref, ThingBuf,
- };
- use core::fmt;
- #[derive(Debug)]
- #[non_exhaustive]
- pub enum TrySendError<T = ()> {
- Full(T),
- Closed(T),
- }
- #[derive(Debug)]
- struct Inner<T, N> {
- thingbuf: ThingBuf<T>,
- rx_wait: WaitCell<N>,
- tx_count: AtomicUsize,
- }
- struct SendRefInner<'a, T, N: Notify> {
- inner: &'a Inner<T, N>,
- slot: Ref<'a, T>,
- }
- // ==== impl TrySendError ===
- impl TrySendError {
- fn with_value<T>(self, value: T) -> TrySendError<T> {
- match self {
- Self::Full(()) => TrySendError::Full(value),
- Self::Closed(()) => TrySendError::Closed(value),
- }
- }
- }
- // ==== impl Inner ====
- impl<T: Default, N: Notify> Inner<T, N> {
- fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
- self.thingbuf
- .push_ref()
- .map(|slot| SendRefInner { inner: self, slot })
- .map_err(|_| {
- if self.rx_wait.is_rx_closed() {
- TrySendError::Closed(())
- } else {
- self.rx_wait.notify();
- TrySendError::Full(())
- }
- })
- }
- fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
- match self.try_send_ref() {
- Ok(mut slot) => {
- slot.with_mut(|slot| *slot = val);
- Ok(())
- }
- Err(e) => Err(e.with_value(val)),
- }
- }
- }
- impl<T, N: Notify> SendRefInner<'_, T, N> {
- #[inline]
- pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
- self.slot.with(f)
- }
- #[inline]
- pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
- self.slot.with_mut(f)
- }
- }
- impl<T, N: Notify> Drop for SendRefInner<'_, T, N> {
- #[inline]
- fn drop(&mut self) {
- test_println!("drop SendRef<T, {}>", std::any::type_name::<N>());
- self.inner.rx_wait.notify();
- }
- }
- impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.with(|val| fmt::Debug::fmt(val, f))
- }
- }
- impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.with(|val| fmt::Display::fmt(val, f))
- }
- }
- impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
- #[inline]
- fn write_str(&mut self, s: &str) -> fmt::Result {
- self.with_mut(|val| val.write_str(s))
- }
- #[inline]
- fn write_char(&mut self, c: char) -> fmt::Result {
- self.with_mut(|val| val.write_char(c))
- }
- #[inline]
- fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
- self.with_mut(|val| val.write_fmt(f))
- }
- }
- macro_rules! impl_send_ref {
- (pub struct $name:ident<$notify:ty>;) => {
- pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);
- impl<T> $name<'_, T> {
- #[inline]
- pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
- self.0.with(f)
- }
- #[inline]
- pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
- self.0.with_mut(f)
- }
- }
- impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
- }
- impl<T: fmt::Display> fmt::Display for $name<'_, T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
- }
- impl<T: fmt::Write> fmt::Write for $name<'_, T> {
- #[inline]
- fn write_str(&mut self, s: &str) -> fmt::Result {
- self.0.write_str(s)
- }
- #[inline]
- fn write_char(&mut self, c: char) -> fmt::Result {
- self.0.write_char(c)
- }
- #[inline]
- fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
- self.0.write_fmt(f)
- }
- }
- };
- }
- mod async_impl;
- pub use self::async_impl::*;
- feature! {
- #![feature = "std"]
- pub mod sync;
- }
- #[cfg(test)]
- mod tests;
|