mpsc.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. //! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
  2. //!
  3. //! The default MPSC channel returned by the [`channel`] function is
  4. //! _asynchronous_: receiving from the channel is an `async fn`, and the
  5. //! receiving task will wait when there are no messages in the channel.
  6. //!
  7. //! If the "std" feature flag is enabled, this module also provides a
  8. //! synchronous channel, in the [`sync`] module. The synchronous receiver will
  9. //! instead wait for new messages by blocking the current thread. Naturally,
  10. //! this requires the Rust standard library. A synchronous channel
  11. //! can be constructed using the [`sync::channel`] function.
  12. use crate::{
  13. loom::{atomic::AtomicUsize, hint},
  14. wait::{Notify, WaitCell, WaitQueue, WaitResult},
  15. Core, Ref, Slot,
  16. };
  17. use core::{fmt, ops::Index, task::Poll};
/// Error returned by a non-blocking send attempt (see `ChannelCore::try_send`
/// and `try_send_ref`); carries the unsent value back to the caller when the
/// error is produced via `TrySendError::with_value`.
#[derive(Debug)]
#[non_exhaustive]
pub enum TrySendError<T = ()> {
    /// The channel was at capacity, so the value could not be enqueued.
    Full(T),
    /// The channel was closed, so the value can never be enqueued.
    Closed(T),
}
/// Error indicating the channel was closed, carrying the value (unit by
/// default) that could not be delivered.
#[derive(Debug)]
pub struct Closed<T = ()>(T);
/// State shared between the senders and the receiver of a channel: the
/// underlying queue (`Core`) plus the wait primitives used to park and wake
/// each side. `N` is the notifier type used by the wait primitives (e.g. an
/// async waker or a thread handle, depending on the channel flavor).
#[derive(Debug)]
struct ChannelCore<N> {
    // The underlying queue state shared with `ThingBuf`.
    core: Core,
    // Wait cell the receiver parks in when the channel is empty; senders
    // notify it via `NotifyRx` when a send completes.
    rx_wait: WaitCell<N>,
    // Count of live senders; initialized to 1 in `new`.
    tx_count: AtomicUsize,
    // Wait queue senders park in when the channel is full; closed by
    // `close_rx`, and notified via `NotifyTx` when a receive completes.
    tx_wait: WaitQueue<N>,
}
/// Guard granting mutable access to a queue slot reserved for a send.
/// When the guard is dropped, the `NotifyRx` field wakes the receiver.
struct SendRefInner<'a, T, N: Notify> {
    // /!\ LOAD BEARING STRUCT DROP ORDER /!\
    //
    // The `Ref` field *must* be dropped before the `NotifyRx` field, or else
    // loom tests will fail. This ensures that the mutable access to the slot is
    // considered to have ended *before* the receiver thread/task is notified.
    //
    // The alternatives to a load-bearing drop order would be:
    // (a) put one field inside an `Option` so it can be dropped before the
    //     other (not great, as it adds a little extra overhead even outside
    //     of Loom tests),
    // (b) use `core::mem::ManuallyDrop` (also not great, requires additional
    //     unsafe code that in this case we can avoid)
    //
    // So, given that, relying on struct field drop order seemed like the least
    // bad option here. Just don't reorder these fields. :)
    slot: Ref<'a, T>,
    _notify: NotifyRx<'a, N>,
}
/// Wakes the receiver (via the channel's `rx_wait` cell) when dropped.
struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
/// Wakes a waiting sender (via the channel's `tx_wait` queue) when dropped.
struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);
  54. // ==== impl TrySendError ===
  55. impl TrySendError {
  56. fn with_value<T>(self, value: T) -> TrySendError<T> {
  57. match self {
  58. Self::Full(()) => TrySendError::Full(value),
  59. Self::Closed(()) => TrySendError::Closed(value),
  60. }
  61. }
  62. }
// ==== impl Inner ====
impl<N> ChannelCore<N> {
    /// Creates a new channel core with the given capacity and a sender
    /// count of 1.
    ///
    /// The body is duplicated under `cfg(loom)` because this version is
    /// `const` — presumably loom's mock atomics/wait types cannot be
    /// constructed in const context (TODO confirm against the `loom` crate).
    #[cfg(not(loom))]
    const fn new(capacity: usize) -> Self {
        Self {
            core: Core::new(capacity),
            rx_wait: WaitCell::new(),
            tx_count: AtomicUsize::new(1),
            tx_wait: WaitQueue::new(),
        }
    }
    /// Loom-testing variant of `new`; identical except for not being `const`.
    #[cfg(loom)]
    fn new(capacity: usize) -> Self {
        Self {
            core: Core::new(capacity),
            rx_wait: WaitCell::new(),
            tx_count: AtomicUsize::new(1),
            tx_wait: WaitQueue::new(),
        }
    }
}
impl<N> ChannelCore<N>
where
    N: Notify + Unpin,
{
    /// Closes the channel from the receiver side.
    ///
    /// Only the call that actually transitions the core to the closed state
    /// (`core.close()` returning `true`) closes the sender wait queue, so
    /// parked senders wake up and observe the closure exactly once.
    fn close_rx(&self) {
        if self.core.close() {
            // NOTE(review): the purpose of this spin is not evident from
            // this file alone — presumably it gives in-flight operations a
            // chance to settle before the wait queue is closed; confirm
            // against `Core::close` / `WaitQueue::close`.
            crate::loom::hint::spin_loop();
            test_println!("draining_queue");
            self.tx_wait.close();
        }
    }
}
impl<N> ChannelCore<N>
where
    N: Notify + Unpin,
{
    /// Attempts to reserve a send slot without waiting.
    ///
    /// On success, returns a guard whose drop wakes the receiver (via the
    /// `NotifyRx` field). Errors from `core.push_ref` (full/closed) are
    /// propagated unchanged.
    fn try_send_ref<'a, T>(
        &'a self,
        slots: &'a [Slot<T>],
    ) -> Result<SendRefInner<'a, T, N>, TrySendError>
    where
        T: Default,
    {
        self.core.push_ref(slots).map(|slot| SendRefInner {
            _notify: NotifyRx(&self.rx_wait),
            slot,
        })
    }

    /// Attempts to send `val` by value without waiting; on failure, `val`
    /// is handed back to the caller inside the returned `TrySendError`.
    fn try_send<T>(&self, slots: &[Slot<T>], val: T) -> Result<(), TrySendError<T>>
    where
        T: Default,
    {
        match self.try_send_ref(slots) {
            Ok(mut slot) => {
                // Write the value into the reserved slot; the receiver is
                // notified when `slot` drops at the end of this arm.
                slot.with_mut(|slot| *slot = val);
                Ok(())
            }
            Err(e) => Err(e.with_value(val)),
        }
    }

    /// Performs one iteration of the `recv_ref` loop.
    ///
    /// The loop itself has to be written in the actual `send` method's
    /// implementation, rather than on `inner`, because it might be async and
    /// may yield, or might park the thread.
    ///
    /// Returns `Ready(Some(slot))` when a message was popped,
    /// `Ready(None)` when the channel is closed with nothing left to pop,
    /// and `Pending` after a waiter was successfully registered.
    fn poll_recv_ref<'a, T, S>(
        &'a self,
        slots: &'a S,
        mk_waiter: impl Fn() -> N,
    ) -> Poll<Option<Ref<'a, T>>>
    where
        S: Index<usize, Output = Slot<T>> + ?Sized,
        T: Default,
    {
        macro_rules! try_poll_recv {
            () => {
                // If we got a value, return it!
                match self.core.pop_ref(slots) {
                    Ok(slot) => return Poll::Ready(Some(slot)),
                    Err(TrySendError::Closed(_)) => return Poll::Ready(None),
                    _ => {}
                }
            };
        }
        test_println!("poll_recv_ref");
        loop {
            test_println!("poll_recv_ref => loop");
            // try to receive a reference, returning if we succeeded or the
            // channel is closed.
            try_poll_recv!();
            // otherwise, go to sleep.
            match test_dbg!(self.rx_wait.wait_with(&mk_waiter)) {
                WaitResult::Wait => {
                    // we successfully registered a waiter! try polling again,
                    // just in case someone sent a message while we were
                    // registering the waiter.
                    try_poll_recv!();
                    test_println!("-> yield");
                    return Poll::Pending;
                }
                WaitResult::Closed => {
                    // the channel is closed (this is the receiver's wait
                    // path, so closure comes from the send side — all the
                    // senders have been dropped). however, there may be
                    // messages left in the queue. try popping from the
                    // queue until it's empty.
                    return Poll::Ready(self.core.pop_ref(slots).ok());
                }
                WaitResult::Notified => {
                    // we were notified while we were trying to register the
                    // waiter. loop and try polling again.
                    hint::spin_loop();
                }
            }
        }
    }
}
// === impl SendRefInner ===
// All of the impls below simply forward to the inner `Ref`, i.e. to the
// value currently stored in the reserved slot.
impl<T, N: Notify> core::ops::Deref for SendRefInner<'_, T, N> {
    type Target = T;
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.slot.deref()
    }
}

impl<T, N: Notify> core::ops::DerefMut for SendRefInner<'_, T, N> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.slot.deref_mut()
    }
}

impl<T, N: Notify> SendRefInner<'_, T, N> {
    /// Invokes `f` with a shared reference to the slot's contents.
    #[inline]
    pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.slot.with(f)
    }

    /// Invokes `f` with a mutable reference to the slot's contents.
    #[inline]
    pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.slot.with_mut(f)
    }
}

impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}

impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}

impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}
impl<N: Notify> Drop for NotifyRx<'_, N> {
    /// Wakes the receiver once the guard holding this notifier is dropped
    /// (i.e. once the send-side slot access has ended).
    #[inline]
    fn drop(&mut self) {
        test_println!("notifying rx ({})", core::any::type_name::<N>());
        self.0.notify();
    }
}

impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
    /// Wakes a waiting sender once the guard holding this notifier is
    /// dropped (i.e. once the receive-side slot access has ended).
    #[inline]
    fn drop(&mut self) {
        test_println!("notifying tx ({})", core::any::type_name::<N>());
        self.0.notify();
    }
}
/// Generates a public, concretely-named send-guard type (`$name`) wrapping
/// `SendRefInner` for a specific notifier type (`$notify`), forwarding
/// `with`/`with_mut`, `Deref`/`DerefMut`, and the `fmt` traits to the inner
/// guard. Used by the async and sync channel modules so each can expose its
/// own guard type without duplicating this boilerplate.
macro_rules! impl_send_ref {
    (pub struct $name:ident<$notify:ty>;) => {
        pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);

        impl<T> $name<'_, T> {
            #[inline]
            pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
                self.0.with(f)
            }

            #[inline]
            pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
                self.0.with_mut(f)
            }
        }

        impl<T> core::ops::Deref for $name<'_, T> {
            type Target = T;

            #[inline]
            fn deref(&self) -> &Self::Target {
                self.0.deref()
            }
        }

        impl<T> core::ops::DerefMut for $name<'_, T> {
            #[inline]
            fn deref_mut(&mut self) -> &mut Self::Target {
                self.0.deref_mut()
            }
        }

        impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                self.0.fmt(f)
            }
        }

        impl<T: fmt::Display> fmt::Display for $name<'_, T> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                self.0.fmt(f)
            }
        }

        impl<T: fmt::Write> fmt::Write for $name<'_, T> {
            #[inline]
            fn write_str(&mut self, s: &str) -> fmt::Result {
                self.0.write_str(s)
            }

            #[inline]
            fn write_char(&mut self, c: char) -> fmt::Result {
                self.0.write_char(c)
            }

            #[inline]
            fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
                self.0.write_fmt(f)
            }
        }
    };
}
/// Generates a public, concretely-named receive-guard type (`$name`) for a
/// specific notifier type (`$notify`): a `Ref` to the popped slot plus a
/// `NotifyTx` that wakes a waiting sender when the guard is dropped.
macro_rules! impl_recv_ref {
    (pub struct $name:ident<$notify:ty>;) => {
        pub struct $name<'recv, T> {
            // /!\ LOAD BEARING STRUCT DROP ORDER /!\
            //
            // The `Ref` field *must* be dropped before the `NotifyTx` field, or else
            // loom tests will fail. This ensures that the mutable access to the slot is
            // considered to have ended *before* the sender thread/task is notified.
            //
            // The alternatives to a load-bearing drop order would be:
            // (a) put one field inside an `Option` so it can be dropped before the
            //     other (not great, as it adds a little extra overhead even outside
            //     of Loom tests),
            // (b) use `core::mem::ManuallyDrop` (also not great, requires additional
            //     unsafe code that in this case we can avoid)
            //
            // So, given that, relying on struct field drop order seemed like the least
            // bad option here. Just don't reorder these fields. :)
            slot: Ref<'recv, T>,
            _notify: crate::mpsc::NotifyTx<'recv, $notify>,
        }

        impl<T> $name<'_, T> {
            #[inline]
            pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
                self.slot.with(f)
            }

            #[inline]
            pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
                self.slot.with_mut(f)
            }
        }

        impl<T> core::ops::Deref for $name<'_, T> {
            type Target = T;

            #[inline]
            fn deref(&self) -> &Self::Target {
                self.slot.deref()
            }
        }

        impl<T> core::ops::DerefMut for $name<'_, T> {
            #[inline]
            fn deref_mut(&mut self) -> &mut Self::Target {
                self.slot.deref_mut()
            }
        }

        impl<T: fmt::Debug> fmt::Debug for $name<'_, T> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                self.slot.fmt(f)
            }
        }

        impl<T: fmt::Display> fmt::Display for $name<'_, T> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                self.slot.fmt(f)
            }
        }

        impl<T: fmt::Write> fmt::Write for $name<'_, T> {
            #[inline]
            fn write_str(&mut self, s: &str) -> fmt::Result {
                self.slot.write_str(s)
            }

            #[inline]
            fn write_char(&mut self, c: char) -> fmt::Result {
                self.slot.write_char(c)
            }

            #[inline]
            fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
                self.slot.write_fmt(f)
            }
        }
    };
}
  363. mod async_impl;
  364. pub use self::async_impl::*;
  365. feature! {
  366. #![feature = "std"]
  367. pub mod sync;
  368. }
  369. #[cfg(all(loom, test))]
  370. mod tests;