//! Asynchronous MPSC channel implementation (`async_impl.rs`).
  1. use super::*;
  2. use crate::{
  3. loom::{
  4. atomic::{self, Ordering},
  5. sync::Arc,
  6. },
  7. wait::queue,
  8. Ref, ThingBuf,
  9. };
  10. use core::{
  11. fmt,
  12. future::Future,
  13. pin::Pin,
  14. task::{Context, Poll, Waker},
  15. };
  16. /// Returns a new synchronous multi-producer, single consumer channel.
  17. pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
  18. let inner = Arc::new(Inner::new(thingbuf));
  19. let tx = Sender {
  20. inner: inner.clone(),
  21. };
  22. let rx = Receiver { inner };
  23. (tx, rx)
  24. }
/// The sending half of the channel returned by [`channel`].
///
/// Senders may be cloned (each clone bumps the shared `tx_count`); when the
/// last sender is dropped, the channel is closed.
#[derive(Debug)]
pub struct Sender<T> {
    /// Shared channel state, parameterized over [`Waker`] so pending async
    /// tasks can be woken when a slot or message becomes available.
    inner: Arc<Inner<T, Waker>>,
}
/// The receiving half of the channel returned by [`channel`].
///
/// There is only one receiver (single-consumer); dropping it closes the
/// channel from the receive side.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Shared channel state, parameterized over [`Waker`] for async wakeups.
    inner: Arc<Inner<T, Waker>>,
}
// Generate the `SendRef` / `RecvRef` guard types via macros defined elsewhere
// in the crate, parameterized over `Waker` as the async notification type.
impl_send_ref! {
    pub struct SendRef<Waker>;
}

impl_recv_ref! {
    pub struct RecvRef<Waker>;
}
/// A [`Future`] that tries to receive a reference from a [`Receiver`].
///
/// This type is returned by [`Receiver::recv_ref`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvRefFuture<'a, T> {
    /// The receiver this future polls; `poll` delegates to
    /// [`Receiver::poll_recv_ref`].
    rx: &'a Receiver<T>,
}
/// A [`Future`] that tries to receive a value from a [`Receiver`].
///
/// This type is returned by [`Receiver::recv`].
///
/// This is equivalent to the [`RecvRefFuture`] future, but the value is moved out of
/// the [`ThingBuf`] after it is received. This means that allocations are not
/// reused.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvFuture<'a, T> {
    /// The receiver this future polls; `poll` delegates to
    /// [`Receiver::poll_recv`].
    rx: &'a Receiver<T>,
}
// === impl Sender ===

impl<T: Default> Sender<T> {
    /// Attempts to reserve a send slot without waiting, returning a
    /// [`SendRef`] through which the slot can be written.
    pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
        self.inner.try_send_ref().map(SendRef)
    }

    /// Attempts to send `val` without waiting; the `TrySendError<T>` error
    /// type carries the value back on failure.
    pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
        self.inner.try_send(val)
    }

    /// Asynchronously reserves a send slot, waiting (by registering this
    /// task's [`Waker`] in the sender wait queue) until one is free.
    ///
    /// Returns [`Closed`] if the channel has been closed.
    pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
        // The future is defined locally so that `Sender::send_ref` keeps a
        // simple `async fn` signature. `PinnedDrop` is required because the
        // future must unlink its waiter node from the queue if it is dropped
        // while still queued.
        #[pin_project::pin_project(PinnedDrop)]
        struct SendRefFuture<'sender, T> {
            tx: &'sender Sender<T>,
            // True once our `waiter` node has been pushed onto the wait
            // queue; governs both re-queueing and removal on drop.
            has_been_queued: bool,
            // Intrusive wait-queue node; pinned because the queue links to it
            // by address.
            #[pin]
            waiter: queue::Waiter<Waker>,
        }

        impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> {
            type Output = Result<SendRef<'sender, T>, Closed>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                test_println!("SendRefFuture::poll({:p})", self);
                // perform one send ref loop iteration
                let this = self.as_mut().project();
                // Only hand the waiter node to `poll_send_ref` if it is not
                // already queued — otherwise we would enqueue it twice.
                let waiter = if test_dbg!(*this.has_been_queued) {
                    None
                } else {
                    Some(this.waiter)
                };
                this.tx
                    .inner
                    .poll_send_ref(waiter, |waker| {
                        // if this is called, we are definitely getting queued.
                        *this.has_been_queued = true;

                        // if the wait node does not already have a waker, or the task
                        // has been polled with a waker that won't wake the previous
                        // one, register a new waker.
                        let my_waker = cx.waker();
                        // do we need to re-register?
                        let will_wake = waker
                            .as_ref()
                            .map(|waker| test_dbg!(waker.will_wake(my_waker)))
                            .unwrap_or(false);

                        if test_dbg!(will_wake) {
                            return;
                        }

                        *waker = Some(my_waker.clone());
                    })
                    .map(|ok| {
                        // avoid having to lock the list to remove a node that's
                        // definitely not queued.
                        *this.has_been_queued = false;
                        ok.map(SendRef)
                    })
            }
        }

        #[pin_project::pinned_drop]
        impl<T> PinnedDrop for SendRefFuture<'_, T> {
            fn drop(self: Pin<&mut Self>) {
                test_println!("SendRefFuture::drop({:p})", self);
                // If our node is still linked into the wait queue, unlink it
                // so the queue never holds a dangling pointer to this future.
                if test_dbg!(self.has_been_queued) {
                    let this = self.project();
                    this.waiter.remove(&this.tx.inner.tx_wait)
                }
            }
        }

        SendRefFuture {
            tx: self,
            has_been_queued: false,
            waiter: queue::Waiter::new(),
        }
        .await
    }

    /// Asynchronously sends `val`, waiting for a free slot; if the channel is
    /// closed, the value is handed back inside the [`Closed`] error.
    pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
        match self.send_ref().await {
            Err(Closed(())) => Err(Closed(val)),
            Ok(mut slot) => {
                // Write the value into the reserved slot in place.
                slot.with_mut(|slot| *slot = val);
                Ok(())
            }
        }
    }
}
  138. impl<T> Clone for Sender<T> {
  139. fn clone(&self) -> Self {
  140. test_dbg!(self.inner.tx_count.fetch_add(1, Ordering::Relaxed));
  141. Self {
  142. inner: self.inner.clone(),
  143. }
  144. }
  145. }
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Decrement the sender count; if the previous count was > 1, other
        // senders are still alive and the channel stays open.
        if test_dbg!(self.inner.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
            return;
        }

        // if we are the last sender, synchronize
        test_dbg!(atomic::fence(Ordering::SeqCst));
        // Close the underlying buffer and notify the receiver-side wait queue
        // that no more senders exist.
        self.inner.thingbuf.core.close();
        self.inner.rx_wait.close_tx();
    }
}
// === impl Receiver ===

impl<T: Default> Receiver<T> {
    /// Returns a future that resolves to a reference to the next message
    /// (or `None` once the channel is closed and drained).
    pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
        RecvRefFuture { rx: self }
    }

    /// Returns a future that resolves to the next message by value; the
    /// message is moved out of the buffer, so its allocation is not reused.
    pub fn recv(&self) -> RecvFuture<'_, T> {
        RecvFuture { rx: self }
    }

    /// # Returns
    ///
    /// * `Poll::Pending` if no messages are available but the channel is not
    /// closed, or if a spurious failure happens.
    /// * `Poll::Ready(Some(Ref<T>))` if a message is available.
    /// * `Poll::Ready(None)` if the channel has been closed and all messages
    /// sent before it was closed have been received.
    ///
    /// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
    /// [`Context`] is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv_ref`, only the [`Waker`] from the [`Context`] passed to the most
    /// recent call is scheduled to receive a wakeup.
    pub fn poll_recv_ref(&self, cx: &mut Context<'_>) -> Poll<Option<RecvRef<'_, T>>> {
        // The waker is passed as a closure so `Inner` clones it only when it
        // actually needs to park this task.
        self.inner.poll_recv_ref(|| cx.waker().clone()).map(|some| {
            some.map(|slot| RecvRef {
                // `NotifyTx` holds the sender wait queue so a waiting sender
                // can be notified when this `RecvRef` is released.
                _notify: super::NotifyTx(&self.inner.tx_wait),
                slot,
            })
        })
    }

    /// # Returns
    ///
    /// * `Poll::Pending` if no messages are available but the channel is not
    /// closed, or if a spurious failure happens.
    /// * `Poll::Ready(Some(message))` if a message is available.
    /// * `Poll::Ready(None)` if the channel has been closed and all messages
    /// sent before it was closed have been received.
    ///
    /// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
    /// [`Context`] is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
    /// recent call is scheduled to receive a wakeup.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Move the value out of the slot, leaving `T::default()` behind.
        self.poll_recv_ref(cx)
            .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take)))
    }

    /// Returns `true` if the channel is closed from the sender side.
    pub fn is_closed(&self) -> bool {
        // NOTE(review): `Sender::drop` only closes the channel once `tx_count`
        // reaches 0, yet this reports closed at `<= 1` — i.e. while one sender
        // handle may still be live. Confirm whether the count includes an
        // implicit reference or whether this comparison should be `== 0`.
        test_dbg!(self.inner.tx_count.load(Ordering::SeqCst)) <= 1
    }
}
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Dropping the sole receiver closes the channel from the receive side.
        self.inner.close_rx();
    }
}
  212. // === impl RecvRefFuture ===
  213. impl<'a, T: Default> Future for RecvRefFuture<'a, T> {
  214. type Output = Option<RecvRef<'a, T>>;
  215. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  216. self.rx.poll_recv_ref(cx)
  217. }
  218. }
  219. // === impl Recv ===
  220. impl<'a, T: Default> Future for RecvFuture<'a, T> {
  221. type Output = Option<T>;
  222. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  223. self.rx.poll_recv(cx)
  224. }
  225. }
  226. #[cfg(test)]
  227. mod tests {
  228. use super::*;
  229. use crate::ThingBuf;
  230. fn _assert_sync<T: Sync>(_: T) {}
  231. fn _assert_send<T: Send>(_: T) {}
  232. #[test]
  233. fn recv_ref_future_is_send() {
  234. fn _compiles() {
  235. let (_, rx) = channel::<usize>(ThingBuf::new(10));
  236. _assert_send(rx.recv_ref());
  237. }
  238. }
  239. #[test]
  240. fn recv_ref_future_is_sync() {
  241. fn _compiles() {
  242. let (_, rx) = channel::<usize>(ThingBuf::new(10));
  243. _assert_sync(rx.recv_ref());
  244. }
  245. }
  246. #[test]
  247. fn send_ref_future_is_send() {
  248. fn _compiles() {
  249. let (tx, _) = channel::<usize>(ThingBuf::new(10));
  250. _assert_send(tx.send_ref());
  251. }
  252. }
  253. #[test]
  254. fn send_ref_future_is_sync() {
  255. fn _compiles() {
  256. let (tx, _) = channel::<usize>(ThingBuf::new(10));
  257. _assert_sync(tx.send_ref());
  258. }
  259. }
  260. }