mpsc.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. //! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
  2. //!
  3. //! The default MPSC channel returned by the [`channel`] function is
  4. //! _asynchronous_: receiving from the channel is an `async fn`, and the
  5. //! receiving task willwait when there are no messages in the channel.
  6. //!
  7. //! If the "std" feature flag is enabled, this module also provides a
  8. //! synchronous channel, in the [`sync`] module. The synchronous receiver will
  9. //! instead wait for new messages by blocking the current thread. Naturally,
  10. //! this requires the Rust standard library. A synchronous channel
  11. //! can be constructed using the [`sync::channel`] function.
  12. use crate::{
  13. loom::atomic::AtomicUsize,
  14. util::wait::{Notify, WaitCell},
  15. Ref, ThingBuf,
  16. };
  17. use core::fmt;
  18. #[derive(Debug)]
  19. #[non_exhaustive]
  20. pub enum TrySendError<T = ()> {
  21. Full(T),
  22. Closed(T),
  23. }
  24. #[derive(Debug)]
  25. struct Inner<T, N> {
  26. thingbuf: ThingBuf<T>,
  27. rx_wait: WaitCell<N>,
  28. tx_count: AtomicUsize,
  29. }
  30. struct SendRefInner<'a, T, N: Notify> {
  31. inner: &'a Inner<T, N>,
  32. slot: Ref<'a, T>,
  33. }
  34. // ==== impl TrySendError ===
  35. impl TrySendError {
  36. fn with_value<T>(self, value: T) -> TrySendError<T> {
  37. match self {
  38. Self::Full(()) => TrySendError::Full(value),
  39. Self::Closed(()) => TrySendError::Closed(value),
  40. }
  41. }
  42. }
  43. // ==== impl Inner ====
  44. impl<T: Default, N: Notify> Inner<T, N> {
  45. fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
  46. self.thingbuf
  47. .push_ref()
  48. .map(|slot| SendRefInner { inner: self, slot })
  49. .map_err(|_| {
  50. if self.rx_wait.is_rx_closed() {
  51. TrySendError::Closed(())
  52. } else {
  53. self.rx_wait.notify();
  54. TrySendError::Full(())
  55. }
  56. })
  57. }
  58. fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
  59. match self.try_send_ref() {
  60. Ok(mut slot) => {
  61. slot.with_mut(|slot| *slot = val);
  62. Ok(())
  63. }
  64. Err(e) => Err(e.with_value(val)),
  65. }
  66. }
  67. }
  68. impl<T, N: Notify> SendRefInner<'_, T, N> {
  69. #[inline]
  70. pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
  71. self.slot.with(f)
  72. }
  73. #[inline]
  74. pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
  75. self.slot.with_mut(f)
  76. }
  77. }
  78. impl<T, N: Notify> Drop for SendRefInner<'_, T, N> {
  79. #[inline]
  80. fn drop(&mut self) {
  81. test_println!("drop SendRef<T, {}>", std::any::type_name::<N>());
  82. self.inner.rx_wait.notify();
  83. }
  84. }
  85. impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
  86. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  87. self.with(|val| fmt::Debug::fmt(val, f))
  88. }
  89. }
  90. impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
  91. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  92. self.with(|val| fmt::Display::fmt(val, f))
  93. }
  94. }
  95. impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
  96. #[inline]
  97. fn write_str(&mut self, s: &str) -> fmt::Result {
  98. self.with_mut(|val| val.write_str(s))
  99. }
  100. #[inline]
  101. fn write_char(&mut self, c: char) -> fmt::Result {
  102. self.with_mut(|val| val.write_char(c))
  103. }
  104. #[inline]
  105. fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
  106. self.with_mut(|val| val.write_fmt(f))
  107. }
  108. }
  109. macro_rules! impl_send_ref {
  110. (pub struct $name:ident<$notify:ty>;) => {
  111. pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);
  112. impl<T> $name<'_, T> {
  113. #[inline]
  114. pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
  115. self.0.with(f)
  116. }
  117. #[inline]
  118. pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
  119. self.0.with_mut(f)
  120. }
  121. }
  122. impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
  123. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  124. self.0.fmt(f)
  125. }
  126. }
  127. impl<T: fmt::Display> fmt::Display for $name<'_, T> {
  128. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  129. self.0.fmt(f)
  130. }
  131. }
  132. impl<T: fmt::Write> fmt::Write for $name<'_, T> {
  133. #[inline]
  134. fn write_str(&mut self, s: &str) -> fmt::Result {
  135. self.0.write_str(s)
  136. }
  137. #[inline]
  138. fn write_char(&mut self, c: char) -> fmt::Result {
  139. self.0.write_char(c)
  140. }
  141. #[inline]
  142. fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
  143. self.0.write_fmt(f)
  144. }
  145. }
  146. };
  147. }
  148. mod async_impl;
  149. pub use self::async_impl::*;
  150. feature! {
  151. #![feature = "std"]
  152. pub mod sync;
  153. }
  154. #[cfg(test)]
  155. mod tests;