123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071 |
- //! A synchronous multi-producer, single-consumer channel.
- //!
- //! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the
- //! [`Receiver`] types in this module wait by blocking the current thread,
- //! rather than asynchronously yielding.
- use super::*;
- use crate::{
- loom::{
- atomic::{self, Ordering},
- sync::Arc,
- thread::{self, Thread},
- },
- recycling::{self, Recycle},
- util::Backoff,
- wait::queue,
- Ref,
- };
- use core::{fmt, pin::Pin};
- use errors::*;
- /// Returns a new synchronous multi-producer, single consumer (MPSC)
- /// channel with the provided capacity.
- ///
- /// This channel will use the [default recycling policy].
- ///
- /// [recycling policy]: crate::recycling::DefaultRecycle
- pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
- with_recycle(capacity, recycling::DefaultRecycle::new())
- }
- /// Returns a new synchronous multi-producer, single consumer channel with
- /// the provided [recycling policy].
- ///
- /// [recycling policy]: crate::recycling::Recycle
- pub fn with_recycle<T, R: Recycle<T>>(
- capacity: usize,
- recycle: R,
- ) -> (Sender<T, R>, Receiver<T, R>) {
- assert!(capacity > 0);
- let inner = Arc::new(Inner {
- core: ChannelCore::new(capacity),
- slots: Slot::make_boxed_array(capacity),
- recycle,
- });
- let tx = Sender {
- inner: inner.clone(),
- };
- let rx = Receiver { inner };
- (tx, rx)
- }
- /// Synchronously receives values from associated [`Sender`]s.
- ///
- /// Instances of this struct are created by the [`channel`] and
- /// [`with_recycle`] functions.
- #[derive(Debug)]
- pub struct Sender<T, R = recycling::DefaultRecycle> {
- inner: Arc<Inner<T, R>>,
- }
- /// Synchronously sends values to an associated [`Receiver`].
- ///
- /// Instances of this struct are created by the [`channel`] and
- /// [`with_recycle`] functions.
- #[derive(Debug)]
- pub struct Receiver<T, R = recycling::DefaultRecycle> {
- inner: Arc<Inner<T, R>>,
- }
- struct Inner<T, R> {
- core: super::ChannelCore<Thread>,
- slots: Box<[Slot<T>]>,
- recycle: R,
- }
- #[cfg(not(all(loom, test)))]
- feature! {
- #![feature = "static"]
- use crate::loom::atomic::AtomicBool;
- /// A statically-allocated, blocking bounded MPSC channel.
- ///
- /// A statically-allocated channel allows using a MPSC channel without
- /// requiring _any_ heap allocations. The [asynchronous variant][async] may be
- /// used in `#![no_std]` environments without requiring `liballoc`. This is a
- /// synchronous version which requires the Rust standard library, because it
- /// blocks the current thread in order to wait for send capacity. However, in
- /// some cases, it may offer _very slightly_ better performance than the
- /// non-static blocking channel due to requiring fewer heap pointer
- /// dereferences.
- ///
- /// In order to use a statically-allocated channel, a `StaticChannel` must
- /// be constructed in a `static` initializer. This reserves storage for the
- /// channel's message queue at compile-time. Then, at runtime, the channel
- /// is [`split`] into a [`StaticSender`]/[`StaticReceiver`] pair in order to
- /// be used.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- ///
- /// // Construct a statically-allocated channel of `usize`s with a capacity
- /// // of 16 messages.
- /// static MY_CHANNEL: StaticChannel<usize, 16> = StaticChannel::new();
- ///
- /// fn main() {
- /// // Split the `StaticChannel` into a sender-receiver pair.
- /// let (tx, rx) = MY_CHANNEL.split();
- ///
- /// // Now, `tx` and `rx` can be used just like any other async MPSC
- /// // channel...
- /// # drop(tx); drop(rx);
- /// }
- /// ```
- ///
- /// [async]: crate::mpsc::StaticChannel
- /// [`split`]: StaticChannel::split
- pub struct StaticChannel<T, const CAPACITY: usize, R = recycling::DefaultRecycle> {
- core: ChannelCore<Thread>,
- slots: [Slot<T>; CAPACITY],
- is_split: AtomicBool,
- recycle: R,
- }
- /// Synchronously sends values to an associated [`StaticReceiver`].
- ///
- /// Instances of this struct are created by the [`StaticChannel::split`] and
- /// [``StaticChannel::try_split`] functions.
- pub struct StaticSender<T: 'static, R: 'static = recycling::DefaultRecycle> {
- core: &'static ChannelCore<Thread>,
- slots: &'static [Slot<T>],
- recycle: &'static R,
- }
- /// Synchronously receives values from associated [`StaticSender`]s.
- ///
- /// Instances of this struct are created by the [`StaticChannel::split`] and
- /// [``StaticChannel::try_split`] functions.
- pub struct StaticReceiver<T: 'static, R: 'static = recycling::DefaultRecycle> {
- core: &'static ChannelCore<Thread>,
- slots: &'static [Slot<T>],
- recycle: &'static R,
- }
- // === impl StaticChannel ===
- impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
- /// Constructs a new statically-allocated, blocking bounded MPSC channel.
- ///
- /// A statically-allocated channel allows using a MPSC channel without
- /// requiring _any_ heap allocations. The [asynchronous variant][async] may be
- /// used in `#![no_std]` environments without requiring `liballoc`. This is a
- /// synchronous version which requires the Rust standard library, because it
- /// blocks the current thread in order to wait for send capacity. However, in
- /// some cases, it may offer _very slightly_ better performance than the
- /// non-static blocking channel due to requiring fewer heap pointer
- /// dereferences.
- ///
- /// In order to use a statically-allocated channel, a `StaticChannel` must
- /// be constructed in a `static` initializer. This reserves storage for the
- /// channel's message queue at compile-time. Then, at runtime, the channel
- /// is [`split`] into a [`StaticSender`]/[`StaticReceiver`] pair in order to
- /// be used.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::StaticChannel;
- ///
- /// // Construct a statically-allocated channel of `usize`s with a capacity
- /// // of 16 messages.
- /// static MY_CHANNEL: StaticChannel<usize, 16> = StaticChannel::new();
- ///
- /// fn main() {
- /// // Split the `StaticChannel` into a sender-receiver pair.
- /// let (tx, rx) = MY_CHANNEL.split();
- ///
- /// // Now, `tx` and `rx` can be used just like any other async MPSC
- /// // channel...
- /// # drop(tx); drop(rx);
- /// }
- /// ```
- ///
- /// [async]: crate::mpsc::StaticChannel
- /// [`split`]: StaticChannel::split
- pub const fn new() -> Self {
- Self {
- core: ChannelCore::new(CAPACITY),
- slots: Slot::make_static_array::<CAPACITY>(),
- is_split: AtomicBool::new(false),
- recycle: recycling::DefaultRecycle::new(),
- }
- }
- }
- impl<T, R, const CAPACITY: usize> StaticChannel<T, CAPACITY, R> {
- /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`]
- /// pair.
- ///
- /// A static channel can only be split a single time. If
- /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been
- /// called previously, this method will panic. For a non-panicking version
- /// of this method, see [`StaticChannel::try_split`].
- ///
- /// # Panics
- ///
- /// If the channel has already been split.
- pub fn split(&'static self) -> (StaticSender<T, R>, StaticReceiver<T, R>) {
- self.try_split().expect("channel already split")
- }
- /// Try to split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`]
- /// pair, returning `None` if it has already been split.
- ///
- /// A static channel can only be split a single time. If
- /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been
- /// called previously, this method returns `None`.
- pub fn try_split(&'static self) -> Option<(StaticSender<T, R>, StaticReceiver<T, R>)> {
- self.is_split
- .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
- .ok()?;
- let tx = StaticSender {
- core: &self.core,
- slots: &self.slots[..],
- recycle: &self.recycle,
- };
- let rx = StaticReceiver {
- core: &self.core,
- slots: &self.slots[..],
- recycle: &self.recycle,
- };
- Some((tx, rx))
- }
- }
- // === impl StaticSender ===
- impl<T, R> StaticSender<T, R>
- where
- R: Recycle<T>,
- {
- /// Reserves a slot in the channel to mutate in place, blocking until
- /// there is a free slot to write to.
- ///
- /// This is similar to the [`send`] method, but, rather than taking a
- /// message by value to write to the channel, this method reserves a
- /// writable slot in the channel, and returns a [`SendRef`] that allows
- /// mutating the slot in place. If the [`StaticReceiver`] end of the
- /// channel uses the [`StaticReceiver::recv_ref`] method for receiving
- /// from the channel, this allows allocations for channel messages to be
- /// reused in place.
- ///
- /// # Errors
- ///
- /// If the [`StaticReceiver`] end of the channel has been dropped, this
- /// returns a [`Closed`] error.
- ///
- /// # Examples
- ///
- /// Sending formatted strings by writing them directly to channel slots,
- /// in place:
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- /// use std::{fmt::Write, thread};
- ///
- /// static CHANNEL: StaticChannel<String, 8> = StaticChannel::new();
- ///
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// // Spawn a thread that prints each message received from the channel:
- /// thread::spawn(move || {
- /// for _ in 0..10 {
- /// let msg = rx.recv_ref().unwrap();
- /// println!("{}", msg);
- /// }
- /// });
- ///
- /// // Until the channel closes, write formatted messages to the channel.
- /// let mut count = 1;
- /// while let Ok(mut value) = tx.send_ref() {
- /// // Writing to the `SendRef` will reuse the *existing* string
- /// // allocation in place.
- /// write!(value, "hello from message {}", count)
- /// .expect("writing to a `String` should never fail");
- /// count += 1;
- /// }
- /// ```
- ///
- /// [`send`]: Self::send
- pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
- send_ref(self.core, self.slots, self.recycle)
- }
- /// Sends a message by value, blocking until there is a free slot to
- /// write to.
- ///
- /// This method takes the message by value, and replaces any previous
- /// value in the slot. This means that the channel will *not* function
- /// as an object pool while sending messages with `send`. This method is
- /// most appropriate when messages don't own reusable heap allocations,
- /// or when the [`StaticReceiver`] end of the channel must receive messages
- /// by moving them out of the channel by value (using the
- /// [`StaticReceiver::recv`] method). When messages in the channel own
- /// reusable heap allocations (such as `String`s or `Vec`s), and the
- /// [`StaticReceiver`] doesn't need to receive them by value, consider using
- /// [`send_ref`] instead, to enable allocation reuse.
- ///
- /// # Errors
- ///
- /// If the [`StaticReceiver`] end of the channel has been dropped, this
- /// returns a [`Closed`] error containing the sent value.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- /// use std::{fmt::Write, thread};
- ///
- /// static CHANNEL: StaticChannel<i32, 8> = StaticChannel::new();
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// // Spawn a thread that prints each message received from the channel:
- /// thread::spawn(move || {
- /// for _ in 0..10 {
- /// let msg = rx.recv().unwrap();
- /// println!("received message {}", msg);
- /// }
- /// });
- ///
- /// // Until the channel closes, write the current iteration to the channel.
- /// let mut count = 1;
- /// while tx.send(count).is_ok() {
- /// count += 1;
- /// }
- /// ```
- /// [`send_ref`]: Self::send_ref
- pub fn send(&self, val: T) -> Result<(), Closed<T>> {
- match self.send_ref() {
- Err(Closed(())) => Err(Closed(val)),
- Ok(mut slot) => {
- *slot = val;
- Ok(())
- }
- }
- }
- /// Attempts to reserve a slot in the channel to mutate in place,
- /// without blocking until capacity is available.
- ///
- /// This method differs from [`send_ref`] by returning immediately if the
- /// channel’s buffer is full or no [`StaticReceiver`] exists. Compared with
- /// [`send_ref`], this method has two failure cases instead of one (one for
- /// disconnection, one for a full buffer), and this method will never block.
- ///
- /// Like [`send_ref`], this method returns a [`SendRef`] that may be
- /// used to mutate a slot in the channel's buffer in place. Dropping the
- /// [`SendRef`] completes the send operation and makes the mutated value
- /// available to the [`StaticReceiver`].
- ///
- /// # Errors
- ///
- /// If the channel capacity has been reached (i.e., the channel has `n`
- /// buffered values where `n` is the `usize` const generic parameter of
- /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
- /// this case, a future call to `try_send` may succeed if additional
- /// capacity becomes available.
- ///
- /// If the receive half of the channel is closed (i.e., the [`StaticReceiver`]
- /// handle was dropped), the function returns [`TrySendError::Closed`].
- /// Once the channel has closed, subsequent calls to `try_send_ref` will
- /// never succeed.
- ///
- /// [`send_ref`]: Self::send_ref
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
- self.core
- .try_send_ref(self.slots, self.recycle)
- .map(SendRef)
- }
- /// Attempts to send a message by value immediately, without blocking until
- /// capacity is available.
- ///
- /// This method differs from [`send`] by returning immediately if the
- /// channel’s buffer is full or no [`StaticReceiver`] exists. Compared
- /// with [`send`], this method has two failure cases instead of one (one for
- /// disconnection, one for a full buffer), and this method will never block.
- ///
- /// # Errors
- ///
- /// If the channel capacity has been reached (i.e., the channel has `n`
- /// buffered values where `n` is the `usize` const generic parameter of
- /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
- /// this case, a future call to `try_send` may succeed if additional
- /// capacity becomes available.
- ///
- /// If the receive half of the channel is closed (i.e., the
- /// [`StaticReceiver`] handle was dropped), the function returns
- /// [`TrySendError::Closed`]. Once the channel has closed, subsequent
- /// calls to `try_send` will never succeed.
- ///
- /// In both cases, the error includes the value passed to `try_send`.
- ///
- /// [`send`]: Self::send
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
- self.core.try_send(self.slots, val, self.recycle)
- }
- }
- impl<T, R> Clone for StaticSender<T, R> {
- fn clone(&self) -> Self {
- test_dbg!(self.core.tx_count.fetch_add(1, Ordering::Relaxed));
- Self {
- core: self.core,
- slots: self.slots,
- recycle: self.recycle,
- }
- }
- }
- impl<T, R> Drop for StaticSender<T, R> {
- fn drop(&mut self) {
- if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
- return;
- }
- // if we are the last sender, synchronize
- test_dbg!(atomic::fence(Ordering::SeqCst));
- if self.core.core.close() {
- self.core.rx_wait.close_tx();
- }
- }
- }
- impl<T, R: fmt::Debug> fmt::Debug for StaticSender<T, R> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("StaticSender")
- .field("core", &self.core)
- .field("slots", &format_args!("&[..]"))
- .field("recycle", &self.recycle)
- .finish()
- }
- }
- // === impl StaticReceiver ===
- impl<T, R> StaticReceiver<T, R> {
- /// Receives the next message for this receiver, **by reference**.
- ///
- /// This method returns `None` if the channel has been closed and there are
- /// no remaining messages in the channel's buffer. This indicates that no
- /// further values can ever be received from this `StaticReceiver`. The channel is
- /// closed when all [`StaticSender`]s have been dropped.
- ///
- /// If there are no messages in the channel's buffer, but the channel has
- /// not yet been closed, this method will block until a message is sent or
- /// the channel is closed.
- ///
- /// This method returns a [`RecvRef`] that can be used to read from (or
- /// mutate) the received message by reference. When the [`RecvRef`] is
- /// dropped, the receive operation completes and the slot occupied by
- /// the received message becomes usable for a future [`send_ref`] operation.
- ///
- /// If all [`StaticSender`]s for this channel write to the channel's
- /// slots in place by using the [`send_ref`] or [`try_send_ref`]
- /// methods, this method allows messages that own heap allocations to be reused in
- /// place.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- /// use std::{thread, fmt::Write};
- ///
- /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// thread::spawn(move || {
- /// let mut value = tx.send_ref().unwrap();
- /// write!(value, "hello world!")
- /// .expect("writing to a `String` should never fail");
- /// });
- ///
- /// assert_eq!(Some("hello world!"), rx.recv_ref().as_deref().map(String::as_str));
- /// assert_eq!(None, rx.recv().as_deref());
- /// ```
- ///
- /// Values are buffered:
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- /// use std::fmt::Write;
- ///
- /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// write!(tx.send_ref().unwrap(), "hello").unwrap();
- /// write!(tx.send_ref().unwrap(), "world").unwrap();
- ///
- /// assert_eq!("hello", rx.recv_ref().unwrap().as_str());
- /// assert_eq!("world", rx.recv_ref().unwrap().as_str());
- /// ```
- ///
- /// [`send_ref`]: StaticSender::send_ref
- /// [`try_send_ref`]: StaticSender::try_send_ref
- pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
- recv_ref(self.core, self.slots)
- }
- /// Receives the next message for this receiver, **by value**.
- ///
- /// This method returns `None` if the channel has been closed and there are
- /// no remaining messages in the channel's buffer. This indicates that no
- /// further values can ever be received from this `StaticReceiver`. The channel is
- /// closed when all [`StaticSender`]s have been dropped.
- ///
- /// If there are no messages in the channel's buffer, but the channel has
- /// not yet been closed, this method will block until a message is sent or
- /// the channel is closed.
- ///
- /// When a message is received, it is moved out of the channel by value,
- /// and replaced with a new slot according to the configured [recycling
- /// policy]. If all [`StaticSender`]s for this channel write to the channel's
- /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
- /// consider using the [`recv_ref`] method instead, to enable the
- /// reuse of heap allocations.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- /// use std::thread;
- ///
- /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// thread::spawn(move || {
- /// tx.send(1).unwrap();
- /// });
- ///
- /// assert_eq!(Some(1), rx.recv());
- /// assert_eq!(None, rx.recv());
- /// ```
- ///
- /// Values are buffered:
- ///
- /// ```
- /// use thingbuf::mpsc::blocking::StaticChannel;
- ///
- /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
- /// let (tx, rx) = CHANNEL.split();
- ///
- /// tx.send(1).unwrap();
- /// tx.send(2).unwrap();
- ///
- /// assert_eq!(Some(1), rx.recv());
- /// assert_eq!(Some(2), rx.recv());
- /// ```
- ///
- /// [`send_ref`]: StaticSender::send_ref
- /// [`try_send_ref`]: StaticSender::try_send_ref
- /// [recycling policy]: crate::recycling::Recycle
- /// [`recv_ref`]: Self::recv_ref
- pub fn recv(&self) -> Option<T>
- where
- R: Recycle<T>,
- {
- let mut val = self.recv_ref()?;
- Some(recycling::take(&mut *val, self.recycle))
- }
- /// Returns `true` if the channel has closed (all corresponding
- /// [`StaticSender`]s have been dropped).
- ///
- /// If this method returns `true`, no new messages will become available
- /// on this channel. Previously sent messages may still be available.
- pub fn is_closed(&self) -> bool {
- test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1
- }
- }
- impl<'a, T, R> Iterator for &'a StaticReceiver<T, R> {
- type Item = RecvRef<'a, T>;
- fn next(&mut self) -> Option<Self::Item> {
- self.recv_ref()
- }
- }
- impl<T, R> Drop for StaticReceiver<T, R> {
- fn drop(&mut self) {
- self.core.close_rx();
- }
- }
- impl<T, R: fmt::Debug> fmt::Debug for StaticReceiver<T, R> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("StaticReceiver")
- .field("core", &self.core)
- .field("slots", &format_args!("&[..]"))
- .field("recycle", &self.recycle)
- .finish()
- }
- }
- }
- impl_send_ref! {
- /// A reference to a message being sent to a blocking channel.
- ///
- /// A `SendRef` represents the exclusive permission to mutate a given
- /// element in a channel. A `SendRef<T>` [implements `DerefMut<T>`] to allow
- /// writing to that element. This is analogous to the [`Ref`] type, except
- /// that it completes a `send_ref` or `try_send_ref` operation when it is
- /// dropped.
- ///
- /// This type is returned by the [`Sender::send_ref`] and
- /// [`Sender::try_send_ref`] (or [`StaticSender::send_ref`] and
- /// [`StaticSender::try_send_ref`]) methods.
- ///
- /// [implements `DerefMut<T>`]: #impl-DerefMut
- /// [`Ref`]: crate::Ref
- pub struct SendRef<Thread>;
- }
- impl_recv_ref! {
- /// A reference to a message being received from a blocking channel.
- ///
- /// A `RecvRef` represents the exclusive permission to mutate a given
- /// element in a channel. A `RecvRef<T>` [implements `DerefMut<T>`] to allow
- /// writing to that element. This is analogous to the [`Ref`] type, except
- /// that it completes a `recv_ref` operation when it is dropped.
- ///
- /// This type is returned by the [`Receiver::recv_ref`] and
- /// [`StaticReceiver::recv_ref`] methods.
- ///
- /// [implements `DerefMut<T>`]: #impl-DerefMut
- /// [`Ref`]: crate::Ref
- pub struct RecvRef<Thread>;
- }
- // === impl Sender ===
- impl<T, R> Sender<T, R>
- where
- R: Recycle<T>,
- {
- /// Reserves a slot in the channel to mutate in place, blocking until
- /// there is a free slot to write to.
- ///
- /// This is similar to the [`send`] method, but, rather than taking a
- /// message by value to write to the channel, this method reserves a
- /// writable slot in the channel, and returns a [`SendRef`] that allows
- /// mutating the slot in place. If the [`Receiver`] end of the channel
- /// uses the [`Receiver::recv_ref`] method for receiving from the channel,
- /// this allows allocations for channel messages to be reused in place.
- ///
- /// # Errors
- ///
- /// If the [`Receiver`] end of the channel has been dropped, this
- /// returns a [`Closed`] error.
- ///
- /// # Examples
- ///
- /// Sending formatted strings by writing them directly to channel slots,
- /// in place:
- /// ```
- /// use thingbuf::mpsc::blocking;
- /// use std::{fmt::Write, thread};
- ///
- /// let (tx, rx) = blocking::channel::<String>(8);
- ///
- /// // Spawn a thread that prints each message received from the channel:
- /// thread::spawn(move || {
- /// for _ in 0..10 {
- /// let msg = rx.recv_ref().unwrap();
- /// println!("{}", msg);
- /// }
- /// });
- ///
- /// // Until the channel closes, write formatted messages to the channel.
- /// let mut count = 1;
- /// while let Ok(mut value) = tx.send_ref() {
- /// // Writing to the `SendRef` will reuse the *existing* string
- /// // allocation in place.
- /// write!(value, "hello from message {}", count)
- /// .expect("writing to a `String` should never fail");
- /// count += 1;
- /// }
- /// ```
- ///
- /// [`send`]: Self::send
- pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
- send_ref(
- &self.inner.core,
- self.inner.slots.as_ref(),
- &self.inner.recycle,
- )
- }
- /// Sends a message by value, blocking until there is a free slot to
- /// write to.
- ///
- /// This method takes the message by value, and replaces any previous
- /// value in the slot. This means that the channel will *not* function
- /// as an object pool while sending messages with `send`. This method is
- /// most appropriate when messages don't own reusable heap allocations,
- /// or when the [`Receiver`] end of the channel must receive messages by
- /// moving them out of the channel by value (using the
- /// [`Receiver::recv`] method). When messages in the channel own
- /// reusable heap allocations (such as `String`s or `Vec`s), and the
- /// [`Receiver`] doesn't need to receive them by value, consider using
- /// [`send_ref`] instead, to enable allocation reuse.
- ///
- /// # Errors
- ///
- /// If the [`Receiver`] end of the channel has been dropped, this
- /// returns a [`Closed`] error containing the sent value.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking;
- /// use std::thread;
- ///
- /// let (tx, rx) = blocking::channel(8);
- ///
- /// // Spawn a thread that prints each message received from the channel:
- /// thread::spawn(move || {
- /// for _ in 0..10 {
- /// let msg = rx.recv().unwrap();
- /// println!("received message {}", msg);
- /// }
- /// });
- ///
- /// // Until the channel closes, write the current iteration to the channel.
- /// let mut count = 1;
- /// while tx.send(count).is_ok() {
- /// count += 1;
- /// }
- /// ```
- /// [`send_ref`]: Self::send_ref
- pub fn send(&self, val: T) -> Result<(), Closed<T>> {
- match self.send_ref() {
- Err(Closed(())) => Err(Closed(val)),
- Ok(mut slot) => {
- *slot = val;
- Ok(())
- }
- }
- }
- /// Attempts to reserve a slot in the channel to mutate in place,
- /// without blocking until capacity is available.
- ///
- /// This method differs from [`send_ref`] by returning immediately if the
- /// channel’s buffer is full or no [`Receiver`] exists. Compared with
- /// [`send_ref`], this method has two failure cases instead of one (one for
- /// disconnection, one for a full buffer), and this method will never block.
- ///
- /// Like [`send_ref`], this method returns a [`SendRef`] that may be
- /// used to mutate a slot in the channel's buffer in place. Dropping the
- /// [`SendRef`] completes the send operation and makes the mutated value
- /// available to the [`Receiver`].
- ///
- /// # Errors
- ///
- /// If the channel capacity has been reached (i.e., the channel has `n`
- /// buffered values where `n` is the argument passed to
- /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
- /// returned. In this case, a future call to `try_send` may succeed if
- /// additional capacity becomes available.
- ///
- /// If the receive half of the channel is closed (i.e., the [`Receiver`]
- /// handle was dropped), the function returns [`TrySendError::Closed`].
- /// Once the channel has closed, subsequent calls to `try_send_ref` will
- /// never succeed.
- ///
- /// [`send_ref`]: Self::send_ref
- pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
- self.inner
- .core
- .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle)
- .map(SendRef)
- }
- /// Attempts to send a message by value immediately, without blocking until
- /// capacity is available.
- ///
- /// This method differs from [`send`] by returning immediately if the
- /// channel’s buffer is full or no [`Receiver`] exists. Compared with
- /// [`send`], this method has two failure cases instead of one (one for
- /// disconnection, one for a full buffer), and this method will never block.
- ///
- /// # Errors
- ///
- /// If the channel capacity has been reached (i.e., the channel has `n`
- /// buffered values where `n` is the argument passed to
- /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
- /// returned. In this case, a future call to `try_send` may succeed if
- /// additional capacity becomes available.
- ///
- /// If the receive half of the channel is closed (i.e., the [`Receiver`]
- /// handle was dropped), the function returns [`TrySendError::Closed`].
- /// Once the channel has closed, subsequent calls to `try_send` will
- /// never succeed.
- ///
- /// In both cases, the error includes the value passed to `try_send`.
- ///
- /// [`send`]: Self::send
- pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
- self.inner
- .core
- .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle)
- }
- }
- impl<T, R> Clone for Sender<T, R> {
- fn clone(&self) -> Self {
- test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed));
- Self {
- inner: self.inner.clone(),
- }
- }
- }
- impl<T, R> Drop for Sender<T, R> {
- fn drop(&mut self) {
- if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
- return;
- }
- // if we are the last sender, synchronize
- test_dbg!(atomic::fence(Ordering::SeqCst));
- if self.inner.core.core.close() {
- self.inner.core.rx_wait.close_tx();
- }
- }
- }
- // === impl Receiver ===
- impl<T, R> Receiver<T, R> {
- /// Receives the next message for this receiver, **by reference**.
- ///
- /// This method returns `None` if the channel has been closed and there are
- /// no remaining messages in the channel's buffer. This indicates that no
- /// further values can ever be received from this `Receiver`. The channel is
- /// closed when all [`Sender`]s have been dropped.
- ///
- /// If there are no messages in the channel's buffer, but the channel has
- /// not yet been closed, this method will block until a message is sent or
- /// the channel is closed.
- ///
- /// This method returns a [`RecvRef`] that can be used to read from (or
- /// mutate) the received message by reference. When the [`RecvRef`] is
- /// dropped, the receive operation completes and the slot occupied by
- /// the received message becomes usable for a future [`send_ref`] operation.
- ///
- /// If all [`Sender`]s for this channel write to the channel's slots in
- /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
- /// method allows messages that own heap allocations to be reused in
- /// place.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking;
- /// use std::{thread, fmt::Write};
- ///
- /// let (tx, rx) = blocking::channel::<String>(100);
- ///
- /// thread::spawn(move || {
- /// let mut value = tx.send_ref().unwrap();
- /// write!(value, "hello world!")
- /// .expect("writing to a `String` should never fail");
- /// });
- ///
- /// assert_eq!(Some("hello world!"), rx.recv_ref().as_deref().map(String::as_str));
- /// assert_eq!(None, rx.recv().as_deref());
- /// ```
- ///
- /// Values are buffered:
- ///
- /// ```
- /// use thingbuf::mpsc::blocking;
- /// use std::fmt::Write;
- ///
- /// let (tx, rx) = blocking::channel::<String>(100);
- ///
- /// write!(tx.send_ref().unwrap(), "hello").unwrap();
- /// write!(tx.send_ref().unwrap(), "world").unwrap();
- ///
- /// assert_eq!("hello", rx.recv_ref().unwrap().as_str());
- /// assert_eq!("world", rx.recv_ref().unwrap().as_str());
- /// ```
- ///
- /// [`send_ref`]: Sender::send_ref
- /// [`try_send_ref`]: Sender::try_send_ref
- pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
- recv_ref(&self.inner.core, self.inner.slots.as_ref())
- }
- /// Receives the next message for this receiver, **by value**.
- ///
- /// This method returns `None` if the channel has been closed and there are
- /// no remaining messages in the channel's buffer. This indicates that no
- /// further values can ever be received from this `Receiver`. The channel is
- /// closed when all [`Sender`]s have been dropped.
- ///
- /// If there are no messages in the channel's buffer, but the channel has
- /// not yet been closed, this method will block until a message is sent or
- /// the channel is closed.
- ///
- /// When a message is received, it is moved out of the channel by value,
- /// and replaced with a new slot according to the configured [recycling
- /// policy]. If all [`Sender`]s for this channel write to the channel's
- /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
- /// consider using the [`recv_ref`] method instead, to enable the
- /// reuse of heap allocations.
- ///
- /// # Examples
- ///
- /// ```
- /// use thingbuf::mpsc::blocking;
- /// use std::{thread, fmt::Write};
- ///
- /// let (tx, rx) = blocking::channel(100);
- ///
- /// thread::spawn(move || {
- /// tx.send(1).unwrap();
- /// });
- ///
- /// assert_eq!(Some(1), rx.recv());
- /// assert_eq!(None, rx.recv());
- /// ```
- ///
- /// Values are buffered:
- ///
- /// ```
- /// use thingbuf::mpsc::blocking;
- ///
- /// let (tx, rx) = blocking::channel(100);
- ///
- /// tx.send(1).unwrap();
- /// tx.send(2).unwrap();
- ///
- /// assert_eq!(Some(1), rx.recv());
- /// assert_eq!(Some(2), rx.recv());
- /// ```
- ///
- /// [`send_ref`]: Sender::send_ref
- /// [`try_send_ref`]: Sender::try_send_ref
- /// [recycling policy]: crate::recycling::Recycle
- /// [`recv_ref`]: Self::recv_ref
- pub fn recv(&self) -> Option<T>
- where
- R: Recycle<T>,
- {
- let mut val = self.recv_ref()?;
- Some(recycling::take(&mut *val, &self.inner.recycle))
- }
- /// Returns `true` if the channel has closed (all corresponding
- /// [`Sender`]s have been dropped).
- ///
- /// If this method returns `true`, no new messages will become available
- /// on this channel. Previously sent messages may still be available.
- pub fn is_closed(&self) -> bool {
- test_dbg!(self.inner.core.tx_count.load(Ordering::SeqCst)) <= 1
- }
- }
- impl<'a, T, R> Iterator for &'a Receiver<T, R> {
- type Item = RecvRef<'a, T>;
- fn next(&mut self) -> Option<Self::Item> {
- self.recv_ref()
- }
- }
- impl<T, R> Drop for Receiver<T, R> {
- fn drop(&mut self) {
- self.inner.core.close_rx();
- }
- }
- // === impl Inner ===
- impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Inner")
- .field("core", &self.core)
- .field("slots", &format_args!("Box<[..]>"))
- .field("recycle", &self.recycle)
- .finish()
- }
- }
- impl<T, R> Drop for Inner<T, R> {
- fn drop(&mut self) {
- self.core.core.drop_slots(&mut self.slots[..])
- }
- }
- #[inline]
- fn recv_ref<'a, T>(core: &'a ChannelCore<Thread>, slots: &'a [Slot<T>]) -> Option<RecvRef<'a, T>> {
- loop {
- match core.poll_recv_ref(slots, thread::current) {
- Poll::Ready(r) => {
- return r.map(|slot| RecvRef {
- _notify: super::NotifyTx(&core.tx_wait),
- slot,
- })
- }
- Poll::Pending => {
- test_println!("parking ({:?})", thread::current());
- thread::park();
- }
- }
- }
- }
- #[inline]
- fn send_ref<'a, T, R: Recycle<T>>(
- core: &'a ChannelCore<Thread>,
- slots: &'a [Slot<T>],
- recycle: &'a R,
- ) -> Result<SendRef<'a, T>, Closed<()>> {
- // fast path: avoid getting the thread and constructing the node if the
- // slot is immediately ready.
- match core.try_send_ref(slots, recycle) {
- Ok(slot) => return Ok(SendRef(slot)),
- Err(TrySendError::Closed(_)) => return Err(Closed(())),
- _ => {}
- }
- let mut waiter = queue::Waiter::new();
- let mut unqueued = true;
- let thread = thread::current();
- let mut boff = Backoff::new();
- loop {
- let node = unsafe {
- // Safety: in this case, it's totally safe to pin the waiter, as
- // it is owned uniquely by this function, and it cannot possibly
- // be moved while this thread is parked.
- Pin::new_unchecked(&mut waiter)
- };
- let wait = if unqueued {
- test_dbg!(core.tx_wait.start_wait(node, &thread))
- } else {
- test_dbg!(core.tx_wait.continue_wait(node, &thread))
- };
- match wait {
- WaitResult::Closed => return Err(Closed(())),
- WaitResult::Notified => {
- boff.spin_yield();
- match core.try_send_ref(slots.as_ref(), recycle) {
- Ok(slot) => return Ok(SendRef(slot)),
- Err(TrySendError::Closed(_)) => return Err(Closed(())),
- _ => {}
- }
- }
- WaitResult::Wait => {
- unqueued = false;
- thread::park();
- }
- }
- }
- }
|