blocking.rs 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071
  1. //! A synchronous multi-producer, single-consumer channel.
  2. //!
  3. //! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the
  4. //! [`Receiver`] types in this module wait by blocking the current thread,
  5. //! rather than asynchronously yielding.
  6. use super::*;
  7. use crate::{
  8. loom::{
  9. atomic::{self, Ordering},
  10. sync::Arc,
  11. thread::{self, Thread},
  12. },
  13. recycling::{self, Recycle},
  14. util::Backoff,
  15. wait::queue,
  16. Ref,
  17. };
  18. use core::{fmt, pin::Pin};
  19. use errors::*;
  20. /// Returns a new synchronous multi-producer, single consumer (MPSC)
  21. /// channel with the provided capacity.
  22. ///
  23. /// This channel will use the [default recycling policy].
  24. ///
  25. /// [recycling policy]: crate::recycling::DefaultRecycle
  26. pub fn channel<T: Default + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
  27. with_recycle(capacity, recycling::DefaultRecycle::new())
  28. }
  29. /// Returns a new synchronous multi-producer, single consumer channel with
  30. /// the provided [recycling policy].
  31. ///
  32. /// [recycling policy]: crate::recycling::Recycle
  33. pub fn with_recycle<T, R: Recycle<T>>(
  34. capacity: usize,
  35. recycle: R,
  36. ) -> (Sender<T, R>, Receiver<T, R>) {
  37. assert!(capacity > 0);
  38. let inner = Arc::new(Inner {
  39. core: ChannelCore::new(capacity),
  40. slots: Slot::make_boxed_array(capacity),
  41. recycle,
  42. });
  43. let tx = Sender {
  44. inner: inner.clone(),
  45. };
  46. let rx = Receiver { inner };
  47. (tx, rx)
  48. }
  49. /// Synchronously receives values from associated [`Sender`]s.
  50. ///
  51. /// Instances of this struct are created by the [`channel`] and
  52. /// [`with_recycle`] functions.
  53. #[derive(Debug)]
  54. pub struct Sender<T, R = recycling::DefaultRecycle> {
  55. inner: Arc<Inner<T, R>>,
  56. }
  57. /// Synchronously sends values to an associated [`Receiver`].
  58. ///
  59. /// Instances of this struct are created by the [`channel`] and
  60. /// [`with_recycle`] functions.
  61. #[derive(Debug)]
  62. pub struct Receiver<T, R = recycling::DefaultRecycle> {
  63. inner: Arc<Inner<T, R>>,
  64. }
  65. struct Inner<T, R> {
  66. core: super::ChannelCore<Thread>,
  67. slots: Box<[Slot<T>]>,
  68. recycle: R,
  69. }
  70. #[cfg(not(all(loom, test)))]
  71. feature! {
  72. #![feature = "static"]
  73. use crate::loom::atomic::AtomicBool;
  74. /// A statically-allocated, blocking bounded MPSC channel.
  75. ///
  76. /// A statically-allocated channel allows using a MPSC channel without
  77. /// requiring _any_ heap allocations. The [asynchronous variant][async] may be
  78. /// used in `#![no_std]` environments without requiring `liballoc`. This is a
  79. /// synchronous version which requires the Rust standard library, because it
  80. /// blocks the current thread in order to wait for send capacity. However, in
  81. /// some cases, it may offer _very slightly_ better performance than the
  82. /// non-static blocking channel due to requiring fewer heap pointer
  83. /// dereferences.
  84. ///
  85. /// In order to use a statically-allocated channel, a `StaticChannel` must
  86. /// be constructed in a `static` initializer. This reserves storage for the
  87. /// channel's message queue at compile-time. Then, at runtime, the channel
  88. /// is [`split`] into a [`StaticSender`]/[`StaticReceiver`] pair in order to
  89. /// be used.
  90. ///
  91. /// # Examples
  92. ///
  93. /// ```
  94. /// use thingbuf::mpsc::blocking::StaticChannel;
  95. ///
  96. /// // Construct a statically-allocated channel of `usize`s with a capacity
  97. /// // of 16 messages.
  98. /// static MY_CHANNEL: StaticChannel<usize, 16> = StaticChannel::new();
  99. ///
  100. /// fn main() {
  101. /// // Split the `StaticChannel` into a sender-receiver pair.
  102. /// let (tx, rx) = MY_CHANNEL.split();
  103. ///
  104. /// // Now, `tx` and `rx` can be used just like any other async MPSC
  105. /// // channel...
  106. /// # drop(tx); drop(rx);
  107. /// }
  108. /// ```
  109. ///
  110. /// [async]: crate::mpsc::StaticChannel
  111. /// [`split`]: StaticChannel::split
  112. pub struct StaticChannel<T, const CAPACITY: usize, R = recycling::DefaultRecycle> {
  113. core: ChannelCore<Thread>,
  114. slots: [Slot<T>; CAPACITY],
  115. is_split: AtomicBool,
  116. recycle: R,
  117. }
  118. /// Synchronously sends values to an associated [`StaticReceiver`].
  119. ///
  120. /// Instances of this struct are created by the [`StaticChannel::split`] and
  121. /// [``StaticChannel::try_split`] functions.
  122. pub struct StaticSender<T: 'static, R: 'static = recycling::DefaultRecycle> {
  123. core: &'static ChannelCore<Thread>,
  124. slots: &'static [Slot<T>],
  125. recycle: &'static R,
  126. }
  127. /// Synchronously receives values from associated [`StaticSender`]s.
  128. ///
  129. /// Instances of this struct are created by the [`StaticChannel::split`] and
  130. /// [``StaticChannel::try_split`] functions.
  131. pub struct StaticReceiver<T: 'static, R: 'static = recycling::DefaultRecycle> {
  132. core: &'static ChannelCore<Thread>,
  133. slots: &'static [Slot<T>],
  134. recycle: &'static R,
  135. }
  136. // === impl StaticChannel ===
  137. impl<T, const CAPACITY: usize> StaticChannel<T, CAPACITY> {
  138. /// Constructs a new statically-allocated, blocking bounded MPSC channel.
  139. ///
  140. /// A statically-allocated channel allows using a MPSC channel without
  141. /// requiring _any_ heap allocations. The [asynchronous variant][async] may be
  142. /// used in `#![no_std]` environments without requiring `liballoc`. This is a
  143. /// synchronous version which requires the Rust standard library, because it
  144. /// blocks the current thread in order to wait for send capacity. However, in
  145. /// some cases, it may offer _very slightly_ better performance than the
  146. /// non-static blocking channel due to requiring fewer heap pointer
  147. /// dereferences.
  148. ///
  149. /// In order to use a statically-allocated channel, a `StaticChannel` must
  150. /// be constructed in a `static` initializer. This reserves storage for the
  151. /// channel's message queue at compile-time. Then, at runtime, the channel
  152. /// is [`split`] into a [`StaticSender`]/[`StaticReceiver`] pair in order to
  153. /// be used.
  154. ///
  155. /// # Examples
  156. ///
  157. /// ```
  158. /// use thingbuf::mpsc::StaticChannel;
  159. ///
  160. /// // Construct a statically-allocated channel of `usize`s with a capacity
  161. /// // of 16 messages.
  162. /// static MY_CHANNEL: StaticChannel<usize, 16> = StaticChannel::new();
  163. ///
  164. /// fn main() {
  165. /// // Split the `StaticChannel` into a sender-receiver pair.
  166. /// let (tx, rx) = MY_CHANNEL.split();
  167. ///
  168. /// // Now, `tx` and `rx` can be used just like any other async MPSC
  169. /// // channel...
  170. /// # drop(tx); drop(rx);
  171. /// }
  172. /// ```
  173. ///
  174. /// [async]: crate::mpsc::StaticChannel
  175. /// [`split`]: StaticChannel::split
  176. pub const fn new() -> Self {
  177. Self {
  178. core: ChannelCore::new(CAPACITY),
  179. slots: Slot::make_static_array::<CAPACITY>(),
  180. is_split: AtomicBool::new(false),
  181. recycle: recycling::DefaultRecycle::new(),
  182. }
  183. }
  184. }
  185. impl<T, R, const CAPACITY: usize> StaticChannel<T, CAPACITY, R> {
  186. /// Split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`]
  187. /// pair.
  188. ///
  189. /// A static channel can only be split a single time. If
  190. /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been
  191. /// called previously, this method will panic. For a non-panicking version
  192. /// of this method, see [`StaticChannel::try_split`].
  193. ///
  194. /// # Panics
  195. ///
  196. /// If the channel has already been split.
  197. pub fn split(&'static self) -> (StaticSender<T, R>, StaticReceiver<T, R>) {
  198. self.try_split().expect("channel already split")
  199. }
  200. /// Try to split a [`StaticChannel`] into a [`StaticSender`]/[`StaticReceiver`]
  201. /// pair, returning `None` if it has already been split.
  202. ///
  203. /// A static channel can only be split a single time. If
  204. /// [`StaticChannel::split`] or [`StaticChannel::try_split`] have been
  205. /// called previously, this method returns `None`.
  206. pub fn try_split(&'static self) -> Option<(StaticSender<T, R>, StaticReceiver<T, R>)> {
  207. self.is_split
  208. .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
  209. .ok()?;
  210. let tx = StaticSender {
  211. core: &self.core,
  212. slots: &self.slots[..],
  213. recycle: &self.recycle,
  214. };
  215. let rx = StaticReceiver {
  216. core: &self.core,
  217. slots: &self.slots[..],
  218. recycle: &self.recycle,
  219. };
  220. Some((tx, rx))
  221. }
  222. }
  223. // === impl StaticSender ===
  224. impl<T, R> StaticSender<T, R>
  225. where
  226. R: Recycle<T>,
  227. {
  228. /// Reserves a slot in the channel to mutate in place, blocking until
  229. /// there is a free slot to write to.
  230. ///
  231. /// This is similar to the [`send`] method, but, rather than taking a
  232. /// message by value to write to the channel, this method reserves a
  233. /// writable slot in the channel, and returns a [`SendRef`] that allows
  234. /// mutating the slot in place. If the [`StaticReceiver`] end of the
  235. /// channel uses the [`StaticReceiver::recv_ref`] method for receiving
  236. /// from the channel, this allows allocations for channel messages to be
  237. /// reused in place.
  238. ///
  239. /// # Errors
  240. ///
  241. /// If the [`StaticReceiver`] end of the channel has been dropped, this
  242. /// returns a [`Closed`] error.
  243. ///
  244. /// # Examples
  245. ///
  246. /// Sending formatted strings by writing them directly to channel slots,
  247. /// in place:
  248. /// ```
  249. /// use thingbuf::mpsc::blocking::StaticChannel;
  250. /// use std::{fmt::Write, thread};
  251. ///
  252. /// static CHANNEL: StaticChannel<String, 8> = StaticChannel::new();
  253. ///
  254. /// let (tx, rx) = CHANNEL.split();
  255. ///
  256. /// // Spawn a thread that prints each message received from the channel:
  257. /// thread::spawn(move || {
  258. /// for _ in 0..10 {
  259. /// let msg = rx.recv_ref().unwrap();
  260. /// println!("{}", msg);
  261. /// }
  262. /// });
  263. ///
  264. /// // Until the channel closes, write formatted messages to the channel.
  265. /// let mut count = 1;
  266. /// while let Ok(mut value) = tx.send_ref() {
  267. /// // Writing to the `SendRef` will reuse the *existing* string
  268. /// // allocation in place.
  269. /// write!(value, "hello from message {}", count)
  270. /// .expect("writing to a `String` should never fail");
  271. /// count += 1;
  272. /// }
  273. /// ```
  274. ///
  275. /// [`send`]: Self::send
  276. pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
  277. send_ref(self.core, self.slots, self.recycle)
  278. }
  279. /// Sends a message by value, blocking until there is a free slot to
  280. /// write to.
  281. ///
  282. /// This method takes the message by value, and replaces any previous
  283. /// value in the slot. This means that the channel will *not* function
  284. /// as an object pool while sending messages with `send`. This method is
  285. /// most appropriate when messages don't own reusable heap allocations,
  286. /// or when the [`StaticReceiver`] end of the channel must receive messages
  287. /// by moving them out of the channel by value (using the
  288. /// [`StaticReceiver::recv`] method). When messages in the channel own
  289. /// reusable heap allocations (such as `String`s or `Vec`s), and the
  290. /// [`StaticReceiver`] doesn't need to receive them by value, consider using
  291. /// [`send_ref`] instead, to enable allocation reuse.
  292. ///
  293. /// # Errors
  294. ///
  295. /// If the [`StaticReceiver`] end of the channel has been dropped, this
  296. /// returns a [`Closed`] error containing the sent value.
  297. ///
  298. /// # Examples
  299. ///
  300. /// ```
  301. /// use thingbuf::mpsc::blocking::StaticChannel;
  302. /// use std::{fmt::Write, thread};
  303. ///
  304. /// static CHANNEL: StaticChannel<i32, 8> = StaticChannel::new();
  305. /// let (tx, rx) = CHANNEL.split();
  306. ///
  307. /// // Spawn a thread that prints each message received from the channel:
  308. /// thread::spawn(move || {
  309. /// for _ in 0..10 {
  310. /// let msg = rx.recv().unwrap();
  311. /// println!("received message {}", msg);
  312. /// }
  313. /// });
  314. ///
  315. /// // Until the channel closes, write the current iteration to the channel.
  316. /// let mut count = 1;
  317. /// while tx.send(count).is_ok() {
  318. /// count += 1;
  319. /// }
  320. /// ```
  321. /// [`send_ref`]: Self::send_ref
  322. pub fn send(&self, val: T) -> Result<(), Closed<T>> {
  323. match self.send_ref() {
  324. Err(Closed(())) => Err(Closed(val)),
  325. Ok(mut slot) => {
  326. *slot = val;
  327. Ok(())
  328. }
  329. }
  330. }
  331. /// Attempts to reserve a slot in the channel to mutate in place,
  332. /// without blocking until capacity is available.
  333. ///
  334. /// This method differs from [`send_ref`] by returning immediately if the
  335. /// channel’s buffer is full or no [`StaticReceiver`] exists. Compared with
  336. /// [`send_ref`], this method has two failure cases instead of one (one for
  337. /// disconnection, one for a full buffer), and this method will never block.
  338. ///
  339. /// Like [`send_ref`], this method returns a [`SendRef`] that may be
  340. /// used to mutate a slot in the channel's buffer in place. Dropping the
  341. /// [`SendRef`] completes the send operation and makes the mutated value
  342. /// available to the [`StaticReceiver`].
  343. ///
  344. /// # Errors
  345. ///
  346. /// If the channel capacity has been reached (i.e., the channel has `n`
  347. /// buffered values where `n` is the `usize` const generic parameter of
  348. /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
  349. /// this case, a future call to `try_send` may succeed if additional
  350. /// capacity becomes available.
  351. ///
  352. /// If the receive half of the channel is closed (i.e., the [`StaticReceiver`]
  353. /// handle was dropped), the function returns [`TrySendError::Closed`].
  354. /// Once the channel has closed, subsequent calls to `try_send_ref` will
  355. /// never succeed.
  356. ///
  357. /// [`send_ref`]: Self::send_ref
  358. pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
  359. self.core
  360. .try_send_ref(self.slots, self.recycle)
  361. .map(SendRef)
  362. }
  363. /// Attempts to send a message by value immediately, without blocking until
  364. /// capacity is available.
  365. ///
  366. /// This method differs from [`send`] by returning immediately if the
  367. /// channel’s buffer is full or no [`StaticReceiver`] exists. Compared
  368. /// with [`send`], this method has two failure cases instead of one (one for
  369. /// disconnection, one for a full buffer), and this method will never block.
  370. ///
  371. /// # Errors
  372. ///
  373. /// If the channel capacity has been reached (i.e., the channel has `n`
  374. /// buffered values where `n` is the `usize` const generic parameter of
  375. /// the [`StaticChannel`]), then [`TrySendError::Full`] is returned. In
  376. /// this case, a future call to `try_send` may succeed if additional
  377. /// capacity becomes available.
  378. ///
  379. /// If the receive half of the channel is closed (i.e., the
  380. /// [`StaticReceiver`] handle was dropped), the function returns
  381. /// [`TrySendError::Closed`]. Once the channel has closed, subsequent
  382. /// calls to `try_send` will never succeed.
  383. ///
  384. /// In both cases, the error includes the value passed to `try_send`.
  385. ///
  386. /// [`send`]: Self::send
  387. pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
  388. self.core.try_send(self.slots, val, self.recycle)
  389. }
  390. }
  391. impl<T, R> Clone for StaticSender<T, R> {
  392. fn clone(&self) -> Self {
  393. test_dbg!(self.core.tx_count.fetch_add(1, Ordering::Relaxed));
  394. Self {
  395. core: self.core,
  396. slots: self.slots,
  397. recycle: self.recycle,
  398. }
  399. }
  400. }
  401. impl<T, R> Drop for StaticSender<T, R> {
  402. fn drop(&mut self) {
  403. if test_dbg!(self.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
  404. return;
  405. }
  406. // if we are the last sender, synchronize
  407. test_dbg!(atomic::fence(Ordering::SeqCst));
  408. if self.core.core.close() {
  409. self.core.rx_wait.close_tx();
  410. }
  411. }
  412. }
  413. impl<T, R: fmt::Debug> fmt::Debug for StaticSender<T, R> {
  414. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  415. f.debug_struct("StaticSender")
  416. .field("core", &self.core)
  417. .field("slots", &format_args!("&[..]"))
  418. .field("recycle", &self.recycle)
  419. .finish()
  420. }
  421. }
  422. // === impl StaticReceiver ===
  423. impl<T, R> StaticReceiver<T, R> {
  424. /// Receives the next message for this receiver, **by reference**.
  425. ///
  426. /// This method returns `None` if the channel has been closed and there are
  427. /// no remaining messages in the channel's buffer. This indicates that no
  428. /// further values can ever be received from this `StaticReceiver`. The channel is
  429. /// closed when all [`StaticSender`]s have been dropped.
  430. ///
  431. /// If there are no messages in the channel's buffer, but the channel has
  432. /// not yet been closed, this method will block until a message is sent or
  433. /// the channel is closed.
  434. ///
  435. /// This method returns a [`RecvRef`] that can be used to read from (or
  436. /// mutate) the received message by reference. When the [`RecvRef`] is
  437. /// dropped, the receive operation completes and the slot occupied by
  438. /// the received message becomes usable for a future [`send_ref`] operation.
  439. ///
  440. /// If all [`StaticSender`]s for this channel write to the channel's
  441. /// slots in place by using the [`send_ref`] or [`try_send_ref`]
  442. /// methods, this method allows messages that own heap allocations to be reused in
  443. /// place.
  444. ///
  445. /// # Examples
  446. ///
  447. /// ```
  448. /// use thingbuf::mpsc::blocking::StaticChannel;
  449. /// use std::{thread, fmt::Write};
  450. ///
  451. /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
  452. /// let (tx, rx) = CHANNEL.split();
  453. ///
  454. /// thread::spawn(move || {
  455. /// let mut value = tx.send_ref().unwrap();
  456. /// write!(value, "hello world!")
  457. /// .expect("writing to a `String` should never fail");
  458. /// });
  459. ///
  460. /// assert_eq!(Some("hello world!"), rx.recv_ref().as_deref().map(String::as_str));
  461. /// assert_eq!(None, rx.recv().as_deref());
  462. /// ```
  463. ///
  464. /// Values are buffered:
  465. ///
  466. /// ```
  467. /// use thingbuf::mpsc::blocking::StaticChannel;
  468. /// use std::fmt::Write;
  469. ///
  470. /// static CHANNEL: StaticChannel<String, 100> = StaticChannel::new();
  471. /// let (tx, rx) = CHANNEL.split();
  472. ///
  473. /// write!(tx.send_ref().unwrap(), "hello").unwrap();
  474. /// write!(tx.send_ref().unwrap(), "world").unwrap();
  475. ///
  476. /// assert_eq!("hello", rx.recv_ref().unwrap().as_str());
  477. /// assert_eq!("world", rx.recv_ref().unwrap().as_str());
  478. /// ```
  479. ///
  480. /// [`send_ref`]: StaticSender::send_ref
  481. /// [`try_send_ref`]: StaticSender::try_send_ref
  482. pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
  483. recv_ref(self.core, self.slots)
  484. }
  485. /// Receives the next message for this receiver, **by value**.
  486. ///
  487. /// This method returns `None` if the channel has been closed and there are
  488. /// no remaining messages in the channel's buffer. This indicates that no
  489. /// further values can ever be received from this `StaticReceiver`. The channel is
  490. /// closed when all [`StaticSender`]s have been dropped.
  491. ///
  492. /// If there are no messages in the channel's buffer, but the channel has
  493. /// not yet been closed, this method will block until a message is sent or
  494. /// the channel is closed.
  495. ///
  496. /// When a message is received, it is moved out of the channel by value,
  497. /// and replaced with a new slot according to the configured [recycling
  498. /// policy]. If all [`StaticSender`]s for this channel write to the channel's
  499. /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
  500. /// consider using the [`recv_ref`] method instead, to enable the
  501. /// reuse of heap allocations.
  502. ///
  503. /// # Examples
  504. ///
  505. /// ```
  506. /// use thingbuf::mpsc::blocking::StaticChannel;
  507. /// use std::thread;
  508. ///
  509. /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
  510. /// let (tx, rx) = CHANNEL.split();
  511. ///
  512. /// thread::spawn(move || {
  513. /// tx.send(1).unwrap();
  514. /// });
  515. ///
  516. /// assert_eq!(Some(1), rx.recv());
  517. /// assert_eq!(None, rx.recv());
  518. /// ```
  519. ///
  520. /// Values are buffered:
  521. ///
  522. /// ```
  523. /// use thingbuf::mpsc::blocking::StaticChannel;
  524. ///
  525. /// static CHANNEL: StaticChannel<i32, 100> = StaticChannel::new();
  526. /// let (tx, rx) = CHANNEL.split();
  527. ///
  528. /// tx.send(1).unwrap();
  529. /// tx.send(2).unwrap();
  530. ///
  531. /// assert_eq!(Some(1), rx.recv());
  532. /// assert_eq!(Some(2), rx.recv());
  533. /// ```
  534. ///
  535. /// [`send_ref`]: StaticSender::send_ref
  536. /// [`try_send_ref`]: StaticSender::try_send_ref
  537. /// [recycling policy]: crate::recycling::Recycle
  538. /// [`recv_ref`]: Self::recv_ref
  539. pub fn recv(&self) -> Option<T>
  540. where
  541. R: Recycle<T>,
  542. {
  543. let mut val = self.recv_ref()?;
  544. Some(recycling::take(&mut *val, self.recycle))
  545. }
  546. /// Returns `true` if the channel has closed (all corresponding
  547. /// [`StaticSender`]s have been dropped).
  548. ///
  549. /// If this method returns `true`, no new messages will become available
  550. /// on this channel. Previously sent messages may still be available.
  551. pub fn is_closed(&self) -> bool {
  552. test_dbg!(self.core.tx_count.load(Ordering::SeqCst)) <= 1
  553. }
  554. }
  555. impl<'a, T, R> Iterator for &'a StaticReceiver<T, R> {
  556. type Item = RecvRef<'a, T>;
  557. fn next(&mut self) -> Option<Self::Item> {
  558. self.recv_ref()
  559. }
  560. }
  561. impl<T, R> Drop for StaticReceiver<T, R> {
  562. fn drop(&mut self) {
  563. self.core.close_rx();
  564. }
  565. }
  566. impl<T, R: fmt::Debug> fmt::Debug for StaticReceiver<T, R> {
  567. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  568. f.debug_struct("StaticReceiver")
  569. .field("core", &self.core)
  570. .field("slots", &format_args!("&[..]"))
  571. .field("recycle", &self.recycle)
  572. .finish()
  573. }
  574. }
  575. }
  576. impl_send_ref! {
  577. /// A reference to a message being sent to a blocking channel.
  578. ///
  579. /// A `SendRef` represents the exclusive permission to mutate a given
  580. /// element in a channel. A `SendRef<T>` [implements `DerefMut<T>`] to allow
  581. /// writing to that element. This is analogous to the [`Ref`] type, except
  582. /// that it completes a `send_ref` or `try_send_ref` operation when it is
  583. /// dropped.
  584. ///
  585. /// This type is returned by the [`Sender::send_ref`] and
  586. /// [`Sender::try_send_ref`] (or [`StaticSender::send_ref`] and
  587. /// [`StaticSender::try_send_ref`]) methods.
  588. ///
  589. /// [implements `DerefMut<T>`]: #impl-DerefMut
  590. /// [`Ref`]: crate::Ref
  591. pub struct SendRef<Thread>;
  592. }
  593. impl_recv_ref! {
  594. /// A reference to a message being received from a blocking channel.
  595. ///
  596. /// A `RecvRef` represents the exclusive permission to mutate a given
  597. /// element in a channel. A `RecvRef<T>` [implements `DerefMut<T>`] to allow
  598. /// writing to that element. This is analogous to the [`Ref`] type, except
  599. /// that it completes a `recv_ref` operation when it is dropped.
  600. ///
  601. /// This type is returned by the [`Receiver::recv_ref`] and
  602. /// [`StaticReceiver::recv_ref`] methods.
  603. ///
  604. /// [implements `DerefMut<T>`]: #impl-DerefMut
  605. /// [`Ref`]: crate::Ref
  606. pub struct RecvRef<Thread>;
  607. }
  608. // === impl Sender ===
  609. impl<T, R> Sender<T, R>
  610. where
  611. R: Recycle<T>,
  612. {
  613. /// Reserves a slot in the channel to mutate in place, blocking until
  614. /// there is a free slot to write to.
  615. ///
  616. /// This is similar to the [`send`] method, but, rather than taking a
  617. /// message by value to write to the channel, this method reserves a
  618. /// writable slot in the channel, and returns a [`SendRef`] that allows
  619. /// mutating the slot in place. If the [`Receiver`] end of the channel
  620. /// uses the [`Receiver::recv_ref`] method for receiving from the channel,
  621. /// this allows allocations for channel messages to be reused in place.
  622. ///
  623. /// # Errors
  624. ///
  625. /// If the [`Receiver`] end of the channel has been dropped, this
  626. /// returns a [`Closed`] error.
  627. ///
  628. /// # Examples
  629. ///
  630. /// Sending formatted strings by writing them directly to channel slots,
  631. /// in place:
  632. /// ```
  633. /// use thingbuf::mpsc::blocking;
  634. /// use std::{fmt::Write, thread};
  635. ///
  636. /// let (tx, rx) = blocking::channel::<String>(8);
  637. ///
  638. /// // Spawn a thread that prints each message received from the channel:
  639. /// thread::spawn(move || {
  640. /// for _ in 0..10 {
  641. /// let msg = rx.recv_ref().unwrap();
  642. /// println!("{}", msg);
  643. /// }
  644. /// });
  645. ///
  646. /// // Until the channel closes, write formatted messages to the channel.
  647. /// let mut count = 1;
  648. /// while let Ok(mut value) = tx.send_ref() {
  649. /// // Writing to the `SendRef` will reuse the *existing* string
  650. /// // allocation in place.
  651. /// write!(value, "hello from message {}", count)
  652. /// .expect("writing to a `String` should never fail");
  653. /// count += 1;
  654. /// }
  655. /// ```
  656. ///
  657. /// [`send`]: Self::send
  658. pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
  659. send_ref(
  660. &self.inner.core,
  661. self.inner.slots.as_ref(),
  662. &self.inner.recycle,
  663. )
  664. }
  665. /// Sends a message by value, blocking until there is a free slot to
  666. /// write to.
  667. ///
  668. /// This method takes the message by value, and replaces any previous
  669. /// value in the slot. This means that the channel will *not* function
  670. /// as an object pool while sending messages with `send`. This method is
  671. /// most appropriate when messages don't own reusable heap allocations,
  672. /// or when the [`Receiver`] end of the channel must receive messages by
  673. /// moving them out of the channel by value (using the
  674. /// [`Receiver::recv`] method). When messages in the channel own
  675. /// reusable heap allocations (such as `String`s or `Vec`s), and the
  676. /// [`Receiver`] doesn't need to receive them by value, consider using
  677. /// [`send_ref`] instead, to enable allocation reuse.
  678. ///
  679. /// # Errors
  680. ///
  681. /// If the [`Receiver`] end of the channel has been dropped, this
  682. /// returns a [`Closed`] error containing the sent value.
  683. ///
  684. /// # Examples
  685. ///
  686. /// ```
  687. /// use thingbuf::mpsc::blocking;
  688. /// use std::thread;
  689. ///
  690. /// let (tx, rx) = blocking::channel(8);
  691. ///
  692. /// // Spawn a thread that prints each message received from the channel:
  693. /// thread::spawn(move || {
  694. /// for _ in 0..10 {
  695. /// let msg = rx.recv().unwrap();
  696. /// println!("received message {}", msg);
  697. /// }
  698. /// });
  699. ///
  700. /// // Until the channel closes, write the current iteration to the channel.
  701. /// let mut count = 1;
  702. /// while tx.send(count).is_ok() {
  703. /// count += 1;
  704. /// }
  705. /// ```
  706. /// [`send_ref`]: Self::send_ref
  707. pub fn send(&self, val: T) -> Result<(), Closed<T>> {
  708. match self.send_ref() {
  709. Err(Closed(())) => Err(Closed(val)),
  710. Ok(mut slot) => {
  711. *slot = val;
  712. Ok(())
  713. }
  714. }
  715. }
  716. /// Attempts to reserve a slot in the channel to mutate in place,
  717. /// without blocking until capacity is available.
  718. ///
  719. /// This method differs from [`send_ref`] by returning immediately if the
  720. /// channel’s buffer is full or no [`Receiver`] exists. Compared with
  721. /// [`send_ref`], this method has two failure cases instead of one (one for
  722. /// disconnection, one for a full buffer), and this method will never block.
  723. ///
  724. /// Like [`send_ref`], this method returns a [`SendRef`] that may be
  725. /// used to mutate a slot in the channel's buffer in place. Dropping the
  726. /// [`SendRef`] completes the send operation and makes the mutated value
  727. /// available to the [`Receiver`].
  728. ///
  729. /// # Errors
  730. ///
  731. /// If the channel capacity has been reached (i.e., the channel has `n`
  732. /// buffered values where `n` is the argument passed to
  733. /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
  734. /// returned. In this case, a future call to `try_send` may succeed if
  735. /// additional capacity becomes available.
  736. ///
  737. /// If the receive half of the channel is closed (i.e., the [`Receiver`]
  738. /// handle was dropped), the function returns [`TrySendError::Closed`].
  739. /// Once the channel has closed, subsequent calls to `try_send_ref` will
  740. /// never succeed.
  741. ///
  742. /// [`send_ref`]: Self::send_ref
  743. pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
  744. self.inner
  745. .core
  746. .try_send_ref(self.inner.slots.as_ref(), &self.inner.recycle)
  747. .map(SendRef)
  748. }
  749. /// Attempts to send a message by value immediately, without blocking until
  750. /// capacity is available.
  751. ///
  752. /// This method differs from [`send`] by returning immediately if the
  753. /// channel’s buffer is full or no [`Receiver`] exists. Compared with
  754. /// [`send`], this method has two failure cases instead of one (one for
  755. /// disconnection, one for a full buffer), and this method will never block.
  756. ///
  757. /// # Errors
  758. ///
  759. /// If the channel capacity has been reached (i.e., the channel has `n`
  760. /// buffered values where `n` is the argument passed to
  761. /// [`channel`]/[`with_recycle`]), then [`TrySendError::Full`] is
  762. /// returned. In this case, a future call to `try_send` may succeed if
  763. /// additional capacity becomes available.
  764. ///
  765. /// If the receive half of the channel is closed (i.e., the [`Receiver`]
  766. /// handle was dropped), the function returns [`TrySendError::Closed`].
  767. /// Once the channel has closed, subsequent calls to `try_send` will
  768. /// never succeed.
  769. ///
  770. /// In both cases, the error includes the value passed to `try_send`.
  771. ///
  772. /// [`send`]: Self::send
  773. pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
  774. self.inner
  775. .core
  776. .try_send(self.inner.slots.as_ref(), val, &self.inner.recycle)
  777. }
  778. }
  779. impl<T, R> Clone for Sender<T, R> {
  780. fn clone(&self) -> Self {
  781. test_dbg!(self.inner.core.tx_count.fetch_add(1, Ordering::Relaxed));
  782. Self {
  783. inner: self.inner.clone(),
  784. }
  785. }
  786. }
  787. impl<T, R> Drop for Sender<T, R> {
  788. fn drop(&mut self) {
  789. if test_dbg!(self.inner.core.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
  790. return;
  791. }
  792. // if we are the last sender, synchronize
  793. test_dbg!(atomic::fence(Ordering::SeqCst));
  794. if self.inner.core.core.close() {
  795. self.inner.core.rx_wait.close_tx();
  796. }
  797. }
  798. }
  799. // === impl Receiver ===
  800. impl<T, R> Receiver<T, R> {
  801. /// Receives the next message for this receiver, **by reference**.
  802. ///
  803. /// This method returns `None` if the channel has been closed and there are
  804. /// no remaining messages in the channel's buffer. This indicates that no
  805. /// further values can ever be received from this `Receiver`. The channel is
  806. /// closed when all [`Sender`]s have been dropped.
  807. ///
  808. /// If there are no messages in the channel's buffer, but the channel has
  809. /// not yet been closed, this method will block until a message is sent or
  810. /// the channel is closed.
  811. ///
  812. /// This method returns a [`RecvRef`] that can be used to read from (or
  813. /// mutate) the received message by reference. When the [`RecvRef`] is
  814. /// dropped, the receive operation completes and the slot occupied by
  815. /// the received message becomes usable for a future [`send_ref`] operation.
  816. ///
  817. /// If all [`Sender`]s for this channel write to the channel's slots in
  818. /// place by using the [`send_ref`] or [`try_send_ref`] methods, this
  819. /// method allows messages that own heap allocations to be reused in
  820. /// place.
  821. ///
  822. /// # Examples
  823. ///
  824. /// ```
  825. /// use thingbuf::mpsc::blocking;
  826. /// use std::{thread, fmt::Write};
  827. ///
  828. /// let (tx, rx) = blocking::channel::<String>(100);
  829. ///
  830. /// thread::spawn(move || {
  831. /// let mut value = tx.send_ref().unwrap();
  832. /// write!(value, "hello world!")
  833. /// .expect("writing to a `String` should never fail");
  834. /// });
  835. ///
  836. /// assert_eq!(Some("hello world!"), rx.recv_ref().as_deref().map(String::as_str));
  837. /// assert_eq!(None, rx.recv().as_deref());
  838. /// ```
  839. ///
  840. /// Values are buffered:
  841. ///
  842. /// ```
  843. /// use thingbuf::mpsc::blocking;
  844. /// use std::fmt::Write;
  845. ///
  846. /// let (tx, rx) = blocking::channel::<String>(100);
  847. ///
  848. /// write!(tx.send_ref().unwrap(), "hello").unwrap();
  849. /// write!(tx.send_ref().unwrap(), "world").unwrap();
  850. ///
  851. /// assert_eq!("hello", rx.recv_ref().unwrap().as_str());
  852. /// assert_eq!("world", rx.recv_ref().unwrap().as_str());
  853. /// ```
  854. ///
  855. /// [`send_ref`]: Sender::send_ref
  856. /// [`try_send_ref`]: Sender::try_send_ref
  857. pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
  858. recv_ref(&self.inner.core, self.inner.slots.as_ref())
  859. }
  860. /// Receives the next message for this receiver, **by value**.
  861. ///
  862. /// This method returns `None` if the channel has been closed and there are
  863. /// no remaining messages in the channel's buffer. This indicates that no
  864. /// further values can ever be received from this `Receiver`. The channel is
  865. /// closed when all [`Sender`]s have been dropped.
  866. ///
  867. /// If there are no messages in the channel's buffer, but the channel has
  868. /// not yet been closed, this method will block until a message is sent or
  869. /// the channel is closed.
  870. ///
  871. /// When a message is received, it is moved out of the channel by value,
  872. /// and replaced with a new slot according to the configured [recycling
  873. /// policy]. If all [`Sender`]s for this channel write to the channel's
  874. /// slots in place by using the [`send_ref`] or [`try_send_ref`] methods,
  875. /// consider using the [`recv_ref`] method instead, to enable the
  876. /// reuse of heap allocations.
  877. ///
  878. /// # Examples
  879. ///
  880. /// ```
  881. /// use thingbuf::mpsc::blocking;
  882. /// use std::{thread, fmt::Write};
  883. ///
  884. /// let (tx, rx) = blocking::channel(100);
  885. ///
  886. /// thread::spawn(move || {
  887. /// tx.send(1).unwrap();
  888. /// });
  889. ///
  890. /// assert_eq!(Some(1), rx.recv());
  891. /// assert_eq!(None, rx.recv());
  892. /// ```
  893. ///
  894. /// Values are buffered:
  895. ///
  896. /// ```
  897. /// use thingbuf::mpsc::blocking;
  898. ///
  899. /// let (tx, rx) = blocking::channel(100);
  900. ///
  901. /// tx.send(1).unwrap();
  902. /// tx.send(2).unwrap();
  903. ///
  904. /// assert_eq!(Some(1), rx.recv());
  905. /// assert_eq!(Some(2), rx.recv());
  906. /// ```
  907. ///
  908. /// [`send_ref`]: Sender::send_ref
  909. /// [`try_send_ref`]: Sender::try_send_ref
  910. /// [recycling policy]: crate::recycling::Recycle
  911. /// [`recv_ref`]: Self::recv_ref
  912. pub fn recv(&self) -> Option<T>
  913. where
  914. R: Recycle<T>,
  915. {
  916. let mut val = self.recv_ref()?;
  917. Some(recycling::take(&mut *val, &self.inner.recycle))
  918. }
  919. /// Returns `true` if the channel has closed (all corresponding
  920. /// [`Sender`]s have been dropped).
  921. ///
  922. /// If this method returns `true`, no new messages will become available
  923. /// on this channel. Previously sent messages may still be available.
  924. pub fn is_closed(&self) -> bool {
  925. test_dbg!(self.inner.core.tx_count.load(Ordering::SeqCst)) <= 1
  926. }
  927. }
  928. impl<'a, T, R> Iterator for &'a Receiver<T, R> {
  929. type Item = RecvRef<'a, T>;
  930. fn next(&mut self) -> Option<Self::Item> {
  931. self.recv_ref()
  932. }
  933. }
  934. impl<T, R> Drop for Receiver<T, R> {
  935. fn drop(&mut self) {
  936. self.inner.core.close_rx();
  937. }
  938. }
  939. // === impl Inner ===
  940. impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> {
  941. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  942. f.debug_struct("Inner")
  943. .field("core", &self.core)
  944. .field("slots", &format_args!("Box<[..]>"))
  945. .field("recycle", &self.recycle)
  946. .finish()
  947. }
  948. }
  949. impl<T, R> Drop for Inner<T, R> {
  950. fn drop(&mut self) {
  951. self.core.core.drop_slots(&mut self.slots[..])
  952. }
  953. }
  954. #[inline]
  955. fn recv_ref<'a, T>(core: &'a ChannelCore<Thread>, slots: &'a [Slot<T>]) -> Option<RecvRef<'a, T>> {
  956. loop {
  957. match core.poll_recv_ref(slots, thread::current) {
  958. Poll::Ready(r) => {
  959. return r.map(|slot| RecvRef {
  960. _notify: super::NotifyTx(&core.tx_wait),
  961. slot,
  962. })
  963. }
  964. Poll::Pending => {
  965. test_println!("parking ({:?})", thread::current());
  966. thread::park();
  967. }
  968. }
  969. }
  970. }
  971. #[inline]
  972. fn send_ref<'a, T, R: Recycle<T>>(
  973. core: &'a ChannelCore<Thread>,
  974. slots: &'a [Slot<T>],
  975. recycle: &'a R,
  976. ) -> Result<SendRef<'a, T>, Closed<()>> {
  977. // fast path: avoid getting the thread and constructing the node if the
  978. // slot is immediately ready.
  979. match core.try_send_ref(slots, recycle) {
  980. Ok(slot) => return Ok(SendRef(slot)),
  981. Err(TrySendError::Closed(_)) => return Err(Closed(())),
  982. _ => {}
  983. }
  984. let mut waiter = queue::Waiter::new();
  985. let mut unqueued = true;
  986. let thread = thread::current();
  987. let mut boff = Backoff::new();
  988. loop {
  989. let node = unsafe {
  990. // Safety: in this case, it's totally safe to pin the waiter, as
  991. // it is owned uniquely by this function, and it cannot possibly
  992. // be moved while this thread is parked.
  993. Pin::new_unchecked(&mut waiter)
  994. };
  995. let wait = if unqueued {
  996. test_dbg!(core.tx_wait.start_wait(node, &thread))
  997. } else {
  998. test_dbg!(core.tx_wait.continue_wait(node, &thread))
  999. };
  1000. match wait {
  1001. WaitResult::Closed => return Err(Closed(())),
  1002. WaitResult::Notified => {
  1003. boff.spin_yield();
  1004. match core.try_send_ref(slots.as_ref(), recycle) {
  1005. Ok(slot) => return Ok(SendRef(slot)),
  1006. Err(TrySendError::Closed(_)) => return Err(Closed(())),
  1007. _ => {}
  1008. }
  1009. }
  1010. WaitResult::Wait => {
  1011. unqueued = false;
  1012. thread::park();
  1013. }
  1014. }
  1015. }
  1016. }