wait.rs 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. //! Waiter cells and queues to allow threads/tasks to wait for notifications.
  2. //!
  3. //! The MPSC channel enhances a `ThingBuf` --- which implements a non-blocking
  4. //! queue --- with the capacity to wait. A `ThingBuf` only has `try_send` and
  5. //! `try_recv`-like operations, which immediately return in the case where the
  6. //! queue is full or empty, respectively. In an MPSC channel, the sender is
  7. //! provided with the ability to *wait* until there is capacity in the queue to
  8. //! send its message. Similarly, a receiver can wait until the channel has
  9. //! messages to receive.
  10. //!
  11. //! This module implements two types of structure for waiting: a wait *cell*,
  12. //! which stores a *single* waiting thread or task, and a wait *queue*, which
  13. //! stores a queue of waiting tasks. Since the channel is an MPSC (multiple
  14. //! producer, single consumer) channel, the wait queue is used to store waiting
  15. //! senders, while the wait cell is used to store a waiting receiver (as there
  16. //! is only ever one thread/task waiting to receive from a channel).
  17. //!
  18. //! This module is generic over the _type_ of the waiter; they may either be
  19. //! [`core::task::Waker`]s, for the async MPSC, or [`std::thread::Thread`]s, for
  20. //! the blocking MPSC. In either case, the role played by these types is fairly
  21. //! analogous.
  22. use crate::util::panic::UnwindSafe;
  23. use core::{fmt, task::Waker};
  24. mod cell;
  25. pub(crate) mod queue;
  26. pub(crate) use self::{cell::WaitCell, queue::WaitQueue};
  27. #[cfg(feature = "std")]
  28. use crate::loom::thread;
  /// Outcome of an attempt to register a waiter with a wait cell or wait queue.
  #[derive(Debug, Eq, PartialEq)]
  pub(crate) enum WaitResult {
      /// Registration succeeded: the waiter is stored and the calling
      /// thread/task may now wait.
      Wait,

      /// The channel has been closed.
      ///
      /// For a sender, this means the receiver was dropped; for a receiver,
      /// it means every sender has been dropped. The caller must *not* wait in
      /// this case, since nothing will ever wake it again.
      ///
      /// The waiter (`Thread`/`Waker`) was *not* registered when this variant
      /// is returned.
      Closed,

      /// A notification arrived while the waiter was being registered.
      ///
      /// The other side of the channel sent a notification while the wait
      /// cell or wait queue was being accessed. There is no need to wait: the
      /// caller can retry the operation it was attempting, as it may now be
      /// ready.
      ///
      /// The waiter (`Thread`/`Waker`) was *not* registered when this variant
      /// is returned.
      Notified,
  }
  /// A handle to a waiting thread or task that can be used to wake it.
  ///
  /// The wait cell and wait queue are generic over this trait, so the same
  /// waiting machinery can serve both waiter types implemented in this module.
  pub(crate) trait Notify: Clone + fmt::Debug + UnwindSafe {
      /// Consumes this handle and wakes the waiter it refers to.
      fn notify(self);

      /// Returns `true` if `self` and `other` refer to the same waiter, as
      /// judged by the implementing type.
      fn same(&self, other: &Self) -> bool;
  }
  /// Thread-handle waiter, available only when the `std` feature is enabled.
  #[cfg(feature = "std")]
  impl Notify for thread::Thread {
      /// Logs the wakeup (through `test_println!`) and then unparks the thread
      /// this handle refers to.
      #[inline]
      fn notify(self) {
          test_println!("NOTIFYING {:?} (from {:?})", self, thread::current());
          thread::Thread::unpark(&self);
      }

      /// Two handles are the same waiter when their thread IDs are equal.
      #[inline]
      fn same(&self, other: &Self) -> bool {
          let mine = self.id();
          let theirs = other.id();
          theirs == mine
      }
  }
  69. impl Notify for Waker {
  70. #[inline]
  71. fn notify(self) {
  72. test_println!("WAKING TASK {:?} (from {:?})", self, thread::current());
  73. self.wake();
  74. }
  75. #[inline]
  76. fn same(&self, other: &Self) -> bool {
  77. other.will_wake(self)
  78. }
  79. }