|
@@ -17,13 +17,18 @@ feature! {
|
|
|
|
|
|
use crate::loom::sync::Arc;
|
|
|
|
|
|
- /// Returns a new asynchronous multi-producer, single consumer channel.
|
|
|
+ /// Returns a new asynchronous multi-producer, single consumer (MPSC)
|
|
|
+ /// channel with the provided capacity.
|
|
|
+ ///
|
|
|
+ /// This channel will use the [default recycling policy].
|
|
|
+ ///
|
|
|
+ /// [recycling policy]: crate::recycling::DefaultRecycle
|
|
|
pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
|
|
|
with_recycle(capacity, recycling::DefaultRecycle::new())
|
|
|
}
|
|
|
|
|
|
- /// Returns a new asynchronous multi-producer, single consumer channel with
|
|
|
- /// the provided [recycling policy].
|
|
|
+ /// Returns a new asynchronous multi-producer, single consumer (MPSC)
|
|
|
+ /// channel with the provided capacity and [recycling policy].
|
|
|
///
|
|
|
/// [recycling policy]: crate::recycling::Recycle
|
|
|
pub fn with_recycle<T, R: Recycle<T>>(capacity: usize, recycle: R) -> (Sender<T, R>, Receiver<T, R>) {
|
|
@@ -40,11 +45,20 @@ feature! {
|
|
|
(tx, rx)
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /// Asynchronously receives values from associated [`Sender`]s.
|
|
|
+ ///
|
|
|
+ /// Instances of this struct are created by the [`channel`] and
|
|
|
+ /// [`with_recycle`] functions.
|
|
|
#[derive(Debug)]
|
|
|
pub struct Receiver<T, R = recycling::DefaultRecycle> {
|
|
|
inner: Arc<Inner<T, R>>,
|
|
|
}
|
|
|
|
|
|
+ /// Asynchronously sends values to an associated [`Receiver`].
|
|
|
+ ///
|
|
|
+ /// Instances of this struct are created by the [`channel`] and
|
|
|
+ /// [`with_recycle`] functions.
|
|
|
#[derive(Debug)]
|
|
|
pub struct Sender<T, R = recycling::DefaultRecycle> {
|
|
|
inner: Arc<Inner<T, R>>,
|
|
@@ -62,19 +76,55 @@ feature! {
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
|
{
|
|
|
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
- self.inner
|
|
|
- .core
|
|
|
- .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle)
|
|
|
- .map(SendRef)
|
|
|
- }
|
|
|
-
|
|
|
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
- self.inner
|
|
|
- .core
|
|
|
- .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle)
|
|
|
- }
|
|
|
-
|
|
|
+ /// Reserves a slot in the channel to mutate in place, waiting until
|
|
|
+ /// there is a free slot to write to.
|
|
|
+ ///
|
|
|
+ /// This is similar to the [`send`] method, but, rather than taking a
|
|
|
+ /// message by value to write to the channel, this method reserves a
|
|
|
+ /// writable slot in the channel, and returns a [`SendRef`] that allows
|
|
|
+ /// mutating the slot in place. If the [`Receiver`] end of the channel
|
|
|
+ /// uses the [`Receiver::recv_ref`] or [`Receiver::poll_recv_ref`]
|
|
|
+ /// methods for receiving from the channel, this allows allocations for
|
|
|
+ /// channel messages to be reused in place.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the [`Receiver`] end of the channel has been dropped, this
|
|
|
+ /// returns a [`Closed`] error.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// Sending formatted strings by writing them directly to channel slots,
|
|
|
+ /// in place:
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// # async fn example() {
|
|
|
+ /// let (tx, rx) = mpsc::channel::<String>(8);
|
|
|
+ ///
|
|
|
+ /// // Spawn a task that prints each message received from the channel:
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// while let Some(msg) = rx.recv_ref().await {
|
|
|
+ /// println!("{}", msg);
|
|
|
+ /// }
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// // Until the channel closes, write formatted messages to the channel.
|
|
|
+ /// let mut count = 1;
|
|
|
+ /// while let Ok(mut value) = tx.send_ref().await {
|
|
|
+ /// // Writing to the `SendRef` will reuse the *existing* string
|
|
|
+ /// // allocation in place.
|
|
|
+ /// write!(value, "hello from message {}", count)
|
|
|
+ /// .expect("writing to a `String` should never fail");
|
|
|
+ /// count += 1;
|
|
|
+ /// }
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ /// [`send`]: Self::send
|
|
|
pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
|
|
|
SendRefFuture {
|
|
|
core: &self.inner.core,
|
|
@@ -86,15 +136,125 @@ feature! {
|
|
|
.await
|
|
|
}
|
|
|
|
|
|
+ /// Sends a message by value, waiting until there is a free slot to
|
|
|
+ /// write to.
|
|
|
+ ///
|
|
|
+ /// This method takes the message by value, and replaces any previous
|
|
|
+ /// value in the slot. This means that the channel will *not* function
|
|
|
+ /// as an object pool while sending messages with `send`. This method is
|
|
|
+ /// most appropriate when messages don't own reusable heap allocations,
|
|
|
+ /// or when the [`Receiver`] end of the channel must receive messages by
|
|
|
+ /// moving them out of the channel by value (using the
|
|
|
+ /// [`Receiver::recv`] method). When messages in the channel own
|
|
|
+ /// reusable heap allocations (such as `String`s or `Vec`s), and the
|
|
|
+ /// [`Receiver`] doesn't need to receive them by value, consider using
|
|
|
+ /// [`send_ref`] instead, to enable allocation reuse.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the [`Receiver`] end of the channel has been dropped, this
|
|
|
+ /// returns a [`Closed`] error containing the sent value.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// # async fn example() {
|
|
|
+ /// let (tx, rx) = mpsc::channel(8);
|
|
|
+ ///
|
|
|
+ /// // Spawn a task that prints each message received from the channel:
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// while let Some(msg) = rx.recv().await {
|
|
|
+ /// println!("received message {}", msg);
|
|
|
+ /// }
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// // Until the channel closes, write the current iteration to the channel.
|
|
|
+ /// let mut count = 1;
|
|
|
+ /// while tx.send(count).await.is_ok() {
|
|
|
+ /// count += 1;
|
|
|
+ /// }
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ /// [`send_ref`]: Self::send_ref
|
|
|
pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
|
|
|
match self.send_ref().await {
|
|
|
Err(Closed(())) => Err(Closed(val)),
|
|
|
Ok(mut slot) => {
|
|
|
- slot.with_mut(|slot| *slot = val);
|
|
|
+ *slot = val;
|
|
|
Ok(())
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /// Attempts to reserve a slot in the channel to mutate in place,
|
|
|
+ /// without waiting for capacity.
|
|
|
+ ///
|
|
|
+ /// This method differs from [`send_ref`] by returning immediately if the
|
|
|
+ /// channel’s buffer is full or no [`Receiver`] exists. Compared with
|
|
|
+ /// [`send_ref`], this method has two failure cases instead of one (one for
|
|
|
+ /// disconnection, one for a full buffer), and this method is not
|
|
|
+ /// `async`, because it will never wait.
|
|
|
+ ///
|
|
|
+ /// Like [`send_ref`], this method returns a [`SendRef`] that may be
|
|
|
+ /// used to mutate a slot in the channel's buffer in place. Dropping the
|
|
|
+ /// [`SendRef`] completes the send operation and makes the mutated value
|
|
|
+ /// available to the [`Receiver`].
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the channel capacity has been reached (i.e., the channel has `n`
|
|
|
+ /// buffered values where `n` is the argument passed to
|
|
|
+ /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
|
|
|
+ /// returned. In this case, a future call to `try_send` may succeed if
|
|
|
+ /// additional capacity becomes available.
|
|
|
+ ///
|
|
|
+ /// If the receive half of the channel is closed (i.e., the [`Receiver`]
|
|
|
+ /// handle was dropped), the function returns [`TrySendError::Closed`].
|
|
|
+ /// Once the channel has closed, subsequent calls to `try_send_ref` will
|
|
|
+ /// never succeed.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Self::send_ref
|
|
|
+ pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
+ self.inner
|
|
|
+ .core
|
|
|
+ .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle)
|
|
|
+ .map(SendRef)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Attempts to send a message by value immediately, without waiting for
|
|
|
+ /// capacity.
|
|
|
+ ///
|
|
|
+ /// This method differs from [`send`] by returning immediately if the
|
|
|
+ /// channel’s buffer is full or no [`Receiver`] exists. Compared with
|
|
|
+ /// [`send`], this method has two failure cases instead of one (one for
|
|
|
+ /// disconnection, one for a full buffer), and this method is not
|
|
|
+ /// `async`, because it will never wait.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the channel capacity has been reached (i.e., the channel has `n`
|
|
|
+ /// buffered values where `n` is the argument passed to
|
|
|
+ /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
|
|
|
+ /// returned. In this case, a future call to `try_send` may succeed if
|
|
|
+ /// additional capacity becomes available.
|
|
|
+ ///
|
|
|
+ /// If the receive half of the channel is closed (i.e., the [`Receiver`]
|
|
|
+ /// handle was dropped), the function returns [`TrySendError::Closed`].
|
|
|
+ /// Once the channel has closed, subsequent calls to `try_send` will
|
|
|
+ /// never succeed.
|
|
|
+ ///
|
|
|
+ /// In both cases, the error includes the value passed to `try_send`.
|
|
|
+ ///
|
|
|
+ /// [`send`]: Self::send
|
|
|
+ pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
+ self.inner
|
|
|
+ .core
|
|
|
+ .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
impl<T, R> Clone for Sender<T, R> {
|
|
@@ -122,6 +282,69 @@ feature! {
|
|
|
// === impl Receiver ===
|
|
|
|
|
|
impl<T, R> Receiver<T, R> {
|
|
|
+ /// Receives the next message for this receiver, **by reference**.
|
|
|
+ ///
|
|
|
+ /// This method returns `None` if the channel has been closed and there are
|
|
|
+ /// no remaining messages in the channel's buffer. This indicates that no
|
|
|
+ /// further values can ever be received from this `Receiver`. The channel is
|
|
|
+ /// closed when all [`Sender`]s have been dropped.
|
|
|
+ ///
|
|
|
+ /// If there are no messages in the channel's buffer, but the channel has
|
|
|
+ /// not yet been closed, this method will wait until a message is sent or
|
|
|
+ /// the channel is closed.
|
|
|
+ ///
|
|
|
+ /// This method returns a [`RecvRef`] that can be used to read from (or
|
|
|
+ /// mutate) the received message by reference. When the [`RecvRef`] is
|
|
|
+ /// dropped, the receive operation completes and the slot occupied by
|
|
|
+ /// the received message becomes usable for a future [`send_ref`] operation.
|
|
|
+ ///
|
|
|
+ /// If all [`Sender`]s for this channel write to the channel's slots in
|
|
|
+ /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
|
|
|
+ /// method allows messages that own heap allocations to be reused in
|
|
|
+ /// place.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// let (tx, rx) = mpsc::channel::<String>(100);
|
|
|
+ ///
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// let mut value = tx.send_ref().await.unwrap();
|
|
|
+ /// write!(value, "hello world!")
|
|
|
+ /// .expect("writing to a `String` should never fail");
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some("hello world!"), rx.recv_ref().await.as_deref().map(String::as_str));
|
|
|
+ /// assert_eq!(None, rx.recv().await.as_deref());
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// Values are buffered:
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// let (tx, rx) = mpsc::channel::<String>(100);
|
|
|
+ ///
|
|
|
+ /// write!(tx.send_ref().await.unwrap(), "hello").unwrap();
|
|
|
+ /// write!(tx.send_ref().await.unwrap(), "world").unwrap();
|
|
|
+ ///
|
|
|
+ /// assert_eq!("hello", rx.recv_ref().await.unwrap().as_str());
|
|
|
+ /// assert_eq!("world", rx.recv_ref().await.unwrap().as_str());
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Sender::send_ref
|
|
|
+ /// [`try_send_ref`]: Sender::try_send_ref
|
|
|
pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
|
|
|
RecvRefFuture {
|
|
|
core: &self.inner.core,
|
|
@@ -129,6 +352,64 @@ feature! {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Receives the next message for this receiver, **by value**.
|
|
|
+ ///
|
|
|
+ /// This method returns `None` if the channel has been closed and there are
|
|
|
+ /// no remaining messages in the channel's buffer. This indicates that no
|
|
|
+ /// further values can ever be received from this `Receiver`. The channel is
|
|
|
+ /// closed when all [`Sender`]s have been dropped.
|
|
|
+ ///
|
|
|
+ /// If there are no messages in the channel's buffer, but the channel has
|
|
|
+ /// not yet been closed, this method will wait until a message is sent or
|
|
|
+ /// the channel is closed.
|
|
|
+ ///
|
|
|
+ /// When a message is received, it is moved out of the channel by value,
|
|
|
+ /// and replaced with a new slot according to the configured [recycling
|
|
|
+ /// policy]. If all [`Sender`]s for this channel write to the channel's
|
|
|
+ /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
|
|
|
+ /// consider using the [`recv_ref`] method instead, to enable the
|
|
|
+ /// reuse of heap allocations.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ ///
|
|
|
+ /// let (tx, rx) = mpsc::channel(100);
|
|
|
+ ///
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// tx.send(1).await.unwrap();
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some(1), rx.recv().await);
|
|
|
+ /// assert_eq!(None, rx.recv().await);
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// Values are buffered:
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ ///
|
|
|
+ /// let (tx, rx) = mpsc::channel(100);
|
|
|
+ ///
|
|
|
+ /// tx.send(1).await.unwrap();
|
|
|
+ /// tx.send(2).await.unwrap();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some(1), rx.recv().await);
|
|
|
+ /// assert_eq!(Some(2), rx.recv().await);
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Sender::send_ref
|
|
|
+ /// [`try_send_ref`]: Sender::try_send_ref
|
|
|
+ /// [recycling policy]: crate::recycling::Recycle
|
|
|
+ /// [`recv_ref`]: Self::recv_ref
|
|
|
pub fn recv(&self) -> RecvFuture<'_, T, R>
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
@@ -140,36 +421,82 @@ feature! {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Attempts to receive a message *by reference* from this channel,
|
|
|
+ /// registering the current task for wakeup if the a message is not yet
|
|
|
+ /// available, and returning `None` if the channel has closed and all
|
|
|
+ /// messages have been received.
|
|
|
+ ///
|
|
|
+ /// Like [`Receiver::recv_ref`], this method returns a [`RecvRef`] that
|
|
|
+ /// can be used to read from (or mutate) the received message by
|
|
|
+ /// reference. When the [`RecvRef`] is dropped, the receive operation
|
|
|
+ /// completes and the slot occupied by the received message becomes
|
|
|
+ /// usable for a future [`send_ref`] operation.
|
|
|
+ ///
|
|
|
+ /// If all [`Sender`]s for this channel write to the channel's slots in
|
|
|
+ /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
|
|
|
+ /// method allows messages that own heap allocations to be reused in
|
|
|
+ /// place.
|
|
|
+ ///
|
|
|
+ /// To wait asynchronously until a message becomes available, use the
|
|
|
+ /// [`recv_ref`] method instead.
|
|
|
+ ///
|
|
|
/// # Returns
|
|
|
///
|
|
|
/// * `Poll::Pending` if no messages are available but the channel is not
|
|
|
/// closed, or if a spurious failure happens.
|
|
|
- /// * `Poll::Ready(Some(Ref<T>))` if a message is available.
|
|
|
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
|
|
|
- /// sent before it was closed have been received.
|
|
|
+ /// * `Poll::Ready(Some(RecvRef<T>))` if a message is available.
|
|
|
+ /// * `Poll::Ready(None)` if the channel has been closed (i.e., all
|
|
|
+ /// [`Sender`]s have been dropped), and all messages sent before it
|
|
|
+ /// was closed have been received.
|
|
|
///
|
|
|
/// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
|
|
|
/// [`Context`] is scheduled to receive a wakeup when a message is sent on any
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv_ref`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Sender::send_ref
|
|
|
+ /// [`try_send_ref`]: Sender::try_send_ref
|
|
|
+ /// [`recv_ref`]: Self::recv_ref
|
|
|
pub fn poll_recv_ref(&self, cx: &mut Context<'_>) -> Poll<Option<RecvRef<'_, T>>> {
|
|
|
poll_recv_ref(&self.inner.core, &self.inner.slots, cx)
|
|
|
}
|
|
|
|
|
|
+ /// Attempts to receive a message *by value* from this channel,
|
|
|
+ /// registering the current task for wakeup if the value is not yet
|
|
|
+ /// available, and returning `None` if the channel has closed and all
|
|
|
+ /// messages have been received.
|
|
|
+ ///
|
|
|
+ /// When a message is received, it is moved out of the channel by value,
|
|
|
+ /// and replaced with a new slot according to the configured [recycling
|
|
|
+ /// policy]. If all [`Sender`]s for this channel write to the channel's
|
|
|
+ /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
|
|
|
+ /// consider using the [`poll_recv_ref`] method instead, to enable the
|
|
|
+ /// reuse of heap allocations.
|
|
|
+ ///
|
|
|
+ /// To wait asynchronously until a message becomes available, use the
|
|
|
+ /// [`recv`] method instead.
|
|
|
+ ///
|
|
|
/// # Returns
|
|
|
///
|
|
|
/// * `Poll::Pending` if no messages are available but the channel is not
|
|
|
/// closed, or if a spurious failure happens.
|
|
|
/// * `Poll::Ready(Some(message))` if a message is available.
|
|
|
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
|
|
|
- /// sent before it was closed have been received.
|
|
|
+ /// * `Poll::Ready(None)` if the channel has been closed (i.e., all
|
|
|
+ /// [`Sender`]s have been dropped) and all messages sent before it
|
|
|
+ /// was closed have been received.
|
|
|
///
|
|
|
/// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
|
|
|
/// [`Context`] is scheduled to receive a wakeup when a message is sent on any
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Sender::send_ref
|
|
|
+ /// [`try_send_ref`]: Sender::try_send_ref
|
|
|
+ /// [recycling policy]: crate::recycling::Recycle
|
|
|
+ /// [`poll_recv_ref`]: Self::poll_recv_ref
|
|
|
+ /// [`recv`]: Self::recv
|
|
|
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>>
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
@@ -178,6 +505,11 @@ feature! {
|
|
|
.map(|opt| opt.map(|mut r| recycling::take(&mut *r, &self.inner.recycle)))
|
|
|
}
|
|
|
|
|
|
+ /// Returns `true` if the channel has closed (all corresponding
|
|
|
+ /// [`Sender`]s have been dropped).
|
|
|
+ ///
|
|
|
+ /// If this method returns `true`, no new messages will become available
|
|
|
+ /// on this channel. Previously sent messages may still be available.
|
|
|
pub fn is_closed(&self) -> bool {
|
|
|
test_dbg!(self.inner.core.tx_count.load(Ordering::SeqCst)) <= 1
|
|
|
}
|
|
@@ -249,12 +581,20 @@ feature! {
|
|
|
is_split: AtomicBool,
|
|
|
}
|
|
|
|
|
|
+ /// Asynchronously sends values to an associated [`StaticReceiver`].
|
|
|
+ ///
|
|
|
+ /// Instances of this struct are created by the [`StaticChannel::split`] and
|
|
|
+ /// [``StaticChannel::try_split`] functions.
|
|
|
pub struct StaticSender<T: 'static, R: 'static = recycling::DefaultRecycle> {
|
|
|
core: &'static ChannelCore<Waker>,
|
|
|
recycle: &'static R,
|
|
|
slots: &'static [Slot<T>],
|
|
|
}
|
|
|
|
|
|
+ /// Asynchronously receives values from associated [`StaticSender`]s.
|
|
|
+ ///
|
|
|
+ /// Instances of this struct are created by the [`StaticChannel::split`] and
|
|
|
+ /// [``StaticChannel::try_split`] functions.
|
|
|
pub struct StaticReceiver<T: 'static, R: 'static = recycling::DefaultRecycle> {
|
|
|
core: &'static ChannelCore<Waker>,
|
|
|
recycle: &'static R,
|
|
@@ -351,16 +691,56 @@ feature! {
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
|
{
|
|
|
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
- self.core
|
|
|
- .try_send_ref(self.slots, self.recycle)
|
|
|
- .map(SendRef)
|
|
|
- }
|
|
|
-
|
|
|
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
- self.core.try_send(self.slots, val, self.recycle)
|
|
|
- }
|
|
|
-
|
|
|
+ /// Reserves a slot in the channel to mutate in place, waiting until
|
|
|
+ /// there is a free slot to write to.
|
|
|
+ ///
|
|
|
+ /// This is similar to the [`send`] method, but, rather than taking a
|
|
|
+ /// message by value to write to the channel, this method reserves a
|
|
|
+ /// writable slot in the channel, and returns a [`SendRef`] that allows
|
|
|
+ /// mutating the slot in place. If the [`StaticReceiver`] end of the
|
|
|
+ /// channel uses the [`StaticReceiver::recv_ref`] or
|
|
|
+ /// [`StaticReceiver::poll_recv_ref`] methods for receiving from the
|
|
|
+ /// channel, this allows allocations for channel messages to be reused in place.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the [`StaticReceiver`] end of the channel has been dropped, this
|
|
|
+ /// returns a [`Closed`] error.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// Sending formatted strings by writing them directly to channel slots,
|
|
|
+ /// in place:
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// # async fn example() {
|
|
|
+ /// static CHANNEL: mpsc::StaticChannel<String, 8> = mpsc::StaticChannel::new();
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// // Spawn a task that prints each message received from the channel:
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// while let Some(msg) = rx.recv_ref().await {
|
|
|
+ /// println!("{}", msg);
|
|
|
+ /// }
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// // Until the channel closes, write formatted messages to the channel.
|
|
|
+ /// let mut count = 1;
|
|
|
+ /// while let Ok(mut value) = tx.send_ref().await {
|
|
|
+ /// // Writing to the `SendRef` will reuse the *existing* string
|
|
|
+ /// // allocation in place.
|
|
|
+ /// write!(value, "hello from message {}", count)
|
|
|
+ /// .expect("writing to a `String` should never fail");
|
|
|
+ /// count += 1;
|
|
|
+ /// }
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ /// [`send`]: Self::send
|
|
|
pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
|
|
|
SendRefFuture {
|
|
|
core: self.core,
|
|
@@ -372,15 +752,124 @@ feature! {
|
|
|
.await
|
|
|
}
|
|
|
|
|
|
+ /// Sends a message by value, waiting until there is a free slot to
|
|
|
+ /// write to.
|
|
|
+ ///
|
|
|
+ /// This method takes the message by value, and replaces any previous
|
|
|
+ /// value in the slot. This means that the channel will *not* function
|
|
|
+ /// as an object pool while sending messages with `send`. This method is
|
|
|
+ /// most appropriate when messages don't own reusable heap allocations,
|
|
|
+ /// or when the [`StaticReceiver`] end of the channel must receive
|
|
|
+ /// messages by moving them out of the channel by value (using the
|
|
|
+ /// [`StaticReceiver::recv`] method). When messages in the channel own
|
|
|
+ /// reusable heap allocations (such as `String`s or `Vec`s), and the
|
|
|
+ /// [`StaticReceiver`] doesn't need to receive them by value, consider
|
|
|
+ /// using [`send_ref`] instead, to enable allocation reuse.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the [`StaticReceiver`] end of the channel has been dropped, this
|
|
|
+ /// returns a [`Closed`] error containing the sent value.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// use thingbuf::mpsc;
|
|
|
+ /// # async fn example() {
|
|
|
+ ///
|
|
|
+ /// static CHANNEL: mpsc::StaticChannel<i32, 8> = mpsc::StaticChannel::new();
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// // Spawn a task that prints each message received from the channel:
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// while let Some(msg) = rx.recv().await {
|
|
|
+ /// println!("received message {}", msg);
|
|
|
+ /// }
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// // Until the channel closes, write the current iteration to the channel.
|
|
|
+ /// let mut count = 1;
|
|
|
+ /// while tx.send(count).await.is_ok() {
|
|
|
+ /// count += 1;
|
|
|
+ /// }
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ /// [`send_ref`]: Self::send_ref
|
|
|
pub async fn send(&self, val: T) -> Result<(), Closed<T>> {
|
|
|
match self.send_ref().await {
|
|
|
Err(Closed(())) => Err(Closed(val)),
|
|
|
Ok(mut slot) => {
|
|
|
- slot.with_mut(|slot| *slot = val);
|
|
|
+ *slot = val;
|
|
|
Ok(())
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /// Attempts to reserve a slot in the channel to mutate in place,
|
|
|
+ /// without waiting for capacity.
|
|
|
+ ///
|
|
|
+ /// This method differs from [`send_ref`] by returning immediately if the
|
|
|
+ /// channel’s buffer is full or no [`Receiver`] exists. Compared with
|
|
|
+ /// [`send_ref`], this method has two failure cases instead of one (one for
|
|
|
+ /// disconnection, one for a full buffer), and this method is not
|
|
|
+ /// `async`, because it will never wait.
|
|
|
+ ///
|
|
|
+ /// Like [`send_ref`], this method returns a [`SendRef`] that may be
|
|
|
+ /// used to mutate a slot in the channel's buffer in place. Dropping the
|
|
|
+ /// [`SendRef`] completes the send operation and makes the mutated value
|
|
|
+ /// available to the [`Receiver`].
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the channel capacity has been reached (i.e., the channel has `n`
|
|
|
+ /// buffered values where `n` is the `usize` const generic parameter of
|
|
|
+ /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
|
|
|
+ /// this case, a future call to `try_send` may succeed if additional
|
|
|
+ /// capacity becomes available.
|
|
|
+ ///
|
|
|
+ /// If the receive half of the channel is closed (i.e., the
|
|
|
+ /// [`StaticReceiver`] handle was dropped), the function returns
|
|
|
+ /// [`TrySendError::Closed`]. Once the channel has closed, subsequent
|
|
|
+ /// calls to `try_send_ref` will never succeed.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: Self::send_ref
|
|
|
+ pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
+ self.core
|
|
|
+ .try_send_ref(self.slots, self.recycle)
|
|
|
+ .map(SendRef)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Attempts to send a message by value immediately, without waiting for
|
|
|
+ /// capacity.
|
|
|
+ ///
|
|
|
+ /// This method differs from [`send`] by returning immediately if the
|
|
|
+ /// channel’s buffer is full or no [`StaticReceiver`] exists. Compared with
|
|
|
+ /// [`send`], this method has two failure cases instead of one (one for
|
|
|
+ /// disconnection, one for a full buffer), and this method is not
|
|
|
+ /// `async`, because it will never wait.
|
|
|
+ ///
|
|
|
+ /// # Errors
|
|
|
+ ///
|
|
|
+ /// If the channel capacity has been reached (i.e., the channel has `n`
|
|
|
+ /// buffered values where `n` is the `usize` const generic parameter of
|
|
|
+ /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
|
|
|
+ /// this case, a future call to `try_send` may succeed if additional
|
|
|
+ /// capacity becomes available.
|
|
|
+ ///
|
|
|
+ /// If the receive half of the channel is closed (i.e., the
|
|
|
+ /// [`StaticReceiver`] handle was dropped), the function returns
|
|
|
+ /// [`TrySendError::Closed`]. Once the channel has closed, subsequent
|
|
|
+ /// calls to `try_send` will never succeed.
|
|
|
+ ///
|
|
|
+ /// In both cases, the error includes the value passed to `try_send`.
|
|
|
+ ///
|
|
|
+ /// [`send`]: Self::send
|
|
|
+ pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
+ self.core.try_send(self.slots, val, self.recycle)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
impl<T> Clone for StaticSender<T> {
|
|
@@ -420,6 +909,72 @@ feature! {
|
|
|
// === impl StaticReceiver ===
|
|
|
|
|
|
impl<T, R> StaticReceiver<T, R> {
|
|
|
+ /// Receives the next message for this receiver, **by reference**.
|
|
|
+ ///
|
|
|
+ /// This method returns `None` if the channel has been closed and there are
|
|
|
+ /// no remaining messages in the channel's buffer. This indicates that no
|
|
|
+ /// further values can ever be received from this `StaticReceiver`. The
|
|
|
+ /// channel is closed when all [`StaticSender`]s have been dropped.
|
|
|
+ ///
|
|
|
+ /// If there are no messages in the channel's buffer, but the channel has
|
|
|
+ /// not yet been closed, this method will wait until a message is sent or
|
|
|
+ /// the channel is closed.
|
|
|
+ ///
|
|
|
+ /// This method returns a [`RecvRef`] that can be used to read from (or
|
|
|
+ /// mutate) the received message by reference. When the [`RecvRef`] is
|
|
|
+ /// dropped, the receive operation completes and the slot occupied by
|
|
|
+ /// the received message becomes usable for a future [`send_ref`] operation.
|
|
|
+ ///
|
|
|
+ /// If all [`StaticSender`]s for this channel write to the channel's slots in
|
|
|
+ /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
|
|
|
+ /// method allows messages that own heap allocations to be reused in
|
|
|
+ /// place.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc::StaticChannel;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// let mut value = tx.send_ref().await.unwrap();
|
|
|
+ /// write!(value, "hello world!")
|
|
|
+ /// .expect("writing to a `String` should never fail");
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some("hello world!"), rx.recv_ref().await.as_deref().map(String::as_str));
|
|
|
+ /// assert_eq!(None, rx.recv().await.as_deref());
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// Values are buffered:
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc::StaticChannel;
|
|
|
+ /// use std::fmt::Write;
|
|
|
+ ///
|
|
|
+ /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
|
|
|
+ ///
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// write!(tx.send_ref().await.unwrap(), "hello").unwrap();
|
|
|
+ /// write!(tx.send_ref().await.unwrap(), "world").unwrap();
|
|
|
+ ///
|
|
|
+ /// assert_eq!("hello", rx.recv_ref().await.unwrap().as_str());
|
|
|
+ /// assert_eq!("world", rx.recv_ref().await.unwrap().as_str());
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: StaticSender::send_ref
|
|
|
+ /// [`try_send_ref`]: StaticSender::try_send_ref
|
|
|
pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
|
|
|
RecvRefFuture {
|
|
|
core: self.core,
|
|
@@ -427,6 +982,66 @@ feature! {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Receives the next message for this receiver, **by value**.
|
|
|
+ ///
|
|
|
+ /// This method returns `None` if the channel has been closed and there are
|
|
|
+ /// no remaining messages in the channel's buffer. This indicates that no
|
|
|
+ /// further values can ever be received from this `StaticReceiver`. The
|
|
|
+ /// channel is closed when all [`StaticSender`]s have been dropped.
|
|
|
+ ///
|
|
|
+ /// If there are no messages in the channel's buffer, but the channel has
|
|
|
+ /// not yet been closed, this method will wait until a message is sent or
|
|
|
+ /// the channel is closed.
|
|
|
+ ///
|
|
|
+ /// When a message is received, it is moved out of the channel by value,
|
|
|
+ /// and replaced with a new slot according to the configured [recycling
|
|
|
+ /// policy]. If all [`StaticSender`]s for this channel write to the channel's
|
|
|
+ /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
|
|
|
+ /// consider using the [`recv_ref`] method instead, to enable the
|
|
|
+ /// reuse of heap allocations.
|
|
|
+ ///
|
|
|
+ /// # Examples
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # mod tokio {
|
|
|
+ /// # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+ /// # }
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc::StaticChannel;
|
|
|
+ ///
|
|
|
+ /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// tokio::spawn(async move {
|
|
|
+ /// tx.send(1).await.unwrap();
|
|
|
+ /// });
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some(1), rx.recv().await);
|
|
|
+ /// assert_eq!(None, rx.recv().await);
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// Values are buffered:
|
|
|
+ ///
|
|
|
+ /// ```
|
|
|
+ /// # async fn docs() {
|
|
|
+ /// use thingbuf::mpsc::StaticChannel;
|
|
|
+ ///
|
|
|
+ /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
|
|
|
+ /// let (tx, rx) = CHANNEL.split();
|
|
|
+ ///
|
|
|
+ /// tx.send(1).await.unwrap();
|
|
|
+ /// tx.send(2).await.unwrap();
|
|
|
+ ///
|
|
|
+ /// assert_eq!(Some(1), rx.recv().await);
|
|
|
+ /// assert_eq!(Some(2), rx.recv().await);
|
|
|
+ /// # }
|
|
|
+ /// ```
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: StaticSender::send_ref
|
|
|
+ /// [`try_send_ref`]: StaticSender::try_send_ref
|
|
|
+ /// [recycling policy]: crate::recycling::Recycle
|
|
|
+ /// [`recv_ref`]: Self::recv_ref
|
|
|
pub fn recv(&self) -> RecvFuture<'_, T, R>
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
@@ -438,36 +1053,82 @@ feature! {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Attempts to receive a message *by reference* from this channel,
|
|
|
+ /// registering the current task for wakeup if the a message is not yet
|
|
|
+ /// available, and returning `None` if the channel has closed and all
|
|
|
+ /// messages have been received.
|
|
|
+ ///
|
|
|
+ /// Like [`StaticReceiver::recv_ref`], this method returns a [`RecvRef`]
|
|
|
+ /// that can be used to read from (or mutate) the received message by
|
|
|
+ /// reference. When the [`RecvRef`] is dropped, the receive operation
|
|
|
+ /// completes and the slot occupied by the received message becomes
|
|
|
+ /// usable for a future [`send_ref`] operation.
|
|
|
+ ///
|
|
|
+ /// If all [`StaticSender`]s for this channel write to the channel's slots in
|
|
|
+ /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
|
|
|
+ /// method allows messages that own heap allocations to be reused in
|
|
|
+ /// place.
|
|
|
+ ///
|
|
|
+ /// To wait asynchronously until a message becomes available, use the
|
|
|
+ /// [`recv_ref`] method instead.
|
|
|
+ ///
|
|
|
/// # Returns
|
|
|
///
|
|
|
/// * `Poll::Pending` if no messages are available but the channel is not
|
|
|
/// closed, or if a spurious failure happens.
|
|
|
- /// * `Poll::Ready(Some(Ref<T>))` if a message is available.
|
|
|
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
|
|
|
- /// sent before it was closed have been received.
|
|
|
+ /// * `Poll::Ready(Some(RecvRef<T>))` if a message is available.
|
|
|
+ /// * `Poll::Ready(None)` if the channel has been closed (i.e., all
|
|
|
+ /// [`StaticSender`]s have been dropped), and all messages sent before it
|
|
|
+ /// was closed have been received.
|
|
|
///
|
|
|
/// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
|
|
|
/// [`Context`] is scheduled to receive a wakeup when a message is sent on any
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv_ref`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: StaticSender::send_ref
|
|
|
+ /// [`try_send_ref`]: StaticSender::try_send_ref
|
|
|
+ /// [`recv_ref`]: Self::recv_ref
|
|
|
pub fn poll_recv_ref(&self, cx: &mut Context<'_>) -> Poll<Option<RecvRef<'_, T>>> {
|
|
|
poll_recv_ref(self.core, self.slots, cx)
|
|
|
}
|
|
|
|
|
|
+ /// Attempts to receive a message *by value* from this channel,
|
|
|
+ /// registering the current task for wakeup if the value is not yet
|
|
|
+ /// available, and returning `None` if the channel has closed and all
|
|
|
+ /// messages have been received.
|
|
|
+ ///
|
|
|
+ /// When a message is received, it is moved out of the channel by value,
|
|
|
+ /// and replaced with a new slot according to the configured [recycling
|
|
|
+ /// policy]. If all [`StaticSender`]s for this channel write to the channel's
|
|
|
+ /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
|
|
|
+ /// consider using the [`poll_recv_ref`] method instead, to enable the
|
|
|
+ /// reuse of heap allocations.
|
|
|
+ ///
|
|
|
+ /// To wait asynchronously until a message becomes available, use the
|
|
|
+ /// [`recv`] method instead.
|
|
|
+ ///
|
|
|
/// # Returns
|
|
|
///
|
|
|
/// * `Poll::Pending` if no messages are available but the channel is not
|
|
|
/// closed, or if a spurious failure happens.
|
|
|
/// * `Poll::Ready(Some(message))` if a message is available.
|
|
|
- /// * `Poll::Ready(None)` if the channel has been closed and all messages
|
|
|
- /// sent before it was closed have been received.
|
|
|
+ /// * `Poll::Ready(None)` if the channel has been closed (i.e., all
|
|
|
+ /// [`StaticSender`]s have been dropped) and all messages sent before it
|
|
|
+ /// was closed have been received.
|
|
|
///
|
|
|
/// When the method returns [`Poll::Pending`], the [`Waker`] in the provided
|
|
|
/// [`Context`] is scheduled to receive a wakeup when a message is sent on any
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
+ ///
|
|
|
+ /// [`send_ref`]: StaticSender::send_ref
|
|
|
+ /// [`try_send_ref`]: StaticSender::try_send_ref
|
|
|
+ /// [recycling policy]: crate::recycling::Recycle
|
|
|
+ /// [`poll_recv_ref`]: Self::poll_recv_ref
|
|
|
+ /// [`recv`]: Self::recv
|
|
|
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>>
|
|
|
where
|
|
|
R: Recycle<T>,
|
|
@@ -476,6 +1137,11 @@ feature! {
|
|
|
.map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle)))
|
|
|
}
|
|
|
|
|
|
+ /// Returns `true` if the channel has closed (all corresponding
|
|
|
+ /// [`StaticSender`]s have been dropped).
|
|
|
+ ///
|
|
|
+ /// If this method returns `true`, no new messages will become available
|
|
|
+ /// on this channel. Previously sent messages may still be available.
|
|
|
pub fn is_closed(&self) -> bool {
|
|
|
test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1
|
|
|
}
|
|
@@ -499,10 +1165,38 @@ feature! {
|
|
|
}
|
|
|
|
|
|
impl_send_ref! {
|
|
|
+ /// A reference to a message being sent to an asynchronous channel.
|
|
|
+ ///
|
|
|
+ /// A `SendRef` represents the exclusive permission to mutate a given
|
|
|
+ /// element in a channel. A `SendRef<T>` [implements `DerefMut<T>`] to allow
|
|
|
+ /// writing to that element. This is analogous to the [`Ref`] type, except
|
|
|
+ /// that it completes a `send_ref` or `try_send_ref` operation when it is
|
|
|
+ /// dropped.
|
|
|
+ ///
|
|
|
+ /// This type is returned by the [`Sender::send_ref`] and
|
|
|
+ /// [`Sender::try_send_ref`] (or [`StaticSender::send_ref`] and
|
|
|
+ /// [`StaticSender::try_send_ref`]) methods.
|
|
|
+ ///
|
|
|
+ /// [implements `DerefMut<T>`]: #impl-DerefMut
|
|
|
+ /// [`Ref`]: crate::Ref
|
|
|
pub struct SendRef<Waker>;
|
|
|
}
|
|
|
|
|
|
impl_recv_ref! {
|
|
|
+ /// A reference to a message being received from an asynchronous channel.
|
|
|
+ ///
|
|
|
+ /// A `RecvRef` represents the exclusive permission to mutate a given
|
|
|
+ /// element in a channel. A `RecvRef<T>` [implements `DerefMut<T>`] to allow
|
|
|
+ /// writing to that element. This is analogous to the [`Ref`] type, except
|
|
|
+ /// that it completes a `recv_ref` or `poll_recv_ref` operation when it is
|
|
|
+ /// dropped.
|
|
|
+ ///
|
|
|
+ /// This type is returned by the [`Receiver::recv_ref`] and
|
|
|
+ /// [`Receiver::poll_recv_ref`] (or [`StaticReceiver::recv_ref`] and
|
|
|
+ /// [`StaticReceiver::poll_recv_ref`]) methods.
|
|
|
+ ///
|
|
|
+ /// [implements `DerefMut<T>`]: #impl-DerefMut
|
|
|
+ /// [`Ref`]: crate::Ref
|
|
|
pub struct RecvRef<Waker>;
|
|
|
}
|
|
|
|