sync.rs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. //! A synchronous multi-producer, single-consumer channel.
  2. //!
  3. //! This provides an equivalent API to the [`mpsc`](crate::mpsc) module, but the
  4. //! [`Receiver`] type in this module waits by blocking the current thread,
  5. //! rather than asynchronously yielding.
  6. use super::*;
  7. use crate::{
  8. loom::{
  9. atomic::{self, Ordering},
  10. sync::Arc,
  11. thread::{self, Thread},
  12. },
  13. Ref, ThingBuf,
  14. };
  15. use core::fmt;
  16. /// Returns a new asynchronous multi-producer, single consumer channel.
  17. pub fn channel<T>(thingbuf: ThingBuf<T>) -> (Sender<T>, Receiver<T>) {
  18. let inner = Arc::new(Inner::new(thingbuf));
  19. let tx = Sender {
  20. inner: inner.clone(),
  21. };
  22. let rx = Receiver { inner };
  23. (tx, rx)
  24. }
  /// The sending half of a channel created by [`channel`].
  ///
  /// Cloning a `Sender` yields another handle to the same channel; see the
  /// `Clone` and `Drop` implementations for how live senders are counted.
  #[derive(Debug)]
  pub struct Sender<T> {
      // Channel state shared with the `Receiver` and every other `Sender`.
      inner: Arc<Inner<T, Thread>>,
  }
  /// The receiving half of a channel created by [`channel`].
  ///
  /// Its blocking methods wait by parking the current thread.
  #[derive(Debug)]
  pub struct Receiver<T> {
      // Channel state shared with every `Sender` of this channel.
      inner: Arc<Inner<T, Thread>>,
  }
  // Declares this module's `SendRef` type, parameterised with `Thread`.
  // NOTE(review): the generated items come from the `impl_send_ref!` macro,
  // which is defined outside this file — confirm the details there.
  impl_send_ref! {
      pub struct SendRef<Thread>;
  }
  // Declares this module's `RecvRef` type, parameterised with `Thread`.
  // NOTE(review): the generated items come from the `impl_recv_ref!` macro,
  // which is defined outside this file — confirm the details there.
  impl_recv_ref! {
      pub struct RecvRef<Thread>;
  }
  39. // === impl Sender ===
  40. impl<T: Default> Sender<T> {
  41. pub fn try_send_ref(&self) -> Result<SendRef<'_, T>, TrySendError> {
  42. self.inner.try_send_ref().map(SendRef)
  43. }
  44. pub fn try_send(&self, val: T) -> Result<(), TrySendError<T>> {
  45. self.inner.try_send(val)
  46. }
  47. pub fn send_ref(&self) -> Result<SendRef<'_, T>, Closed> {
  48. loop {
  49. // perform one send ref loop iteration
  50. if let Poll::Ready(result) = self.inner.poll_send_ref(thread::current) {
  51. return result.map(SendRef);
  52. }
  53. // if that iteration failed, park the thread.
  54. thread::park();
  55. }
  56. }
  57. pub fn send(&self, val: T) -> Result<(), Closed<T>> {
  58. match self.send_ref() {
  59. Err(Closed(())) => Err(Closed(val)),
  60. Ok(mut slot) => {
  61. slot.with_mut(|slot| *slot = val);
  62. Ok(())
  63. }
  64. }
  65. }
  66. }
  67. impl<T> Clone for Sender<T> {
  68. fn clone(&self) -> Self {
  69. test_dbg!(self.inner.tx_count.fetch_add(1, Ordering::Relaxed));
  70. Self {
  71. inner: self.inner.clone(),
  72. }
  73. }
  74. }
  75. impl<T> Drop for Sender<T> {
  76. fn drop(&mut self) {
  77. if test_dbg!(self.inner.tx_count.fetch_sub(1, Ordering::Release)) > 1 {
  78. return;
  79. }
  80. // if we are the last sender, synchronize
  81. test_dbg!(atomic::fence(Ordering::SeqCst));
  82. if self.inner.thingbuf.core.close() {
  83. self.inner.rx_wait.close_tx();
  84. }
  85. }
  86. }
  87. // === impl Receiver ===
  88. impl<T: Default> Receiver<T> {
  89. pub fn recv_ref(&self) -> Option<RecvRef<'_, T>> {
  90. loop {
  91. match self.inner.poll_recv_ref(thread::current) {
  92. Poll::Ready(r) => {
  93. return r.map(|slot| RecvRef {
  94. _notify: super::NotifyTx(&self.inner.tx_wait),
  95. slot,
  96. })
  97. }
  98. Poll::Pending => {
  99. test_println!("parking ({:?})", thread::current());
  100. thread::park();
  101. }
  102. }
  103. }
  104. }
  105. pub fn try_recv_ref(&self) -> Option<Ref<'_, T>> {
  106. self.inner.thingbuf.pop_ref()
  107. }
  108. pub fn recv(&self) -> Option<T> {
  109. let val = self.recv_ref()?.with_mut(core::mem::take);
  110. Some(val)
  111. }
  112. pub fn is_closed(&self) -> bool {
  113. test_dbg!(self.inner.tx_count.load(Ordering::SeqCst)) <= 1
  114. }
  115. }
  116. impl<'a, T: Default> Iterator for &'a Receiver<T> {
  117. type Item = RecvRef<'a, T>;
  118. fn next(&mut self) -> Option<Self::Item> {
  119. self.recv_ref()
  120. }
  121. }
  122. impl<T> Drop for Receiver<T> {
  123. fn drop(&mut self) {
  124. self.inner.close_rx();
  125. }
  126. }