mpsc.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. //! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
  2. //!
  3. //! The default MPSC channel returned by the [`channel`] function is
  4. //! _asynchronous_: receiving from the channel is an `async fn`, and the
  5. //! receiving task will wait when there are no messages in the channel.
  6. //!
  7. //! If the "std" feature flag is enabled, this module also provides a
  8. //! synchronous channel, in the [`sync`] module. The synchronous receiver will
  9. //! instead wait for new messages by blocking the current thread. Naturally,
  10. //! this requires the Rust standard library. A synchronous channel
  11. //! can be constructed using the [`sync::channel`] function.
  12. use crate::{
  13. loom::{atomic::AtomicUsize, hint},
  14. util::Backoff,
  15. wait::{queue, Notify, WaitCell, WaitQueue, WaitResult},
  16. Ref, ThingBuf,
  17. };
  18. use core::fmt;
  19. use core::pin::Pin;
  20. use core::task::Poll;
  /// Error returned by the non-waiting send operations.
  ///
  /// Both variants carry a payload of type `T` (defaulting to `()`); `try_send`
  /// uses it to hand the value that could not be sent back to the caller.
  ///
  /// `PartialEq` and `Eq` are derived so that send results can be compared
  /// directly, for example inside `assert_eq!`.
  #[derive(Debug, PartialEq, Eq)]
  #[non_exhaustive]
  pub enum TrySendError<T = ()> {
      /// No slot could be reserved because the channel is currently full.
      Full(T),
      /// The channel is closed.
      Closed(T),
  }
  /// Error reporting that an operation failed because the channel is closed.
  ///
  /// The payload type `T` defaults to `()`; the waiting send path in this module
  /// produces `Closed(())`. A non-unit payload is presumably the value that
  /// could not be sent (TODO confirm against the public senders).
  ///
  /// `PartialEq` and `Eq` are derived so that results can be compared directly,
  /// for example inside `assert_eq!`.
  #[derive(Debug, PartialEq, Eq)]
  pub struct Closed<T = ()>(T);
  29. #[derive(Debug)]
  30. struct Inner<T, N: Notify> {
  31. thingbuf: ThingBuf<T>,
  32. rx_wait: WaitCell<N>,
  33. tx_count: AtomicUsize,
  34. tx_wait: WaitQueue<N>,
  35. }
  36. struct SendRefInner<'a, T, N: Notify> {
  37. // /!\ LOAD BEARING STRUCT DROP ORDER /!\
  38. //
  39. // The `Ref` field *must* be dropped before the `NotifyInner` field, or else
  40. // loom tests will fail. This ensures that the mutable access to the slot is
  41. // considered to have ended *before* the receiver thread/task is notified.
  42. //
  43. // The alternatives to a load-bearing drop order would be:
  44. // (a) put one field inside an `Option` so it can be dropped before the
  45. // other (not great, as it adds a little extra overhead even outside
  46. // of Loom tests),
  47. // (b) use `core::mem::ManuallyDrop` (also not great, requires additional
  48. // unsafe code that in this case we can avoid)
  49. //
  50. // So, given that, relying on struct field drop order seemed like the least
  51. // bad option here. Just don't reorder these fields. :)
  52. slot: Ref<'a, T>,
  53. _notify: NotifyRx<'a, N>,
  54. }
  55. struct NotifyRx<'a, N: Notify>(&'a WaitCell<N>);
  56. struct NotifyTx<'a, N: Notify + Unpin>(&'a WaitQueue<N>);
  57. // ==== impl TrySendError ===
  58. impl TrySendError {
  59. fn with_value<T>(self, value: T) -> TrySendError<T> {
  60. match self {
  61. Self::Full(()) => TrySendError::Full(value),
  62. Self::Closed(()) => TrySendError::Closed(value),
  63. }
  64. }
  65. }
  66. // ==== impl Inner ====
  67. impl<T, N: Notify + Unpin> Inner<T, N> {
  68. fn new(thingbuf: ThingBuf<T>) -> Self {
  69. Self {
  70. thingbuf,
  71. rx_wait: WaitCell::new(),
  72. tx_count: AtomicUsize::new(1),
  73. tx_wait: WaitQueue::new(),
  74. }
  75. }
  76. fn close_rx(&self) {
  77. if self.thingbuf.core.close() {
  78. crate::loom::hint::spin_loop();
  79. test_println!("draining_queue");
  80. self.tx_wait.close();
  81. }
  82. }
  83. }
  84. impl<T: Default, N: Notify + Unpin> Inner<T, N> {
  85. fn try_send_ref(&self) -> Result<SendRefInner<'_, T, N>, TrySendError> {
  86. self.thingbuf
  87. .core
  88. .push_ref(self.thingbuf.slots.as_ref())
  89. .map(|slot| SendRefInner {
  90. _notify: NotifyRx(&self.rx_wait),
  91. slot,
  92. })
  93. }
  94. fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
  95. match self.try_send_ref() {
  96. Ok(mut slot) => {
  97. slot.with_mut(|slot| *slot = val);
  98. Ok(())
  99. }
  100. Err(e) => Err(e.with_value(val)),
  101. }
  102. }
  103. /// Performs one iteration of the `send_ref` loop.
  104. ///
  105. /// The loop itself has to be written in the actual `send` method's
  106. /// implementation, rather than on `inner`, because it might be async and
  107. /// may yield, or might park the thread.
  108. fn poll_send_ref(
  109. &self,
  110. mut node: Option<Pin<&mut queue::Waiter<N>>>,
  111. mut register: impl FnMut(&mut Option<N>),
  112. ) -> Poll<Result<SendRefInner<'_, T, N>, Closed>> {
  113. let mut backoff = Backoff::new();
  114. // try to send a few times in a loop, in case the receiver notifies us
  115. // right before we park.
  116. loop {
  117. // try to reserve a send slot, returning if we succeeded or if the
  118. // queue was closed.
  119. match self.try_send_ref() {
  120. Ok(slot) => return Poll::Ready(Ok(slot)),
  121. Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))),
  122. Err(_) => {}
  123. }
  124. // try to push a waiter
  125. let pushed_waiter = self.tx_wait.push_waiter(&mut node, &mut register);
  126. match test_dbg!(pushed_waiter) {
  127. WaitResult::Closed => {
  128. // the channel closed while we were registering the waiter!
  129. return Poll::Ready(Err(Closed(())));
  130. }
  131. WaitResult::Wait => {
  132. // okay, we are now queued to wait. gotosleep!
  133. return Poll::Pending;
  134. }
  135. WaitResult::Notified => {
  136. // we consumed a queued notification. try again...
  137. backoff.spin_yield();
  138. }
  139. }
  140. }
  141. }
  142. /// Performs one iteration of the `recv_ref` loop.
  143. ///
  144. /// The loop itself has to be written in the actual `send` method's
  145. /// implementation, rather than on `inner`, because it might be async and
  146. /// may yield, or might park the thread.
  147. fn poll_recv_ref(&self, mk_waiter: impl Fn() -> N) -> Poll<Option<Ref<'_, T>>> {
  148. macro_rules! try_poll_recv {
  149. () => {
  150. // If we got a value, return it!
  151. match self.thingbuf.core.pop_ref(self.thingbuf.slots.as_ref()) {
  152. Ok(slot) => return Poll::Ready(Some(slot)),
  153. Err(TrySendError::Closed(_)) => return Poll::Ready(None),
  154. _ => {}
  155. }
  156. };
  157. }
  158. test_println!("poll_recv_ref");
  159. loop {
  160. test_println!("poll_recv_ref => loop");
  161. // try to receive a reference, returning if we succeeded or the
  162. // channel is closed.
  163. try_poll_recv!();
  164. // otherwise, gotosleep
  165. match test_dbg!(self.rx_wait.wait_with(&mk_waiter)) {
  166. WaitResult::Wait => {
  167. // we successfully registered a waiter! try polling again,
  168. // just in case someone sent a message while we were
  169. // registering the waiter.
  170. try_poll_recv!();
  171. return Poll::Pending;
  172. }
  173. WaitResult::Closed => {
  174. // the channel is closed (all the receivers are dropped).
  175. // however, there may be messages left in the queue. try
  176. // popping from the queue until it's empty.
  177. return Poll::Ready(self.thingbuf.pop_ref());
  178. }
  179. WaitResult::Notified => {
  180. // we were notified while we were trying to register the
  181. // waiter. loop and try polling again.
  182. hint::spin_loop();
  183. }
  184. }
  185. }
  186. }
  187. }
  188. impl<T, N: Notify> core::ops::Deref for SendRefInner<'_, T, N> {
  189. type Target = T;
  190. #[inline]
  191. fn deref(&self) -> &Self::Target {
  192. self.slot.deref()
  193. }
  194. }
  195. impl<T, N: Notify> core::ops::DerefMut for SendRefInner<'_, T, N> {
  196. #[inline]
  197. fn deref_mut(&mut self) -> &mut Self::Target {
  198. self.slot.deref_mut()
  199. }
  200. }
  201. impl<T, N: Notify> SendRefInner<'_, T, N> {
  202. #[inline]
  203. pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
  204. self.slot.with(f)
  205. }
  206. #[inline]
  207. pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
  208. self.slot.with_mut(f)
  209. }
  210. }
  211. impl<T: fmt::Debug, N: Notify> fmt::Debug for SendRefInner<'_, T, N> {
  212. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  213. self.with(|val| fmt::Debug::fmt(val, f))
  214. }
  215. }
  216. impl<T: fmt::Display, N: Notify> fmt::Display for SendRefInner<'_, T, N> {
  217. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  218. self.with(|val| fmt::Display::fmt(val, f))
  219. }
  220. }
  221. impl<T: fmt::Write, N: Notify> fmt::Write for SendRefInner<'_, T, N> {
  222. #[inline]
  223. fn write_str(&mut self, s: &str) -> fmt::Result {
  224. self.with_mut(|val| val.write_str(s))
  225. }
  226. #[inline]
  227. fn write_char(&mut self, c: char) -> fmt::Result {
  228. self.with_mut(|val| val.write_char(c))
  229. }
  230. #[inline]
  231. fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
  232. self.with_mut(|val| val.write_fmt(f))
  233. }
  234. }
  235. impl<N: Notify> Drop for NotifyRx<'_, N> {
  236. #[inline]
  237. fn drop(&mut self) {
  238. test_println!("notifying rx ({})", core::any::type_name::<N>());
  239. self.0.notify();
  240. }
  241. }
  242. impl<N: Notify + Unpin> Drop for NotifyTx<'_, N> {
  243. #[inline]
  244. fn drop(&mut self) {
  245. test_println!("notifying tx ({})", core::any::type_name::<N>());
  246. self.0.notify();
  247. }
  248. }
  /// Generates a public send-side slot guard named `$name` that wraps
  /// `SendRefInner<_, T, $notify>` and forwards `with`/`with_mut`, `Deref`,
  /// `DerefMut`, `Debug`, `Display` and `fmt::Write` to it.
  macro_rules! impl_send_ref {
      (pub struct $name:ident<$notify:ty>;) => {
          /// A reserved slot in the channel through which a message is written.
          ///
          /// The receive-side wait cell is notified when this guard is dropped.
          pub struct $name<'sender, T>(SendRefInner<'sender, T, $notify>);

          impl<T> $name<'_, T> {
              /// Runs `f` with shared access to the slot's value.
              #[inline]
              pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
                  self.0.with(f)
              }

              /// Runs `f` with exclusive access to the slot's value.
              #[inline]
              pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
                  self.0.with_mut(f)
              }
          }

          impl<T> core::ops::Deref for $name<'_, T> {
              type Target = T;

              #[inline]
              fn deref(&self) -> &T {
                  self.0.deref()
              }
          }

          impl<T> core::ops::DerefMut for $name<'_, T> {
              #[inline]
              fn deref_mut(&mut self) -> &mut T {
                  self.0.deref_mut()
              }
          }

          impl<T> fmt::Debug for $name<'_, T>
          where
              T: fmt::Debug,
          {
              fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                  self.0.fmt(f)
              }
          }

          impl<T> fmt::Display for $name<'_, T>
          where
              T: fmt::Display,
          {
              fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                  self.0.fmt(f)
              }
          }

          impl<T> fmt::Write for $name<'_, T>
          where
              T: fmt::Write,
          {
              #[inline]
              fn write_str(&mut self, s: &str) -> fmt::Result {
                  self.0.write_str(s)
              }

              #[inline]
              fn write_char(&mut self, c: char) -> fmt::Result {
                  self.0.write_char(c)
              }

              #[inline]
              fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
                  self.0.write_fmt(f)
              }
          }
      };
  }
  /// Generates a public receive-side slot guard named `$name` that pairs a
  /// `Ref` with a `NotifyTx<_, $notify>` and forwards `with`/`with_mut`,
  /// `Deref`, `DerefMut`, `Debug`, `Display` and `fmt::Write` to the `Ref`.
  macro_rules! impl_recv_ref {
      (pub struct $name:ident<$notify:ty>;) => {
          /// A slot obtained from the channel on the receive side.
          ///
          /// The send-side wait queue is notified when this guard is dropped.
          pub struct $name<'recv, T> {
              // /!\ LOAD BEARING STRUCT DROP ORDER /!\
              //
              // Struct fields are dropped in declaration order, and `slot`
              // *must* be dropped before `_notify` (the `NotifyTx`), or the
              // loom tests fail. With this order, the access to the slot is
              // considered finished *before* a waiting sender thread/task gets
              // notified.
              //
              // Ways to avoid depending on field order, and why they were not
              // taken:
              // (a) wrap one field in an `Option` so it can be dropped
              //     explicitly first (adds a little overhead even outside of
              //     loom tests);
              // (b) use `core::mem::ManuallyDrop` (needs extra `unsafe` code
              //     that can be avoided here).
              //
              // Field order is therefore the least bad option. Do not reorder
              // these fields. :)
              slot: Ref<'recv, T>,
              _notify: crate::mpsc::NotifyTx<'recv, $notify>,
          }

          impl<T> $name<'_, T> {
              /// Runs `f` with shared access to the slot's value.
              #[inline]
              pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
                  self.slot.with(f)
              }

              /// Runs `f` with exclusive access to the slot's value.
              #[inline]
              pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
                  self.slot.with_mut(f)
              }
          }

          impl<T> core::ops::Deref for $name<'_, T> {
              type Target = T;

              #[inline]
              fn deref(&self) -> &T {
                  self.slot.deref()
              }
          }

          impl<T> core::ops::DerefMut for $name<'_, T> {
              #[inline]
              fn deref_mut(&mut self) -> &mut T {
                  self.slot.deref_mut()
              }
          }

          impl<T> fmt::Debug for $name<'_, T>
          where
              T: fmt::Debug,
          {
              fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                  self.slot.fmt(f)
              }
          }

          impl<T> fmt::Display for $name<'_, T>
          where
              T: fmt::Display,
          {
              fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                  self.slot.fmt(f)
              }
          }

          impl<T> fmt::Write for $name<'_, T>
          where
              T: fmt::Write,
          {
              #[inline]
              fn write_str(&mut self, s: &str) -> fmt::Result {
                  self.slot.write_str(s)
              }

              #[inline]
              fn write_char(&mut self, c: char) -> fmt::Result {
                  self.slot.write_char(c)
              }

              #[inline]
              fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
                  self.slot.write_fmt(f)
              }
          }
      };
  }
  371. mod async_impl;
  372. pub use self::async_impl::*;
  373. feature! {
  374. #![feature = "std"]
  375. pub mod sync;
  376. }
  377. #[cfg(all(loom, test))]
  378. mod tests;