123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- use super::*;
- use crate::{
- loom::{
- atomic::{self, Ordering},
- sync::Arc,
- },
- wait::queue,
- Ref, ThingBuf,
- };
- use core::{
- fmt,
- future::Future,
- pin::Pin,
- task::{Context, Poll, Waker},
- };
- /// Returns a new synchronous multi-producer, single consumer channel.
- pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
- let inner = Arc::new(Inner::new(thingbuf));
- let tx = Sender {
- inner: inner.clone(),
- };
- let rx = Receiver { inner };
- (tx, rx)
- }
- #[derive(Debug)]
- pub struct Sender<T> {
- inner: Arc<Inner<T, Waker>>,
- }
- #[derive(Debug)]
- pub struct Receiver<T> {
- inner: Arc<Inner<T, Waker>>,
- }
- impl_send_ref! {
- pub struct SendRef<Waker>;
- }
- impl_recv_ref! {
- pub struct RecvRef<Waker>;
- }
- /// A [`Future`] that tries to receive a reference from a [`Receiver`].
- ///
- /// This type is returned by [`Receiver::recv_ref`].
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct RecvRefFuture<'a, T> {
- rx: &'a Receiver<T>,
- }
- /// A [`Future`] that tries to receive a value from a [`Receiver`].
- ///
- /// This type is returned by [`Receiver::recv`].
- ///
- /// This is equivalent to the [`RecvRefFuture`] future, but the value is moved out of
- /// the [`ThingBuf`] after it is received. This means that allocations are not
- /// reused.
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct RecvFuture<'a, T> {
- rx: &'a Receiver<T>,
- }
- // === impl Sender ===
- impl<T: Default> Sender<T> {
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
- self.inner.try_send_ref().map(SendRef)
- }
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
- self.inner.try_send(val)
- }
- pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
- #[pin_project::pin_project(PinnedDrop)]
- struct SendRefFuture<'sender, T> {
- tx: &'sender Sender<T>,
- has_been_queued: bool,
- #[pin]
- waiter: queue::Waiter<Waker>,
- }
- impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T> {
- type Output = Result<SendRef<'sender, T>, Closed>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- test_println!("SendRefFuture::poll({:p})", self);
- // perform one send ref loop iteration
- let this = self.as_mut().project();
- let waiter = if test_dbg!(*this.has_been_queued) {
- None
- } else {
- Some(this.waiter)
- };
- this.tx
- .inner
- .poll_send_ref(waiter, |waker| {
- // if this is called, we are definitely getting queued.
- *this.has_been_queued = true;
- // if the wait node does not already have a waker, or the task
- // has been polled with a waker that won't wake the previous
- // one, register a new waker.
- let my_waker = cx.waker();
- // do we need to re-register?
- let will_wake = waker
- .as_ref()
- .map(|waker| test_dbg!(waker.will_wake(my_waker)))
- .unwrap_or(false);
- if test_dbg!(will_wake) {
- return;
- }
- *waker = Some(my_waker.clone());
- })
- .map(|ok| {
- // avoid having to lock the list to remove a node that's
- // definitely not queued.
- *this.has_been_queued = false;
- ok.map(SendRef)
- })
- }
- }
- #[pin_project::pinned_drop]
- impl<T> PinnedDrop for SendRefFuture<'_, T> {
- fn drop(self: Pin<&mut Self>) {
- test_println!("SendRefFuture::drop({:p})", self);
- if test_dbg!(self.has_been_queued) {
- let this = self.project();
- this.waiter.remove(&this.tx.inner.tx_wait)
- }
- }
- }
- SendRefFuture {
- tx: self,
- has_been_queued: false,
- waiter: queue::Waiter::new(),
- }
- .await
- }
- pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
- match self.send_ref().await {
- Err(Closed(())) => Err(Closed(val)),
- Ok(mut slot) => {
- slot.with_mut(|slot| *slot = val);
- Ok(())
- }
- }
- }
- }
- impl<T> Clone for Sender<T> {
- fn clone(&self) -> Self {
- test_dbg!(self.inner.tx_count.fetch_add(1, Ordering::Relaxed));
- Self {
- inner: self.inner.clone(),
- }
- }
- }
- impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- if test_dbg!(self.inner.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
- return;
- }
- // if we are the last sender, synchronize
- test_dbg!(atomic::fence(Ordering::SeqCst));
- self.inner.thingbuf.core.close();
- self.inner.rx_wait.close_tx();
- }
- }
- // === impl Receiver ===
- impl<T: Default> Receiver<T> {
- pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
- RecvRefFuture { rx: self }
- }
- pub fn recv(&self) -> RecvFuture<'_, T> {
- RecvFuture { rx: self }
- }
- /// # Returns
- ///
- /// * `Poll::Pending` if no messages are available but the channel is not
- /// closed, or if a spurious failure happens.
- /// * `Poll::Ready(Some(Ref<T>))` if a message is available.
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
- /// sent before it was closed have been received.
- ///
- /// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
- /// [`Context`] is scheduled to receive a wakeup when a message is sent on any
- /// sender, or when the channel is closed. Note that on multiple calls to
- /// `poll_recv_ref`, only the [`Waker`] from the [`Context`] passed to the most
- /// recent call is scheduled to receive a wakeup.
- pub fn poll_recv_ref(&self, cx: &mut Context<'_>) -> Poll<Option<RecvRef<'_, T>>> {
- self.inner.poll_recv_ref(|| cx.waker().clone()).map(|some| {
- some.map(|slot| RecvRef {
- _notify: super::NotifyTx(&self.inner.tx_wait),
- slot,
- })
- })
- }
- /// # Returns
- ///
- /// * `Poll::Pending` if no messages are available but the channel is not
- /// closed, or if a spurious failure happens.
- /// * `Poll::Ready(Some(message))` if a message is available.
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
- /// sent before it was closed have been received.
- ///
- /// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
- /// [`Context`] is scheduled to receive a wakeup when a message is sent on any
- /// sender, or when the channel is closed. Note that on multiple calls to
- /// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
- /// recent call is scheduled to receive a wakeup.
- pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.poll_recv_ref(cx)
- .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take)))
- }
- pub fn is_closed(&self) -> bool {
- test_dbg!(self.inner.tx_count.load(Ordering::SeqCst)) <= 1
- }
- }
- impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- self.inner.close_rx();
- }
- }
- // === impl RecvRefFuture ===
- impl<'a, T: Default> Future for RecvRefFuture<'a, T> {
- type Output = Option<RecvRef<'a, T>>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.rx.poll_recv_ref(cx)
- }
- }
- // === impl Recv ===
- impl<'a, T: Default> Future for RecvFuture<'a, T> {
- type Output = Option<T>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.rx.poll_recv(cx)
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use crate::ThingBuf;
- fn _assert_sync<T: Sync>(_: T) {}
- fn _assert_send<T: Send>(_: T) {}
- #[test]
- fn recv_ref_future_is_send() {
- fn _compiles() {
- let (_, rx) = channel::<usize>(ThingBuf::new(10));
- _assert_send(rx.recv_ref());
- }
- }
- #[test]
- fn recv_ref_future_is_sync() {
- fn _compiles() {
- let (_, rx) = channel::<usize>(ThingBuf::new(10));
- _assert_sync(rx.recv_ref());
- }
- }
- #[test]
- fn send_ref_future_is_send() {
- fn _compiles() {
- let (tx, _) = channel::<usize>(ThingBuf::new(10));
- _assert_send(tx.send_ref());
- }
- }
- #[test]
- fn send_ref_future_is_sync() {
- fn _compiles() {
- let (tx, _) = channel::<usize>(ThingBuf::new(10));
- _assert_sync(tx.send_ref());
- }
- }
- }
|