|
@@ -1,6 +1,7 @@
|
|
|
use super::*;
|
|
|
use crate::{
|
|
|
loom::atomic::{self, AtomicBool, Ordering},
|
|
|
+ recycling::{self, Recycle},
|
|
|
wait::queue,
|
|
|
Ref,
|
|
|
};
|
|
@@ -16,12 +17,21 @@ feature! {
|
|
|
|
|
|
use crate::loom::sync::Arc;
|
|
|
|
|
|
- /// Returns a new synchronous multi-producer, single consumer channel.
|
|
|
- pub fn channel<T: Default>(capacity: usize) -> (Sender<T>, Receiver<T>) {
|
|
|
+ /// Returns a new asynchronous multi-producer, single consumer channel.
|
|
|
+ pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
|
|
|
+ with_recycle(capacity, recycling::DefaultRecycle::new())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns a new asynchronous multi-producer, single consumer channel with
|
|
|
+ /// the provided [recycling policy].
|
|
|
+ ///
|
|
|
+ /// [recycling policy]: crate::recycling::Recycle
|
|
|
+ pub fn with_recycle<T, R: Recycle<T>>(capacity: usize, recycle: R) -> (Sender<T, R>, Receiver<T, R>) {
|
|
|
assert!(capacity > 0);
|
|
|
let inner = Arc::new(Inner {
|
|
|
core: ChannelCore::new(capacity),
|
|
|
slots: Slot::make_boxed_array(capacity),
|
|
|
+ recycle,
|
|
|
});
|
|
|
let tx = Sender {
|
|
|
inner: inner.clone(),
|
|
@@ -32,18 +42,19 @@ feature! {
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
|
- pub struct Receiver<T> {
|
|
|
- inner: Arc<Inner<T>>,
|
|
|
+ pub struct Receiver<T, R = recycling::DefaultRecycle> {
|
|
|
+ inner: Arc<Inner<T, R>>,
|
|
|
}
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
- pub struct Sender<T> {
|
|
|
- inner: Arc<Inner<T>>,
|
|
|
+ pub struct Sender<T, R = recycling::DefaultRecycle> {
|
|
|
+ inner: Arc<Inner<T, R>>,
|
|
|
}
|
|
|
|
|
|
- struct Inner<T> {
|
|
|
+ struct Inner<T, R> {
|
|
|
core: super::ChannelCore<Waker>,
|
|
|
slots: Box<[Slot<T>]>,
|
|
|
+ recycle: R,
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -79,19 +90,22 @@ feature! {
|
|
|
/// ```
|
|
|
/// [`split`]: StaticChannel::split
|
|
|
#[cfg_attr(all(loom, test), allow(dead_code))]
|
|
|
-pub struct StaticChannel<T, const CAPACITY: usize> {
|
|
|
+pub struct StaticChannel<T, const CAPACITY: usize, R = recycling::DefaultRecycle> {
|
|
|
core: ChannelCore<Waker>,
|
|
|
+ recycle: R,
|
|
|
slots: [Slot<T>; CAPACITY],
|
|
|
is_split: AtomicBool,
|
|
|
}
|
|
|
|
|
|
-pub struct StaticSender<T: 'static> {
|
|
|
+pub struct StaticSender<T: 'static, R: 'static = recycling::DefaultRecycle> {
|
|
|
core: &'static ChannelCore<Waker>,
|
|
|
+ recycle: &'static R,
|
|
|
slots: &'static [Slot<T>],
|
|
|
}
|
|
|
|
|
|
-pub struct StaticReceiver<T: 'static> {
|
|
|
+pub struct StaticReceiver<T: 'static, R: 'static = recycling::DefaultRecycle> {
|
|
|
core: &'static ChannelCore<Waker>,
|
|
|
+ recycle: &'static R,
|
|
|
slots: &'static [Slot<T>],
|
|
|
}
|
|
|
|
|
@@ -122,15 +136,17 @@ pub struct RecvRefFuture<'a, T> {
|
|
|
///
|
|
|
/// [`ThingBuf`]: crate::ThingBuf
|
|
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
|
|
-pub struct RecvFuture<'a, T> {
|
|
|
+pub struct RecvFuture<'a, T, R = recycling::DefaultRecycle> {
|
|
|
core: &'a ChannelCore<Waker>,
|
|
|
slots: &'a [Slot<T>],
|
|
|
+ recycle: &'a R,
|
|
|
}
|
|
|
|
|
|
#[pin_project::pin_project(PinnedDrop)]
|
|
|
-struct SendRefFuture<'sender, T> {
|
|
|
+struct SendRefFuture<'sender, T, R> {
|
|
|
core: &'sender ChannelCore<Waker>,
|
|
|
slots: &'sender [Slot<T>],
|
|
|
+ recycle: &'sender R,
|
|
|
state: State,
|
|
|
#[pin]
|
|
|
waiter: queue::Waiter<Waker>,
|
|
@@ -183,9 +199,12 @@ impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
|
|
|
core: ChannelCore::new(CAPACITY),
|
|
|
slots: Slot::make_static_array::<CAPACITY>(),
|
|
|
is_split: AtomicBool::new(false),
|
|
|
+ recycle: recycling::DefaultRecycle::new(),
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
+impl<T, R, const CAPACITY: usize> StaticChannel<T, CAPACITY, R> {
|
|
|
/// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`]
|
|
|
/// pair.
|
|
|
///
|
|
@@ -197,7 +216,7 @@ impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
|
|
|
/// # Panics
|
|
|
///
|
|
|
/// If the channel has already been split.
|
|
|
- pub fn split(&'static self) -> (StaticSender<T>, StaticReceiver<T>) {
|
|
|
+ pub fn split(&'static self) -> (StaticSender<T, R>, StaticReceiver<T, R>) {
|
|
|
self.try_split().expect("channel already split")
|
|
|
}
|
|
|
|
|
@@ -207,16 +226,18 @@ impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
|
|
|
/// A static channel can only be split a single time. If
|
|
|
/// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been
|
|
|
/// called previously, this method returns `None`.
|
|
|
- pub fn try_split(&'static self) -> Option<(StaticSender<T>, StaticReceiver<T>)> {
|
|
|
+ pub fn try_split(&'static self) -> Option<(StaticSender<T, R>, StaticReceiver<T, R>)> {
|
|
|
self.is_split
|
|
|
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
|
|
|
.ok()?;
|
|
|
let tx = StaticSender {
|
|
|
core: &self.core,
|
|
|
+ recycle: &self.recycle,
|
|
|
slots: &self.slots[..],
|
|
|
};
|
|
|
let rx = StaticReceiver {
|
|
|
core: &self.core,
|
|
|
+ recycle: &self.recycle,
|
|
|
slots: &self.slots[..],
|
|
|
};
|
|
|
Some((tx, rx))
|
|
@@ -226,22 +247,28 @@ impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
|
|
|
// === impl Sender ===
|
|
|
|
|
|
#[cfg(feature = "alloc")]
|
|
|
-impl<T: Default> Sender<T> {
|
|
|
+impl<T, R> Sender<T, R>
|
|
|
+where
|
|
|
+ R: Recycle<T>,
|
|
|
+{
|
|
|
pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
self.inner
|
|
|
.core
|
|
|
- .try_send_ref(self.inner.slots.as_ref())
|
|
|
+ .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle)
|
|
|
.map(SendRef)
|
|
|
}
|
|
|
|
|
|
pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
- self.inner.core.try_send(self.inner.slots.as_ref(), val)
|
|
|
+ self.inner
|
|
|
+ .core
|
|
|
+ .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle)
|
|
|
}
|
|
|
|
|
|
pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
|
|
|
SendRefFuture {
|
|
|
core: &self.inner.core,
|
|
|
slots: self.inner.slots.as_ref(),
|
|
|
+ recycle: &self.inner.recycle,
|
|
|
state: State::Start,
|
|
|
waiter: queue::Waiter::new(),
|
|
|
}
|
|
@@ -260,7 +287,7 @@ impl<T: Default> Sender<T> {
|
|
|
}
|
|
|
|
|
|
#[cfg(feature = "alloc")]
|
|
|
-impl<T> Clone for Sender<T> {
|
|
|
+impl<T, R> Clone for Sender<T, R> {
|
|
|
fn clone(&self) -> Self {
|
|
|
test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed));
|
|
|
Self {
|
|
@@ -270,7 +297,7 @@ impl<T> Clone for Sender<T> {
|
|
|
}
|
|
|
|
|
|
#[cfg(feature = "alloc")]
|
|
|
-impl<T> Drop for Sender<T> {
|
|
|
+impl<T, R> Drop for Sender<T, R> {
|
|
|
fn drop(&mut self) {
|
|
|
if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
|
|
|
return;
|
|
@@ -286,7 +313,7 @@ impl<T> Drop for Sender<T> {
|
|
|
// === impl Receiver ===
|
|
|
|
|
|
#[cfg(feature = "alloc")]
|
|
|
-impl<T: Default> Receiver<T> {
|
|
|
+impl<T, R> Receiver<T, R> {
|
|
|
pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
|
|
|
RecvRefFuture {
|
|
|
core: &self.inner.core,
|
|
@@ -294,10 +321,14 @@ impl<T: Default> Receiver<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn recv(&self) -> RecvFuture<'_, T> {
|
|
|
+ pub fn recv(&self) -> RecvFuture<'_, T, R>
|
|
|
+ where
|
|
|
+ R: Recycle<T>,
|
|
|
+ {
|
|
|
RecvFuture {
|
|
|
core: &self.inner.core,
|
|
|
slots: self.inner.slots.as_ref(),
|
|
|
+ recycle: &self.inner.recycle,
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -331,9 +362,12 @@ impl<T: Default> Receiver<T> {
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
- pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
|
|
+ pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>>
|
|
|
+ where
|
|
|
+ R: Recycle<T>,
|
|
|
+ {
|
|
|
self.poll_recv_ref(cx)
|
|
|
- .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take)))
|
|
|
+ .map(|opt| opt.map(|mut r| recycling::take(&mut *r, &self.inner.recycle)))
|
|
|
}
|
|
|
|
|
|
pub fn is_closed(&self) -> bool {
|
|
@@ -342,7 +376,7 @@ impl<T: Default> Receiver<T> {
|
|
|
}
|
|
|
|
|
|
#[cfg(feature = "alloc")]
|
|
|
-impl<T> Drop for Receiver<T> {
|
|
|
+impl<T, R> Drop for Receiver<T, R> {
|
|
|
fn drop(&mut self) {
|
|
|
self.inner.core.close_rx();
|
|
|
}
|
|
@@ -350,19 +384,25 @@ impl<T> Drop for Receiver<T> {
|
|
|
|
|
|
// === impl StaticSender ===
|
|
|
|
|
|
-impl<T: Default> StaticSender<T> {
|
|
|
+impl<T, R> StaticSender<T, R>
|
|
|
+where
|
|
|
+ R: Recycle<T>,
|
|
|
+{
|
|
|
pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
|
|
|
- self.core.try_send_ref(self.slots).map(SendRef)
|
|
|
+ self.core
|
|
|
+ .try_send_ref(self.slots, self.recycle)
|
|
|
+ .map(SendRef)
|
|
|
}
|
|
|
|
|
|
pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
|
|
|
- self.core.try_send(self.slots, val)
|
|
|
+ self.core.try_send(self.slots, val, self.recycle)
|
|
|
}
|
|
|
|
|
|
pub async fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
|
|
|
SendRefFuture {
|
|
|
core: self.core,
|
|
|
slots: self.slots,
|
|
|
+ recycle: self.recycle,
|
|
|
state: State::Start,
|
|
|
waiter: queue::Waiter::new(),
|
|
|
}
|
|
@@ -386,11 +426,12 @@ impl<T> Clone for StaticSender<T> {
|
|
|
Self {
|
|
|
core: self.core,
|
|
|
slots: self.slots,
|
|
|
+ recycle: self.recycle,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> Drop for StaticSender<T> {
|
|
|
+impl<T, R> Drop for StaticSender<T, R> {
|
|
|
fn drop(&mut self) {
|
|
|
if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
|
|
|
return;
|
|
@@ -403,18 +444,19 @@ impl<T> Drop for StaticSender<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> fmt::Debug for StaticSender<T> {
|
|
|
+impl<T, R: fmt::Debug> fmt::Debug for StaticSender<T, R> {
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
f.debug_struct("StaticSender")
|
|
|
.field("core", &self.core)
|
|
|
.field("slots", &format_args!("&[..]"))
|
|
|
+ .field("recycle", self.recycle)
|
|
|
.finish()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// === impl StaticReceiver ===
|
|
|
|
|
|
-impl<T: Default> StaticReceiver<T> {
|
|
|
+impl<T, R> StaticReceiver<T, R> {
|
|
|
pub fn recv_ref(&self) -> RecvRefFuture<'_, T> {
|
|
|
RecvRefFuture {
|
|
|
core: self.core,
|
|
@@ -422,10 +464,14 @@ impl<T: Default> StaticReceiver<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn recv(&self) -> RecvFuture<'_, T> {
|
|
|
+ pub fn recv(&self) -> RecvFuture<'_, T, R>
|
|
|
+ where
|
|
|
+ R: Recycle<T>,
|
|
|
+ {
|
|
|
RecvFuture {
|
|
|
core: self.core,
|
|
|
slots: self.slots,
|
|
|
+ recycle: self.recycle,
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -459,9 +505,12 @@ impl<T: Default> StaticReceiver<T> {
|
|
|
/// sender, or when the channel is closed. Note that on multiple calls to
|
|
|
/// `poll_recv`, only the [`Waker`] from the [`Context`] passed to the most
|
|
|
/// recent call is scheduled to receive a wakeup.
|
|
|
- pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
|
|
+ pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>>
|
|
|
+ where
|
|
|
+ R: Recycle<T>,
|
|
|
+ {
|
|
|
self.poll_recv_ref(cx)
|
|
|
- .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take)))
|
|
|
+ .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle)))
|
|
|
}
|
|
|
|
|
|
pub fn is_closed(&self) -> bool {
|
|
@@ -469,17 +518,18 @@ impl<T: Default> StaticReceiver<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> Drop for StaticReceiver<T> {
|
|
|
+impl<T, R> Drop for StaticReceiver<T, R> {
|
|
|
fn drop(&mut self) {
|
|
|
self.core.close_rx();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> fmt::Debug for StaticReceiver<T> {
|
|
|
+impl<T, R: fmt::Debug> fmt::Debug for StaticReceiver<T, R> {
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
f.debug_struct("StaticReceiver")
|
|
|
.field("core", &self.core)
|
|
|
.field("slots", &format_args!("&[..]"))
|
|
|
+ .field("recycle", &self.recycle)
|
|
|
.finish()
|
|
|
}
|
|
|
}
|
|
@@ -487,7 +537,7 @@ impl<T> fmt::Debug for StaticReceiver<T> {
|
|
|
// === impl RecvRefFuture ===
|
|
|
|
|
|
#[inline]
|
|
|
-fn poll_recv_ref<'a, T: Default>(
|
|
|
+fn poll_recv_ref<'a, T>(
|
|
|
core: &'a ChannelCore<Waker>,
|
|
|
slots: &'a [Slot<T>],
|
|
|
cx: &mut Context<'_>,
|
|
@@ -501,7 +551,7 @@ fn poll_recv_ref<'a, T: Default>(
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-impl<'a, T: Default> Future for RecvRefFuture<'a, T> {
|
|
|
+impl<'a, T> Future for RecvRefFuture<'a, T> {
|
|
|
type Output = Option<RecvRef<'a, T>>;
|
|
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
@@ -511,20 +561,24 @@ impl<'a, T: Default> Future for RecvRefFuture<'a, T> {
|
|
|
|
|
|
// === impl Recv ===
|
|
|
|
|
|
-impl<'a, T: Default> Future for RecvFuture<'a, T> {
|
|
|
+impl<'a, T, R> Future for RecvFuture<'a, T, R>
|
|
|
+where
|
|
|
+ R: Recycle<T>,
|
|
|
+{
|
|
|
type Output = Option<T>;
|
|
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
poll_recv_ref(self.core, self.slots, cx)
|
|
|
- .map(|opt| opt.map(|mut r| r.with_mut(core::mem::take)))
|
|
|
+ .map(|opt| opt.map(|mut r| recycling::take(&mut *r, self.recycle)))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// === impl SendRefFuture ===
|
|
|
|
|
|
-impl<'sender, T: Default + 'sender> Future for SendRefFuture<'sender, T>
|
|
|
+impl<'sender, T, R> Future for SendRefFuture<'sender, T, R>
|
|
|
where
|
|
|
- T: Default + 'sender,
|
|
|
+ R: Recycle<T> + 'sender,
|
|
|
+ T: 'sender,
|
|
|
{
|
|
|
type Output = Result<SendRef<'sender, T>, Closed>;
|
|
|
|
|
@@ -536,7 +590,7 @@ where
|
|
|
let node = this.waiter;
|
|
|
match test_dbg!(*this.state) {
|
|
|
State::Start => {
|
|
|
- match this.core.try_send_ref(this.slots) {
|
|
|
+ match this.core.try_send_ref(this.slots, *this.recycle) {
|
|
|
Ok(slot) => return Poll::Ready(Ok(SendRef(slot))),
|
|
|
Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))),
|
|
|
Err(_) => {}
|
|
@@ -573,7 +627,7 @@ where
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- State::Done => match this.core.try_send_ref(this.slots) {
|
|
|
+ State::Done => match this.core.try_send_ref(this.slots, *this.recycle) {
|
|
|
Ok(slot) => return Poll::Ready(Ok(SendRef(slot))),
|
|
|
Err(TrySendError::Closed(_)) => return Poll::Ready(Err(Closed(()))),
|
|
|
Err(_) => {
|
|
@@ -586,7 +640,7 @@ where
|
|
|
}
|
|
|
|
|
|
#[pin_project::pinned_drop]
|
|
|
-impl<T> PinnedDrop for SendRefFuture<'_, T> {
|
|
|
+impl<T, R> PinnedDrop for SendRefFuture<'_, T, R> {
|
|
|
fn drop(self: Pin<&mut Self>) {
|
|
|
test_println!("SendRefFuture::drop({:p})", self);
|
|
|
let this = self.project();
|
|
@@ -598,16 +652,17 @@ impl<T> PinnedDrop for SendRefFuture<'_, T> {
|
|
|
|
|
|
feature! {
|
|
|
#![feature = "alloc"]
|
|
|
- impl<T> fmt::Debug for Inner<T> {
|
|
|
+ impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> {
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
f.debug_struct("Inner")
|
|
|
.field("core", &self.core)
|
|
|
.field("slots", &format_args!("Box<[..]>"))
|
|
|
+ .field("recycle", &self.recycle)
|
|
|
.finish()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- impl<T> Drop for Inner<T> {
|
|
|
+ impl<T, R> Drop for Inner<T, R> {
|
|
|
fn drop(&mut self) {
|
|
|
self.core.core.drop_slots(&mut self.slots[..])
|
|
|
}
|