|
@@ -1,15 +1,253 @@
|
|
|
-//! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
|
|
|
+//! Multi-producer, single-consumer channels using [`ThingBuf`].
|
|
|
//!
|
|
|
-//! The default MPSC channel returned by the [`channel`] function is
|
|
|
-//! _asynchronous_: receiving from the channel is an `async fn`, and the
|
|
|
-//! receiving task willwait when there are no messages in the channel.
|
|
|
+//! A _channel_ is a synchronization and communication primitive that combines a
|
|
|
+//! shared queue with the ability to _wait_. Channels provide a [send]
|
|
|
+//! operation, which enqueues a message if there is capacity in the queue, or
|
|
|
+//! waits for capacity to become available if there is none; and a [receive]
|
|
|
+//! operation, which dequeues a message from the queue if any are available, or
|
|
|
+//! waits for a message to be sent if the queue is empty. This module provides
|
|
|
+//! an implementation of multi-producer, single-consumer channels built on top
|
|
|
+//! of the lock-free queue implemented by [`ThingBuf`].
|
|
|
+//!
|
|
|
+//! The channel API in this module is broadly similar to the one provided by
|
|
|
+//! other implementations, such as [`std::sync::mpsc`], [`tokio::sync::mpsc`],
|
|
|
+//! or [`crossbeam::channel`]. A given channel instance is represented by a pair
|
|
|
+//! of types:
|
|
|
+//!
|
|
|
+//! * a [`Sender`] handle, which represents the capacity to [send] messages to the
|
|
|
+//! channel
|
|
|
+//! * a [`Receiver`] handle, which represents the capacity to [receive] messages
|
|
|
+//! from the channel
|
|
|
+//!
|
|
|
+//! ```
|
|
|
+//! // Constructing a channel returns a paired `Sender` and `Receiver`:
|
|
|
+//! let (tx, rx) = thingbuf::mpsc::channel::<String>(10);
|
|
|
+//! # drop((tx, rx));
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! As these are multi-producer, single-consumer channels, the [`Sender`] type
|
|
|
+//! implements `Clone`; it may be cloned any number of times to create multiple
|
|
|
+//! [`Sender`]s that send messages to the same channel. On the other hand, each
|
|
|
+//! channel instance has only a single [`Receiver`].
|
|
|
+//!
|
|
|
+//! # Disconnection
|
|
|
+//!
|
|
|
+//! When all [`Sender`] handles have been dropped, it is no longer
|
|
|
+//! possible to send values into the channel. When this occurs, the channel is
|
|
|
+//! considered to have been closed by the sender; calling [`Receiver::recv`]
|
|
|
+//! once a channel has closed will return `None`. Note that if the [`Receiver`] has
|
|
|
+//! not received any messages sent prior to the last [`Sender`] being dropped,
|
|
|
+//! those messages will be returned before [`Receiver::recv`] returns `None`.
|
|
|
+//!
|
|
|
+//! If the [`Receiver`] handle is dropped, then messages can no longer
|
|
|
+//! be read out of the channel. In this case, all further attempts to send will
|
|
|
+//! result in an error.
|
|
|
+//!
|
|
|
+//! # Channel Flavors
|
|
|
+//!
|
|
|
+//! This module contains several different "flavors" of multi-producer,
|
|
|
+//! single-consumer channels. The primary differences between the different
|
|
|
+//! channel implementations are whether they wait asynchronously or by blocking
|
|
|
+//! the current thread, and whether the array that stores channel messages is
|
|
|
+//! allocated dynamically on the heap or statically at compile-time.
|
|
|
+//!
|
|
|
+//! | | **Dynamic Allocation** | **Static Allocation** |
|
|
|
+//! |--------------|------------------------|------------------------------|
|
|
|
+//! | **Async** | [`channel`] | [`StaticChannel`] |
|
|
|
+//! | **Blocking** | [`blocking::channel`] | [`blocking::StaticChannel`] |
|
|
|
+//!
|
|
|
+//! ## Asynchronous and Blocking Channels
|
|
|
+//!
|
|
|
+//! The default channel implementation (in this module) are _asynchronous_.
|
|
|
+//! With these channels, the send and receive operations are [`async fn`]s
|
|
|
+//! — when it's necessary to wait to send or receive a message, the
|
|
|
+//! [`Future`]s returned by these functions will _yield_, allowing other [tasks]
|
|
|
+//! to execute. These asynchronous channels are suitable for use in an async
|
|
|
+//! runtime like [`tokio`], or in embedded or bare-metal systems that use
|
|
|
+//! `async`/`await` syntax. They do not require the Rust standard library.
|
|
|
//!
|
|
|
//! If the "std" feature flag is enabled, this module also provides a
|
|
|
-//! synchronous channel, in the [`blocking`] module. The synchronous receiver
|
|
|
-//! will instead wait for new messages by blocking the current thread.
|
|
|
-//! Naturally, this requires the Rust standard library. A synchronous channel
|
|
|
-//! can be constructed using the [`blocking::channel`] function.
|
|
|
-
|
|
|
+//! [`blocking`] submodule, which implements _blocking_ (or _synchronous_)
|
|
|
+//! channels. The [blocking receiver] will instead wait for new messages by
|
|
|
+//! blocking the current thread, and the [blocking sender] will wait for send
|
|
|
+//! capacity by blocking the current thread. A blocking channel can be
|
|
|
+//! constructed using the [`blocking::channel`] function. Naturally, blocking
|
|
|
+//! the current thread requires thread APIs, so these channels are only
|
|
|
+//! available when the Rust standard library is available.
|
|
|
+//!
|
|
|
+//! An asynchronous channel is used with asynchronous tasks:
|
|
|
+//!
|
|
|
+//! ```rust
|
|
|
+//! # async fn example() {
|
|
|
+//! # mod tokio {
|
|
|
+//! # pub fn spawn(_: impl std::future::Future) {}
|
|
|
+//! # }
|
|
|
+//! use thingbuf::mpsc;
|
|
|
+//!
|
|
|
+//! let (tx, rx) = mpsc::channel(8);
|
|
|
+//!
|
|
|
+//! // Spawn some tasks that write to the channel:
|
|
|
+//! for i in 0..10 {
|
|
|
+//! let tx = tx.clone();
|
|
|
+//! tokio::spawn(async move {
|
|
|
+//! tx.send(i).await.unwrap();
|
|
|
+//! });
|
|
|
+//! }
|
|
|
+//!
|
|
|
+//! // Print out every message recieved from the channel:
|
|
|
+//! for _ in 0..10 {
|
|
|
+//! let j = rx.recv().await.unwrap();
|
|
|
+//!
|
|
|
+//! println!("received {}", j);
|
|
|
+//! assert!(0 <= j && j < 10);
|
|
|
+//! }
|
|
|
+//! # }
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! A blocking channel is used with threads:
|
|
|
+//!
|
|
|
+//! ```rust
|
|
|
+//! use thingbuf::mpsc::blocking;
|
|
|
+//! use std::thread;
|
|
|
+//!
|
|
|
+//! let (tx, rx) = blocking::channel(8);
|
|
|
+//!
|
|
|
+//! // Spawn some threads that write to the channel:
|
|
|
+//! for i in 0..10 {
|
|
|
+//! let tx = tx.clone();
|
|
|
+//! thread::spawn(move || {
|
|
|
+//! tx.send(i).unwrap();
|
|
|
+//! });
|
|
|
+//! }
|
|
|
+//!
|
|
|
+//! // Print out every message recieved from the channel:
|
|
|
+//! for _ in 0..10 {
|
|
|
+//! let j = rx.recv().unwrap();
|
|
|
+//!
|
|
|
+//! println!("received {}", j);
|
|
|
+//! assert!(0 <= j && j < 10);
|
|
|
+//! }
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! ## Static and Dynamically Allocated Channels
|
|
|
+//!
|
|
|
+//! The other difference between channel flavors is whether the array that backs
|
|
|
+//! the channel's queue is allocated dynamically on the heap, or allocated
|
|
|
+//! statically.
|
|
|
+//!
|
|
|
+//! The default channels returned by [`channel`] and [`blocking::channel`] are
|
|
|
+//! _dynamically allocated_: although they are fixed-size, the size of the
|
|
|
+//! channel can be determined at runtime when the channel is created, and any
|
|
|
+//! number of dynamically-allocated channels may be created and destroyed over
|
|
|
+//! the lifetime of the program. Because these channels dynamically allocate
|
|
|
+//! memory, they require the "alloc" feature flag.
|
|
|
+//!
|
|
|
+//! In some use cases, though, dynamic memory allocation may not be available
|
|
|
+//! (such as in some embedded systems, or within a memory allocator
|
|
|
+//! implementation), or it may be desirable to avoid dynamic memory allocation
|
|
|
+//! (such as in very performance-sensitive applications). To support those use
|
|
|
+//! cases, `thingbuf::mpsc` also provides _static channels_. The size of these
|
|
|
+//! channels is determined at compile-time (by a const generic parameter) rather
|
|
|
+//! than at runtime, so they may be constructed in a static initializer. This
|
|
|
+//! allows allocating the array that backs the channel's queue statically, so
|
|
|
+//! that dynamic allocation is not needed.The [`StaticChannel`] and
|
|
|
+//! [`blocking::StaticChannel`] types are used to construct static channels.
|
|
|
+//!
|
|
|
+//! A dynamically allocated channel's size can be determined at runtime:
|
|
|
+//!
|
|
|
+//! ```
|
|
|
+//! use thingbuf::mpsc::blocking;
|
|
|
+//! use std::env;
|
|
|
+//!
|
|
|
+//! # // the `main` fn is used explicitly to show that this is an "application".
|
|
|
+//! # #[allow(clippy::needless_doctest_main)]
|
|
|
+//! fn main() {
|
|
|
+//! // Determine the channel capacity from the first command-line line
|
|
|
+//! // argument, if one is present.
|
|
|
+//! let channel_capacity = env::args()
|
|
|
+//! .nth(1)
|
|
|
+//! .and_then(|cap| cap.parse::<usize>().ok())
|
|
|
+//! .unwrap_or(16);
|
|
|
+//!
|
|
|
+//! // Construct a dynamically-sized blocking channel.
|
|
|
+//! let (tx, rx) = blocking::channel(channel_capacity);
|
|
|
+//!
|
|
|
+//! tx.send("hello world").unwrap();
|
|
|
+//!
|
|
|
+//! // ...
|
|
|
+//! # drop(tx); drop(rx);
|
|
|
+//! }
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! A statically allocated channel may be used without heap allocation:
|
|
|
+//!
|
|
|
+//! ```
|
|
|
+//! // We are in a `no_std` context with no memory allocator!
|
|
|
+//! #![no_std]
|
|
|
+//! use thingbuf::mpsc;
|
|
|
+//!
|
|
|
+//! // Create a channel backed by a static array with 256 entries.
|
|
|
+//! static KERNEL_EVENTS: mpsc::StaticChannel<KernelEvent, 256> = mpsc::StaticChannel::new();
|
|
|
+//!
|
|
|
+//! #[no_mangle]
|
|
|
+//! pub fn kernel_main() {
|
|
|
+//! // Split the static channel into a sender/receiver pair.
|
|
|
+//! let (event_tx, event_rx) = KERNEL_EVENTS.split();
|
|
|
+//! let mut kernel_tasks = TaskExecutor::new();
|
|
|
+//!
|
|
|
+//! kernel_tasks.spawn(async move {
|
|
|
+//! // Process kernel events
|
|
|
+//! while let Some(event) = event_rx.recv().await {
|
|
|
+//! // ...
|
|
|
+//! # drop(event);
|
|
|
+//! }
|
|
|
+//! });
|
|
|
+//!
|
|
|
+//! // Some device driver that needs to emit events to the channel.
|
|
|
+//! kernel_tasks.spawn(some_device_driver(event_tx.clone()));
|
|
|
+//!
|
|
|
+//! loop {
|
|
|
+//! kernel_tasks.tick();
|
|
|
+//! }
|
|
|
+//! }
|
|
|
+//!
|
|
|
+//! async fn some_device_driver(event_tx: mpsc::StaticSender<KernelEvent>) {
|
|
|
+//! let mut device = SomeDevice::new(0x42);
|
|
|
+//!
|
|
|
+//! loop {
|
|
|
+//! // When the device has data, emit a kernel event.
|
|
|
+//! match device.poll_for_events().await {
|
|
|
+//! Ok(event) => event_tx.send(event).await.unwrap(),
|
|
|
+//! Err(err) => event_tx.send(KernelEvent::from(err)).await.unwrap(),
|
|
|
+//! }
|
|
|
+//! }
|
|
|
+//! }
|
|
|
+//! # type KernelEvent = ();
|
|
|
+//! # struct TaskExecutor;
|
|
|
+//! # impl TaskExecutor {
|
|
|
+//! # const fn new() -> Self { Self }
|
|
|
+//! # fn spawn(&mut self, _: impl core::future::Future) {}
|
|
|
+//! # fn tick(&mut self) {}
|
|
|
+//! # }
|
|
|
+//! # struct SomeDevice(u64);
|
|
|
+//! # impl SomeDevice {
|
|
|
+//! # fn new(u: u64) -> Self { Self(u) }
|
|
|
+//! # async fn poll_for_events(&mut self) -> Result<(), ()> { Ok(()) }
|
|
|
+//! # }
|
|
|
+//! # fn main() {}
|
|
|
+//! ```
|
|
|
+//!
|
|
|
+//! [send]: Sender::send
|
|
|
+//! [receive]: Receiver::recv
|
|
|
+//! [`ThingBuf`]: crate::ThingBuf
|
|
|
+//! [`tokio::sync::mpsc`]: https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html
|
|
|
+//! [`crossbeam::channel`]: https://docs.rs/crossbeam/latest/crossbeam/channel/index.html
|
|
|
+//! [`async fn`]: https://rust-lang.github.io/async-book/01_getting_started/04_async_await_primer.html
|
|
|
+//! [`Future`]: core::future::Future
|
|
|
+//! [tasks]: core::task
|
|
|
+//! [`tokio`]: https://tokio.rs
|
|
|
+//! [blocking receiver]: blocking::Receiver
|
|
|
+//! [blocking sender]: blocking::Sender
|
|
|
use crate::{
|
|
|
loom::{atomic::AtomicUsize, hint},
|
|
|
recycling::Recycle,
|