queue.rs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863
  1. use crate::{
  2. loom::{
  3. atomic::{AtomicUsize, Ordering::*},
  4. cell::UnsafeCell,
  5. },
  6. util::{mutex::Mutex, CachePadded},
  7. wait::{Notify, WaitResult},
  8. };
  9. use core::{fmt, marker::PhantomPinned, pin::Pin, ptr::NonNull};
  10. /// A queue of waiters ([`core::task::Waker`]s or [`std::thread::Thread`]s)
  11. /// implemented as a doubly-linked intrusive list.
  12. ///
  13. /// The *[intrusive]* aspect of this list is important, as it means that it does
  14. /// not allocate memory. Instead, nodes in the linked list are stored in the
  15. /// futures of tasks trying to wait for capacity, or on the stack frames of
  16. /// threads blocking on channel capacity.
  17. ///
  18. /// Using an intrusive list is critical if the MPSC is to *truly* have bounded
  19. /// memory use: if the channel has capacity for a bounded number of *messages*,
  20. /// but when it is full, must allocate memory to store the threads or tasks that
  21. /// are waiting to send messages, it does not actually bound the maximum amount
  22. /// of memory used by the channel! This is, potentially, quite bad, as
  23. /// increasing backpressure can cause out of memory conditions.
  24. ///
  25. /// Also, not allocating can have a performance advantage, since tasks waiting
  26. /// for capacity will never hit `malloc` or `free`. This reduces the overhead of
  27. /// waiting, but (more importantly!) it also avoids allocator churn that can
  28. /// hurt `malloc` performance for *other* parts of the program that need to
  29. /// allocate memory.
  30. ///
  31. /// Finally, this will allow using a `ThingBuf` MPSC channel with exclusively
  32. /// static allocations, making it much easier to use in embedded systems or on
  33. /// bare metal when an allocator may not always be available.
  34. ///
  35. /// However, the intrusive linked list introduces one new danger: because
  36. /// futures can be *cancelled*, and the linked list nodes live within the
  37. /// futures trying to wait for channel capacity, we *must* ensure that the node
  38. /// is unlinked from the list before dropping a cancelled future. Failure to do
  39. /// so would result in the list containing dangling pointers. Therefore, we must
  40. /// use a *doubly-linked* list, so that nodes can edit both the previous and
  41. /// next node when they have to remove themselves. This is kind of a bummer, as
  42. /// it means we can't use something nice like this [intrusive queue by Dmitry
  43. /// Vyukov][2], and there are not really practical designs for lock-free
  44. /// doubly-linked lists that don't rely on some kind of deferred reclamation
  45. /// scheme such as hazard pointers or QSBR.
  46. ///
  47. /// Instead, we just stick a mutex around the linked list, which must be
  48. /// acquired to pop nodes from it, or for nodes to remove themselves when
  49. /// futures are cancelled. This is a bit sad, but the critical sections for this
  50. /// mutex are short enough that we still get pretty good performance despite it.
  51. ///
  52. /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] is used when
  53. /// the standard library is available.
  54. ///
  55. /// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
  56. /// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
  #[derive(Debug)]
  pub(crate) struct WaitQueue<T> {
      /// The wait queue's state variable.
      ///
      /// Holds one of the module-level `EMPTY`, `WAITING`, `WAKING` or `CLOSED`
      /// constants. The queue is always in one of the following states:
      ///
      /// - [`EMPTY`]: No waiters are queued, and there is no pending notification.
      ///   Waiting while the queue is in this state will enqueue the waiter;
      ///   notifying while in this state will store a pending notification in the
      ///   queue, transitioning to the `WAKING` state.
      ///
      /// - [`WAITING`]: There are one or more waiters in the queue. Waiting while
      ///   the queue is in this state will not transition the state. Waking while
      ///   in this state will wake the first waiter in the queue; if this empties
      ///   the queue, then the queue will transition to the `EMPTY` state.
      ///
      /// - [`WAKING`]: The queue has a stored notification. Waiting while the queue
      ///   is in this state will consume the pending notification *without*
      ///   enqueueing the waiter and transition the queue to the `EMPTY` state.
      ///   Waking while in this state will leave the queue in this state.
      ///
      /// - [`CLOSED`]: The queue is closed. Waiting while in this state will return
      ///   [`WaitResult::Closed`] without transitioning the queue's state.
      ///
      /// The `EMPTY` <-> `WAKING` transitions are performed lock-free by the fast
      /// paths of `start_wait` and `notify`; the transition *into* `WAITING` is
      /// only performed while the `list` lock is held.
      state: CachePadded<AtomicUsize>,
      /// The linked list of waiters.
      ///
      /// # Safety
      ///
      /// This is protected by a mutex; the mutex *must* be acquired when
      /// manipulating the linked list, OR when manipulating waiter nodes that may
      /// be linked into the list. If a node is known to not be linked, it is safe
      /// to modify that node (such as by setting or unsetting its
      /// `Waker`/`Thread`) without holding the lock; otherwise, it may be
      /// modified through the list, so the lock must be held when modifying the
      /// node.
      ///
      /// A spinlock is used on `no_std` platforms; [`std::sync::Mutex`] or
      /// `parking_lot::Mutex` are used when the standard library is available
      /// (depending on feature flags).
      list: Mutex<List<T>>,
  }
  /// A waiter node which may be linked into a wait queue.
  ///
  /// The node is *intrusive*: it is owned by the waiting task or thread, and
  /// the queue only stores pointers to it.
  #[derive(Debug)]
  pub(crate) struct Waiter<T> {
      /// The waiter's state variable.
      ///
      /// A waiter is always in one of the following states:
      ///
      /// - [`EMPTY`]: The waiter is not linked in the queue, and does not have a
      ///   `Thread`/`Waker`.
      ///
      /// - [`WAITING`]: The waiter is linked in the queue and has a
      ///   `Thread`/`Waker`.
      ///
      /// - [`WAKING`]: The waiter has been notified by the wait queue. If it is in
      ///   this state, it is *not* linked into the queue, and does not have a
      ///   `Thread`/`Waker`.
      ///
      /// - [`CLOSED`]: The waiter has been notified because the wait queue closed.
      ///   If it is in this state, it is *not* linked into the queue, and does
      ///   not have a `Thread`/`Waker`.
      ///
      /// This may be inspected without holding the lock; it can be used to
      /// determine whether the lock must be acquired.
      state: CachePadded<AtomicUsize>,
      /// The linked list node and stored `Thread`/`Waker`.
      ///
      /// # Safety
      ///
      /// This `UnsafeCell` may only be accessed while holding the `Mutex` around
      /// the wait queue's linked list!
      node: UnsafeCell<Node<T>>,
  }
  /// The lock-protected portion of a [`Waiter`]: its list links and the stored
  /// notification handle.
  #[derive(Debug)]
  #[pin_project::pin_project]
  struct Node<T> {
      /// Link towards the tail of the list. `List::enqueue` sets this to the
      /// node that was the head before this node was pushed.
      next: Link<Waiter<T>>,
      /// Link towards the head of the list. `List::enqueue` points the previous
      /// head's `prev` at the newly pushed node; it is `None` for the head.
      prev: Link<Waiter<T>>,
      /// The handle to notify (a clone of the caller's `T`), stored while the
      /// node is waiting and taken out again by `List::dequeue`.
      waiter: Option<T>,
      // This type is !Unpin due to the heuristic from:
      // <https://github.com/rust-lang/rust/pull/82834>
      #[pin]
      _pin: PhantomPinned,
  }
  /// A nullable, non-owning pointer to a list element.
  type Link<T> = Option<NonNull<T>>;
  /// The doubly-linked list of waiter nodes.
  ///
  /// Nodes are pushed at `head` (see `enqueue`) and popped from `tail` (see
  /// `dequeue`), so waiters are woken in the order they were enqueued. The
  /// list does not own its nodes; it only stores pointers to them.
  struct List<T> {
      /// The most recently enqueued node, or `None` if the list is empty.
      head: Link<Waiter<T>>,
      /// The oldest node (the next one to be dequeued), or `None` if the list
      /// is empty.
      tail: Link<Waiter<T>>,
  }
  // State values. The same four constants are stored in both
  // `WaitQueue::state` and `Waiter::state`; see the documentation on those
  // fields for what each value means in each context.

  /// No waiters and no stored notification (queue), or not linked (waiter).
  const EMPTY: usize = 0;
  /// At least one waiter is queued (queue), or the node is linked (waiter).
  const WAITING: usize = 1;
  /// A notification is stored (queue), or the node was notified (waiter).
  const WAKING: usize = 2;
  /// The queue was closed (queue), or the node was woken by a close (waiter).
  const CLOSED: usize = 3;
  impl<T> WaitQueue<T> {
      /// Returns a new, empty wait queue in the `EMPTY` state.
      ///
      /// This is the variant compiled under `cfg(loom)`; it is a regular
      /// (non-`const`) function.
      // NOTE(review): presumably non-const because loom's mock primitives lack
      // `const` constructors -- confirm against `crate::loom`.
      #[cfg(loom)]
      pub(crate) fn new() -> Self {
          Self {
              state: CachePadded(AtomicUsize::new(EMPTY)),
              list: Mutex::new(List::new()),
          }
      }
      /// Returns a new, empty wait queue in the `EMPTY` state.
      ///
      /// Outside of loom this is a `const fn`, so a queue can be constructed in
      /// a `const`/`static` context; the list mutex is built with the crate's
      /// `const_mutex` helper.
      #[cfg(not(loom))]
      pub(crate) const fn new() -> Self {
          Self {
              state: CachePadded(AtomicUsize::new(EMPTY)),
              list: crate::util::mutex::const_mutex(List::new()),
          }
      }
  }
  166. impl<T: Notify + Unpin> WaitQueue<T> {
  167. /// Start waiting for a notification.
  168. ///
  169. /// If the queue has a stored notification, this consumes it and returns
  170. /// [`WaitResult::Notified`] without adding the waiter to the queue. If the
  171. /// queue is closed, this returns [`WaitResult::Closed`] without adding the
  172. /// waiter to the queue. Otherwise, the waiter is enqueued, and this returns
  173. /// [`WaitResult::Wait`].
  174. #[inline(always)]
  175. pub(crate) fn start_wait(&self, node: Pin<&mut Waiter<T>>, waiter: &T) -> WaitResult {
  176. test_println!("WaitQueue::start_wait({:p})", node);
  177. // Optimistically, acquire a stored notification before trying to lock
  178. // the wait list.
  179. match test_dbg!(self.state.compare_exchange(WAKING, EMPTY, SeqCst, SeqCst)) {
  180. Ok(_) => return WaitResult::Notified,
  181. Err(CLOSED) => return WaitResult::Closed,
  182. Err(_) => {}
  183. }
  184. // Slow path: the queue is not closed, and we failed to consume a stored
  185. // notification. We need to acquire the lock and enqueue the waiter.
  186. self.start_wait_slow(node, waiter)
  187. }
  /// Slow path of `start_wait`: acquires the linked list lock, and adds the
  /// waiter to the queue.
  ///
  /// The queue state is re-read after the lock is acquired, because it may
  /// have changed while this caller was waiting for the lock:
  ///
  /// - if a notification was stored in the meantime, it is consumed and
  ///   [`WaitResult::Notified`] is returned;
  /// - if the queue was closed in the meantime, [`WaitResult::Closed`] is
  ///   returned;
  /// - otherwise the queue is moved to (or left in) the `WAITING` state, a
  ///   clone of `waiter` is stored in `node`, `node` is linked into the
  ///   list, and [`WaitResult::Wait`] is returned.
  ///
  /// In debug builds this asserts that `node` does not already hold a waiter
  /// and that its state was `EMPTY` or `WAKING` (i.e. not linked).
  #[cold]
  #[inline(never)]
  fn start_wait_slow(&self, node: Pin<&mut Waiter<T>>, waiter: &T) -> WaitResult {
      test_println!("WaitQueue::start_wait_slow({:p})", node);
      // There are no queued notifications to consume, and the queue is
      // still open. Therefore, it's time to actually push the waiter to
      // the queue...finally lol :)

      // Grab the lock...
      let mut list = self.list.lock();
      // Reload the queue's state, as it may have changed while we were
      // waiting to lock the linked list.
      let mut state = self.state.load(Acquire);

      // Retry loop: each failed (or spuriously failed, since the exchanges
      // are `_weak`) compare-exchange feeds the freshly observed state back
      // into the `match`.
      loop {
          match test_dbg!(state) {
              // The queue is empty: transition the state to WAITING, as we
              // are adding a waiter.
              EMPTY => {
                  match test_dbg!(self
                      .state
                      .compare_exchange_weak(EMPTY, WAITING, SeqCst, SeqCst))
                  {
                      Ok(_) => break,
                      Err(actual) => {
                          debug_assert!(actual == EMPTY || actual == WAKING || actual == CLOSED);
                          state = actual;
                      }
                  }
              }
              // The queue was woken while we were waiting to acquire the
              // lock. Attempt to consume the wakeup.
              WAKING => {
                  match test_dbg!(self
                      .state
                      .compare_exchange_weak(WAKING, EMPTY, SeqCst, SeqCst))
                  {
                      // Consumed the wakeup!
                      Ok(_) => return WaitResult::Notified,
                      Err(actual) => {
                          debug_assert!(actual == WAKING || actual == EMPTY || actual == CLOSED);
                          state = actual;
                      }
                  }
              }
              // The queue closed while we were waiting to acquire the lock;
              // we're done here!
              CLOSED => return WaitResult::Closed,
              // The queue is already in the WAITING state, so we don't need
              // to mess with it.
              _state => {
                  debug_assert_eq!(_state, WAITING,
                      "start_wait_slow: unexpected state value {:?} (expected WAITING). this is a bug!",
                      _state,
                  );
                  break;
              }
          }
      }

      // Time to wait! Store the waiter in the node, advance the node's state
      // to Waiting, and add it to the queue.
      node.with_node(&mut *list, |node| {
          let _prev = node.waiter.replace(waiter.clone());
          debug_assert!(
              _prev.is_none(),
              "start_wait_slow: called with a node that already had a waiter!"
          );
      });

      let _prev_state = test_dbg!(node.state.swap(WAITING, Release));
      debug_assert!(
          _prev_state == EMPTY || _prev_state == WAKING,
          "start_wait_slow: called with a node that was not empty ({}) or woken ({})! actual={}",
          EMPTY,
          WAKING,
          _prev_state,
      );
      // The lock is still held here, so the node becomes visible to
      // `notify`/`close` only once it is fully initialized.
      list.enqueue(node);
      WaitResult::Wait
  }
  267. /// Continue waiting for a notification.
  268. ///
  269. /// This is called when a waiter has been woken. It determines if the
  270. /// node was woken from the queue, or if the wakeup was spurious. If the
  271. /// wakeup was from the queue, this returns [`WaitResult::Notified`] or
  272. /// [`WaitResult::Closed`]. Otherwise, if the wakeup was spurious, this will
  273. /// lock the queue and check if the node's waiter needs to be updated.
  274. #[inline(always)]
  275. pub(crate) fn continue_wait(&self, node: Pin<&mut Waiter<T>>, my_waiter: &T) -> WaitResult {
  276. test_println!("WaitQueue::continue_wait({:p})", node);
  277. // Fast path: check if the node was woken from the queue.
  278. let state = test_dbg!(node.state.load(Acquire));
  279. match state {
  280. WAKING => return WaitResult::Notified,
  281. CLOSED => return WaitResult::Closed,
  282. _state => {
  283. debug_assert_eq!(
  284. _state, WAITING,
  285. "continue_wait should not be called unless the node has been enqueued"
  286. );
  287. }
  288. }
  289. // Slow path: received a spurious wakeup. We must lock the queue so that
  290. // we can potentially modify the node's waiter.
  291. self.continue_wait_slow(node, my_waiter)
  292. }
  293. /// Slow path for `continue_wait`: locks the linked list and updates the
  294. /// node with a new waiter.
  295. #[cold]
  296. #[inline(never)]
  297. fn continue_wait_slow(&self, node: Pin<&mut Waiter<T>>, my_waiter: &T) -> WaitResult {
  298. test_println!("WaitQueue::continue_wait_slow({:p})", node);
  299. // If the waiting task/thread was woken but no wakeup was assigned to
  300. // the node, we may need to update the node with a new waiter.
  301. // Therefore, lock the queue in order to modify the node.
  302. let mut list = self.list.lock();
  303. // The node may have been woken while we were waiting to acquire the
  304. // lock. If so, check the new state.
  305. match test_dbg!(node.state.load(Acquire)) {
  306. WAKING => return WaitResult::Notified,
  307. CLOSED => return WaitResult::Closed,
  308. _state => {
  309. debug_assert_eq!(
  310. _state, WAITING,
  311. "continue_wait_slow should not be called unless the node has been enqueued"
  312. );
  313. }
  314. }
  315. // Okay, we were not woken and need to continue waiting. It may be
  316. // necessary to update the waiter with a new waiter (in practice, this
  317. // is only necessary in async).
  318. node.with_node(&mut *list, |node| {
  319. if let Some(ref mut waiter) = node.waiter {
  320. if !waiter.same(my_waiter) {
  321. *waiter = my_waiter.clone();
  322. }
  323. } else {
  324. // XXX(eliza): This branch should _probably_ never occur...
  325. node.waiter = Some(my_waiter.clone());
  326. }
  327. });
  328. WaitResult::Wait
  329. }
  330. /// Notify one waiter from the queue. If there are no waiters in the linked
  331. /// list, the notification is instead assigned to the queue itself.
  332. ///
  333. /// If a waiter was popped from the queue, returns `true`. Otherwise, if the
  334. /// notification was assigned to the queue, returns `false`.
  335. #[inline(always)]
  336. pub(crate) fn notify(&self) -> bool {
  337. test_println!("WaitQueue::notify()");
  338. // Fast path: If the queue is empty, we can simply assign the
  339. // notification to the queue.
  340. let mut state = self.state.load(Acquire);
  341. while test_dbg!(state) == WAKING || state == EMPTY {
  342. match test_dbg!(self
  343. .state
  344. .compare_exchange_weak(state, WAKING, SeqCst, SeqCst))
  345. {
  346. // No waiters are currently waiting, assign the notification to
  347. // the queue to be consumed by the next wait attempt.
  348. Ok(_) => return false,
  349. Err(actual) => state = actual,
  350. }
  351. }
  352. // Slow path: there are waiters in the queue, so we must acquire the
  353. // lock and wake one of them.
  354. self.notify_slow(state)
  355. }
  356. /// Slow path for `notify`: acquire the lock on the linked list, dequeue a
  357. /// waiter, and notify it.
  358. #[cold]
  359. #[inline(never)]
  360. fn notify_slow(&self, state: usize) -> bool {
  361. test_println!("WaitQueue::notify_slow(state: {})", state);
  362. let mut list = self.list.lock();
  363. match state {
  364. EMPTY | WAKING => {
  365. if let Err(actual) = self.state.compare_exchange(state, WAKING, SeqCst, SeqCst) {
  366. debug_assert!(actual == EMPTY || actual == WAKING);
  367. self.state.store(WAKING, SeqCst);
  368. }
  369. }
  370. WAITING => {
  371. let waiter = list.dequeue(WAKING);
  372. debug_assert!(
  373. waiter.is_some(),
  374. "if we were in the `WAITING` state, there must be a waiter in the queue!\nself={:#?}",
  375. self,
  376. );
  377. // If we popped the last node, transition back to the empty
  378. // state.
  379. if test_dbg!(list.is_empty()) {
  380. self.state.store(EMPTY, SeqCst);
  381. }
  382. // drop the lock
  383. drop(list);
  384. // wake the waiter
  385. if let Some(waiter) = waiter {
  386. waiter.notify();
  387. return true;
  388. }
  389. }
  390. _weird => {
  391. // huh, if `notify_slow` was called, we *probably* shouldn't be
  392. // in the `closed` state...
  393. #[cfg(debug_assertions)]
  394. unreachable!("notify_slow: unexpected state value {:?}", _weird);
  395. }
  396. }
  397. false
  398. }
  399. /// Close the queue, notifying all waiting tasks.
  400. pub(crate) fn close(&self) {
  401. test_println!("WaitQueue::close()");
  402. test_dbg!(self.state.swap(CLOSED, SeqCst));
  403. let mut list = self.list.lock();
  404. while !list.is_empty() {
  405. if let Some(waiter) = list.dequeue(CLOSED) {
  406. waiter.notify();
  407. }
  408. }
  409. }
  410. }
  411. // === impl Waiter ===
  412. impl<T: Notify> Waiter<T> {
  413. pub(crate) fn new() -> Self {
  414. Self {
  415. state: CachePadded(AtomicUsize::new(EMPTY)),
  416. node: UnsafeCell::new(Node {
  417. next: None,
  418. prev: None,
  419. waiter: None,
  420. _pin: PhantomPinned,
  421. }),
  422. }
  423. }
  424. #[inline(never)]
  425. pub(crate) fn remove(self: Pin<&mut Self>, q: &WaitQueue<T>) {
  426. test_println!("Waiter::remove({:p})", self);
  427. let mut list = q.list.lock();
  428. unsafe {
  429. // Safety: removing a node is unsafe even when the list is locked,
  430. // because there's no way to guarantee that the node is part of
  431. // *this* list. However, the potential callers of this method will
  432. // never have access to any other linked lists, so we can just kind
  433. // of assume that this is safe.
  434. list.remove(self);
  435. }
  436. if test_dbg!(list.is_empty()) {
  437. let _ = test_dbg!(q.state.compare_exchange(WAITING, EMPTY, SeqCst, SeqCst));
  438. }
  439. }
  440. #[inline]
  441. pub(crate) fn is_linked(&self) -> bool {
  442. test_dbg!(self.state.load(Acquire)) == WAITING
  443. }
  444. }
  // Lock-protected accessors for the node stored inside a `Waiter`.
  impl<T> Waiter<T> {
      /// Runs `f` with mutable access to this waiter's [`Node`] and returns its
      /// result.
      ///
      /// # Safety
      ///
      /// This is only safe to call while the list is locked. The dummy `_list`
      /// parameter ensures this method is only called while holding the lock, so
      /// this can be safe.
      #[inline(always)]
      #[cfg_attr(loom, track_caller)]
      fn with_node<U>(&self, _list: &mut List<T>, f: impl FnOnce(&mut Node<T>) -> U) -> U {
          self.node.with_mut(|node| unsafe {
              // Safety: the dummy `_list` argument ensures that the caller has
              // the right to mutate the list (e.g. the list is locked).
              f(&mut *node)
          })
      }

      /// Overwrites this node's `prev` link with `prev`.
      ///
      /// # Safety
      ///
      /// This is only safe to call while the list is locked.
      #[cfg_attr(loom, track_caller)]
      unsafe fn set_prev(&mut self, prev: Option<NonNull<Waiter<T>>>) {
          self.node.with_mut(|node| (*node).prev = prev);
      }

      /// Clears this node's `prev` link and returns its previous value.
      ///
      /// # Safety
      ///
      /// This is only safe to call while the list is locked.
      #[cfg_attr(loom, track_caller)]
      unsafe fn take_prev(&mut self) -> Option<NonNull<Waiter<T>>> {
          self.node.with_mut(|node| (*node).prev.take())
      }

      /// Clears this node's `next` link and returns its previous value.
      ///
      /// # Safety
      ///
      /// This is only safe to call while the list is locked.
      #[cfg_attr(loom, track_caller)]
      unsafe fn take_next(&mut self) -> Option<NonNull<Waiter<T>>> {
          self.node.with_mut(|node| (*node).next.take())
      }
  }
  // SAFETY (review note): a `Waiter` contains raw links to other waiters and
  // an `UnsafeCell`, so the auto traits do not apply. These impls rely on the
  // documented protocol that the cell is only accessed while the queue's list
  // lock is held, and on `T: Send` for the stored handle -- TODO confirm that
  // every access path upholds this.
  unsafe impl<T: Send> Send for Waiter<T> {}
  unsafe impl<T: Send> Sync for Waiter<T> {}
  484. // === impl List ===
  485. impl<T> List<T> {
  /// Returns an empty list (no head and no tail).
  ///
  /// This is a `const fn` so that it can be used from the `const`
  /// `WaitQueue::new` constructor.
  const fn new() -> Self {
      Self {
          head: None,
          tail: None,
      }
  }
  492. fn enqueue(&mut self, waiter: Pin<&mut Waiter<T>>) {
  493. test_println!("List::enqueue({:p})", waiter);
  494. let node = unsafe { waiter.get_unchecked_mut() };
  495. let head = self.head.take();
  496. node.with_node(self, |node| {
  497. node.next = head;
  498. node.prev = None;
  499. });
  500. let ptr = NonNull::from(node);
  501. debug_assert_ne!(
  502. self.head,
  503. Some(ptr),
  504. "tried to enqueue the same waiter twice!"
  505. );
  506. if let Some(mut head) = head {
  507. unsafe {
  508. head.as_mut().set_prev(Some(ptr));
  509. }
  510. }
  511. self.head = Some(ptr);
  512. if self.tail.is_none() {
  513. self.tail = Some(ptr);
  514. }
  515. }
  516. fn dequeue(&mut self, new_state: usize) -> Option<T> {
  517. let mut last = self.tail?;
  518. test_println!("List::dequeue({:?}) -> {:p}", new_state, last);
  519. let last = unsafe { last.as_mut() };
  520. let _prev_state = test_dbg!(last.state.swap(new_state, Release));
  521. debug_assert_eq!(_prev_state, WAITING);
  522. let (prev, waiter) = last.with_node(self, |node| {
  523. node.next = None;
  524. (node.prev.take(), node.waiter.take())
  525. });
  526. match prev {
  527. Some(mut prev) => unsafe {
  528. let _ = prev.as_mut().take_next();
  529. },
  530. None => self.head = None,
  531. }
  532. self.tail = prev;
  533. waiter
  534. }
  535. unsafe fn remove(&mut self, node: Pin<&mut Waiter<T>>) {
  536. test_println!("List::remove({:p})", node);
  537. let node_ref = node.get_unchecked_mut();
  538. let prev = node_ref.take_prev();
  539. let next = node_ref.take_next();
  540. let ptr = NonNull::from(node_ref);
  541. if let Some(mut prev) = prev {
  542. prev.as_mut().with_node(self, |prev| {
  543. debug_assert_eq!(prev.next, Some(ptr));
  544. prev.next = next;
  545. });
  546. } else if self.head == Some(ptr) {
  547. self.head = next;
  548. }
  549. if let Some(mut next) = next {
  550. next.as_mut().with_node(self, |next| {
  551. debug_assert_eq!(next.prev, Some(ptr));
  552. next.prev = prev;
  553. });
  554. } else if self.tail == Some(ptr) {
  555. self.tail = prev;
  556. }
  557. }
  558. fn is_empty(&self) -> bool {
  559. self.head.is_none() && self.tail.is_none()
  560. }
  561. }
  562. impl<T> fmt::Debug for List<T> {
  563. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  564. f.debug_struct("List")
  565. .field("head", &self.head)
  566. .field("tail", &self.tail)
  567. .field("is_empty", &self.is_empty())
  568. .finish()
  569. }
  570. }
  // SAFETY (review note): `List` only holds raw pointers to `Waiter<T>` nodes,
  // which is why `Send` is not derived automatically. Presumably this is sound
  // because the list is only ever reached through the queue's `Mutex` -- TODO
  // confirm.
  unsafe impl<T: Send> Send for List<T> {}
  572. #[cfg(all(test, not(loom)))]
  573. mod tests {
  574. use super::*;
  575. use std::sync::{
  576. atomic::{AtomicBool, Ordering},
  577. Arc,
  578. };
  579. #[derive(Debug, Clone)]
  580. struct MockNotify(Arc<AtomicBool>);
  581. impl Notify for MockNotify {
  582. fn notify(self) {
  583. self.0.store(true, Ordering::SeqCst);
  584. }
  585. fn same(&self, Self(other): &Self) -> bool {
  586. Arc::ptr_eq(&self.0, other)
  587. }
  588. }
  589. impl MockNotify {
  590. fn new() -> Self {
  591. Self(Arc::new(AtomicBool::new(false)))
  592. }
  593. fn was_notified(&self) -> bool {
  594. self.0.load(Ordering::SeqCst)
  595. }
  596. }
  /// A single `notify` wakes exactly the waiter that was enqueued first and
  /// leaves the other one linked.
  #[test]
  fn notify_one() {
      let q = WaitQueue::new();

      let mut waiter1 = Box::pin(Waiter::new());
      let mut waiter2 = Box::pin(Waiter::new());
      let notify1 = MockNotify::new();
      let notify2 = MockNotify::new();

      // Enqueue both waiters, oldest first.
      assert_eq_dbg!(q.start_wait(waiter1.as_mut(), &notify1), WaitResult::Wait);
      assert_dbg!(waiter1.is_linked());
      assert_eq_dbg!(q.start_wait(waiter2.as_mut(), &notify2), WaitResult::Wait);
      assert_dbg!(waiter2.is_linked());

      // Nobody has been woken yet.
      assert_dbg!(!notify1.was_notified());
      assert_dbg!(!notify2.was_notified());

      // One notification: only the first waiter is woken and unlinked.
      assert_dbg!(q.notify());
      assert_dbg!(notify1.was_notified());
      assert_dbg!(!waiter1.is_linked());
      assert_dbg!(!notify2.was_notified());
      assert_dbg!(waiter2.is_linked());

      // The second waiter keeps waiting; the first observes its wakeup.
      assert_eq_dbg!(
          q.continue_wait(waiter2.as_mut(), &notify2),
          WaitResult::Wait
      );
      assert_eq_dbg!(
          q.continue_wait(waiter1.as_mut(), &notify1),
          WaitResult::Notified
      );
  }
  /// Closing the queue wakes and unlinks every waiter, and each of them then
  /// observes `Closed`.
  #[test]
  fn close() {
      let q = WaitQueue::new();

      let mut waiter1 = Box::pin(Waiter::new());
      let mut waiter2 = Box::pin(Waiter::new());
      let notify1 = MockNotify::new();
      let notify2 = MockNotify::new();

      // Enqueue both waiters.
      assert_eq_dbg!(q.start_wait(waiter1.as_mut(), &notify1), WaitResult::Wait);
      assert_dbg!(waiter1.is_linked());
      assert_eq_dbg!(q.start_wait(waiter2.as_mut(), &notify2), WaitResult::Wait);
      assert_dbg!(waiter2.is_linked());

      // Nobody has been woken yet.
      assert_dbg!(!notify1.was_notified());
      assert_dbg!(!notify2.was_notified());

      q.close();

      // Both waiters were notified and are no longer linked.
      assert_dbg!(notify1.was_notified());
      assert_dbg!(!waiter1.is_linked());
      assert_dbg!(notify2.was_notified());
      assert_dbg!(!waiter2.is_linked());

      // Continuing to wait reports the closure to both.
      assert_eq_dbg!(
          q.continue_wait(waiter2.as_mut(), &notify2),
          WaitResult::Closed
      );
      assert_eq_dbg!(
          q.continue_wait(waiter1.as_mut(), &notify1),
          WaitResult::Closed
      );
  }
  /// Removing a waiter from the middle of the list must not disturb its
  /// neighbours: the oldest waiter is still the one woken by `notify`, and
  /// the newest one stays linked.
  #[test]
  fn remove_from_middle() {
      let q = WaitQueue::new();

      let notify1 = MockNotify::new();
      let notify2 = MockNotify::new();
      let notify3 = MockNotify::new();

      let mut waiter1 = Box::pin(Waiter::new());
      let mut waiter2 = Box::pin(Waiter::new());
      let mut waiter3 = Box::pin(Waiter::new());

      assert_eq_dbg!(q.start_wait(waiter1.as_mut(), &notify1), WaitResult::Wait);
      assert_dbg!(waiter1.is_linked());

      assert_eq_dbg!(q.start_wait(waiter2.as_mut(), &notify2), WaitResult::Wait);
      assert_dbg!(waiter2.is_linked());

      assert_eq_dbg!(q.start_wait(waiter3.as_mut(), &notify3), WaitResult::Wait);
      // Check the waiter that was just enqueued (this used to re-check
      // `waiter2`, leaving `waiter3` unverified).
      assert_dbg!(waiter3.is_linked());

      assert_dbg!(!notify1.was_notified());
      assert_dbg!(!notify2.was_notified());
      assert_dbg!(!notify3.was_notified());

      // Unlink the middle waiter; removing it must not notify it.
      waiter2.as_mut().remove(&q);
      assert_dbg!(!notify2.was_notified());
      drop(waiter2);

      // The oldest waiter is woken; the newest one is untouched.
      assert_dbg!(q.notify());

      assert_dbg!(notify1.was_notified());
      assert_dbg!(!waiter1.is_linked());

      assert_dbg!(!notify3.was_notified());
      assert_dbg!(waiter3.is_linked());

      assert_eq_dbg!(
          q.continue_wait(waiter3.as_mut(), &notify3),
          WaitResult::Wait
      );

      assert_eq_dbg!(
          q.continue_wait(waiter1.as_mut(), &notify1),
          WaitResult::Notified
      );
  }
  /// After the oldest waiter has been woken, removing the next one must leave
  /// the remaining waiter linked and still waiting.
  #[test]
  fn remove_after_notify() {
      let q = WaitQueue::new();

      let notify1 = MockNotify::new();
      let notify2 = MockNotify::new();
      let notify3 = MockNotify::new();

      let mut waiter1 = Box::pin(Waiter::new());
      let mut waiter2 = Box::pin(Waiter::new());
      let mut waiter3 = Box::pin(Waiter::new());

      assert_eq_dbg!(q.start_wait(waiter1.as_mut(), &notify1), WaitResult::Wait);
      assert_dbg!(waiter1.is_linked());

      assert_eq_dbg!(q.start_wait(waiter2.as_mut(), &notify2), WaitResult::Wait);
      assert_dbg!(waiter2.is_linked());

      assert_eq_dbg!(q.start_wait(waiter3.as_mut(), &notify3), WaitResult::Wait);
      // Check the waiter that was just enqueued (this used to re-check
      // `waiter2`, leaving `waiter3` unverified).
      assert_dbg!(waiter3.is_linked());

      assert_dbg!(!notify1.was_notified());
      assert_dbg!(!notify2.was_notified());
      assert_dbg!(!notify3.was_notified());

      // Wake the oldest waiter; the other two stay linked.
      assert_dbg!(q.notify());

      assert_dbg!(notify1.was_notified());
      assert_dbg!(!waiter1.is_linked());

      assert_dbg!(!notify2.was_notified());
      assert_dbg!(waiter2.is_linked());

      assert_dbg!(!notify3.was_notified());
      assert_dbg!(waiter3.is_linked());

      assert_eq_dbg!(
          q.continue_wait(waiter3.as_mut(), &notify3),
          WaitResult::Wait
      );

      assert_eq_dbg!(
          q.continue_wait(waiter2.as_mut(), &notify2),
          WaitResult::Wait
      );

      assert_eq_dbg!(
          q.continue_wait(waiter1.as_mut(), &notify1),
          WaitResult::Notified
      );

      // Now unlink the second waiter; removing it must not notify it.
      waiter2.as_mut().remove(&q);
      assert_dbg!(!notify2.was_notified());
      drop(waiter2);

      // The remaining waiter is unaffected.
      assert_dbg!(!notify3.was_notified());
      assert_dbg!(waiter3.is_linked());

      assert_eq_dbg!(
          q.continue_wait(waiter3.as_mut(), &notify3),
          WaitResult::Wait
      );
  }
  733. }