wait_queue.rs 16 KB


  1. // #![allow(dead_code)]
  2. use core::intrinsics::unlikely;
  3. use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
  4. use log::{error, warn};
  5. use system_error::SystemError;
  6. use crate::{
  7. arch::{ipc::signal::Signal, CurrentIrqArch},
  8. exception::InterruptArch,
  9. process::{ProcessControlBlock, ProcessManager, ProcessState},
  10. sched::{schedule, SchedMode},
  11. };
  12. use super::{
  13. mutex::MutexGuard,
  14. spinlock::{SpinLock, SpinLockGuard},
  15. };
  16. #[derive(Debug)]
  17. struct InnerWaitQueue {
  18. /// 等待队列是否已经死亡, 如果已经死亡, 则不能再添加新的等待进程
  19. dead: bool,
  20. /// 等待队列的链表
  21. wait_list: VecDeque<Arc<ProcessControlBlock>>,
  22. }
  23. /// 被自旋锁保护的等待队列
  24. #[derive(Debug)]
  25. pub struct WaitQueue {
  26. inner: SpinLock<InnerWaitQueue>,
  27. }
  28. #[allow(dead_code)]
  29. impl WaitQueue {
  30. pub const fn default() -> Self {
  31. WaitQueue {
  32. inner: SpinLock::new(InnerWaitQueue::INIT),
  33. }
  34. }
  35. fn inner_irqsave(&self) -> SpinLockGuard<InnerWaitQueue> {
  36. self.inner.lock_irqsave()
  37. }
  38. fn inner(&self) -> SpinLockGuard<InnerWaitQueue> {
  39. self.inner.lock()
  40. }
  41. pub fn prepare_to_wait_event(&self, interruptible: bool) -> Result<(), SystemError> {
  42. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  43. let pcb = ProcessManager::current_pcb();
  44. if !guard.can_sleep() {
  45. return Err(SystemError::ESRCH);
  46. }
  47. if Signal::signal_pending_state(interruptible, false, &pcb) {
  48. return Err(SystemError::ERESTARTSYS);
  49. } else {
  50. ProcessManager::mark_sleep(interruptible).unwrap_or_else(|e| {
  51. panic!("sleep error: {:?}", e);
  52. });
  53. guard.wait_list.push_back(ProcessManager::current_pcb());
  54. drop(guard);
  55. }
  56. Ok(())
  57. }
  58. pub fn finish_wait(&self) {
  59. let pcb = ProcessManager::current_pcb();
  60. let mut writer = pcb.sched_info().inner_lock_write_irqsave();
  61. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  62. writer.set_state(ProcessState::Runnable);
  63. writer.set_wakeup();
  64. guard.wait_list.retain(|x| !Arc::ptr_eq(x, &pcb));
  65. drop(guard);
  66. drop(writer);
  67. }
  68. /// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断
  69. pub fn sleep(&self) -> Result<(), SystemError> {
  70. before_sleep_check(0);
  71. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  72. if !guard.can_sleep() {
  73. return Err(SystemError::ESRCH);
  74. }
  75. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  76. panic!("sleep error: {:?}", e);
  77. });
  78. guard.wait_list.push_back(ProcessManager::current_pcb());
  79. drop(guard);
  80. schedule(SchedMode::SM_NONE);
  81. Ok(())
  82. }
  83. /// 标记等待队列已经死亡,不能再添加新的等待进程
  84. pub fn mark_dead(&self) {
  85. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  86. guard.dead = true;
  87. drop(guard);
  88. }
  89. /// @brief 让当前进程在等待队列上进行等待,并且,在释放waitqueue的锁之前,执行f函数闭包
  90. pub fn sleep_with_func<F>(&self, f: F) -> Result<(), SystemError>
  91. where
  92. F: FnOnce(),
  93. {
  94. before_sleep_check(0);
  95. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  96. if !guard.can_sleep() {
  97. return Err(SystemError::ESRCH);
  98. }
  99. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  100. panic!("sleep error: {:?}", e);
  101. });
  102. guard.wait_list.push_back(ProcessManager::current_pcb());
  103. f();
  104. drop(guard);
  105. schedule(SchedMode::SM_NONE);
  106. Ok(())
  107. }
  108. /// @brief 让当前进程在等待队列上进行等待. 但是,在释放waitqueue的锁之后,不会调用调度函数。
  109. /// 这样的设计,是为了让调用者可以在执行本函数之后,执行一些操作,然后再【手动调用调度函数】。
  110. ///
  111. /// 执行本函数前,需要确保处于【中断禁止】状态。
  112. ///
  113. /// 尽管sleep_with_func和sleep_without_schedule都可以实现这个功能,但是,sleep_with_func会在释放锁之前,执行f函数闭包。
  114. ///
  115. /// 考虑这样一个场景:
  116. /// 等待队列位于某个自旋锁保护的数据结构A中,我们希望在进程睡眠的同时,释放数据结构A的锁。
  117. /// 在这种情况下,如果使用sleep_with_func,所有权系统不会允许我们这么做。
  118. /// 因此,sleep_without_schedule的设计,正是为了解决这个问题。
  119. ///
  120. /// 由于sleep_without_schedule不会调用调度函数,因此,如果开发者忘记在执行本函数之后,手动调用调度函数,
  121. /// 由于时钟中断到来或者‘其他cpu kick了当前cpu’,可能会导致一些未定义的行为。
  122. pub unsafe fn sleep_without_schedule(&self) -> Result<(), SystemError> {
  123. before_sleep_check(1);
  124. // 安全检查:确保当前处于中断禁止状态
  125. assert!(!CurrentIrqArch::is_irq_enabled());
  126. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  127. if !guard.can_sleep() {
  128. return Err(SystemError::ESRCH);
  129. }
  130. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  131. panic!("sleep error: {:?}", e);
  132. });
  133. guard.wait_list.push_back(ProcessManager::current_pcb());
  134. drop(guard);
  135. Ok(())
  136. }
  137. pub unsafe fn sleep_without_schedule_uninterruptible(&self) -> Result<(), SystemError> {
  138. before_sleep_check(1);
  139. // 安全检查:确保当前处于中断禁止状态
  140. assert!(!CurrentIrqArch::is_irq_enabled());
  141. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  142. if !guard.can_sleep() {
  143. return Err(SystemError::ESRCH);
  144. }
  145. ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
  146. panic!("sleep error: {:?}", e);
  147. });
  148. guard.wait_list.push_back(ProcessManager::current_pcb());
  149. drop(guard);
  150. Ok(())
  151. }
  152. /// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断
  153. pub fn sleep_uninterruptible(&self) -> Result<(), SystemError> {
  154. before_sleep_check(0);
  155. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  156. if !guard.can_sleep() {
  157. return Err(SystemError::ESRCH);
  158. }
  159. ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
  160. panic!("sleep error: {:?}", e);
  161. });
  162. guard.wait_list.push_back(ProcessManager::current_pcb());
  163. drop(guard);
  164. schedule(SchedMode::SM_NONE);
  165. Ok(())
  166. }
  167. /// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断。
  168. /// 在当前进程的pcb加入队列后,解锁指定的自旋锁。
  169. pub fn sleep_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) -> Result<(), SystemError> {
  170. before_sleep_check(1);
  171. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  172. if !guard.can_sleep() {
  173. return Err(SystemError::ESRCH);
  174. }
  175. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  176. panic!("sleep error: {:?}", e);
  177. });
  178. guard.wait_list.push_back(ProcessManager::current_pcb());
  179. drop(to_unlock);
  180. drop(guard);
  181. schedule(SchedMode::SM_NONE);
  182. Ok(())
  183. }
  184. /// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断。
  185. /// 在当前进程的pcb加入队列后,解锁指定的Mutex。
  186. pub fn sleep_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) -> Result<(), SystemError> {
  187. before_sleep_check(1);
  188. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  189. if !guard.can_sleep() {
  190. return Err(SystemError::ESRCH);
  191. }
  192. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  193. panic!("sleep error: {:?}", e);
  194. });
  195. guard.wait_list.push_back(ProcessManager::current_pcb());
  196. drop(to_unlock);
  197. drop(guard);
  198. schedule(SchedMode::SM_NONE);
  199. Ok(())
  200. }
  201. /// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断。
  202. /// 在当前进程的pcb加入队列后,解锁指定的自旋锁。
  203. pub fn sleep_uninterruptible_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) {
  204. before_sleep_check(1);
  205. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  206. let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
  207. ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
  208. panic!("sleep error: {:?}", e);
  209. });
  210. drop(irq_guard);
  211. guard.wait_list.push_back(ProcessManager::current_pcb());
  212. drop(to_unlock);
  213. drop(guard);
  214. schedule(SchedMode::SM_NONE);
  215. }
  216. /// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断。
  217. /// 在当前进程的pcb加入队列后,解锁指定的Mutex。
  218. pub fn sleep_uninterruptible_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) {
  219. before_sleep_check(1);
  220. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  221. let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
  222. ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
  223. panic!("sleep error: {:?}", e);
  224. });
  225. drop(irq_guard);
  226. guard.wait_list.push_back(ProcessManager::current_pcb());
  227. drop(to_unlock);
  228. drop(guard);
  229. schedule(SchedMode::SM_NONE);
  230. }
  231. /// @brief 唤醒在队列中等待的第一个进程。
  232. /// 如果这个进程的state与给定的state进行and操作之后,结果不为0,则唤醒它。
  233. ///
  234. /// @param state 用于判断的state,如果队列第一个进程与这个state相同,或者为None(表示不进行这个判断),则唤醒这个进程。
  235. ///
  236. /// @return true 成功唤醒进程
  237. /// @return false 没有唤醒进程
  238. pub fn wakeup(&self, state: Option<ProcessState>) -> bool {
  239. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  240. // 如果队列为空,则返回
  241. if guard.wait_list.is_empty() {
  242. return false;
  243. }
  244. // 如果队列头部的pcb的state与给定的state相与,结果不为0,则唤醒
  245. if let Some(state) = state {
  246. if guard
  247. .wait_list
  248. .front()
  249. .unwrap()
  250. .sched_info()
  251. .inner_lock_read_irqsave()
  252. .state()
  253. != state
  254. {
  255. return false;
  256. }
  257. }
  258. let to_wakeup = guard.wait_list.pop_front().unwrap();
  259. drop(guard);
  260. let res = ProcessManager::wakeup(&to_wakeup).is_ok();
  261. return res;
  262. }
  263. /// @brief 唤醒在队列中,符合条件的所有进程。
  264. ///
  265. /// @param state 用于判断的state,如果一个进程与这个state相同,或者为None(表示不进行这个判断),则唤醒这个进程。
  266. pub fn wakeup_all(&self, state: Option<ProcessState>) {
  267. let mut guard: SpinLockGuard<InnerWaitQueue> = self.inner_irqsave();
  268. // 如果队列为空,则返回
  269. if guard.wait_list.is_empty() {
  270. return;
  271. }
  272. let mut to_push_back: Vec<Arc<ProcessControlBlock>> = Vec::new();
  273. // 如果队列头部的pcb的state与给定的state相与,结果不为0,则唤醒
  274. while let Some(to_wakeup) = guard.wait_list.pop_front() {
  275. let mut wake = false;
  276. if let Some(state) = state {
  277. if to_wakeup.sched_info().inner_lock_read_irqsave().state() == state {
  278. wake = true;
  279. }
  280. } else {
  281. wake = true;
  282. }
  283. if wake {
  284. ProcessManager::wakeup(&to_wakeup).unwrap_or_else(|e| {
  285. error!("wakeup pid: {:?} error: {:?}", to_wakeup.pid(), e);
  286. });
  287. continue;
  288. } else {
  289. to_push_back.push(to_wakeup);
  290. }
  291. }
  292. for to_wakeup in to_push_back {
  293. guard.wait_list.push_back(to_wakeup);
  294. }
  295. }
  296. /// @brief 获得当前等待队列的大小
  297. pub fn len(&self) -> usize {
  298. return self.inner_irqsave().wait_list.len();
  299. }
  300. }
  301. impl InnerWaitQueue {
  302. pub const INIT: InnerWaitQueue = InnerWaitQueue {
  303. wait_list: VecDeque::new(),
  304. dead: false,
  305. };
  306. pub fn can_sleep(&self) -> bool {
  307. return !self.dead;
  308. }
  309. }
  310. fn before_sleep_check(max_preempt: usize) {
  311. let pcb = ProcessManager::current_pcb();
  312. if unlikely(pcb.preempt_count() > max_preempt) {
  313. warn!(
  314. "Process {:?}: Try to sleep when preempt count is {}",
  315. pcb.pid().data(),
  316. pcb.preempt_count()
  317. );
  318. }
  319. }
  320. /// 事件等待队列
  321. #[derive(Debug)]
  322. pub struct EventWaitQueue {
  323. wait_list: SpinLock<Vec<(u64, Arc<ProcessControlBlock>)>>,
  324. }
  325. impl Default for EventWaitQueue {
  326. fn default() -> Self {
  327. Self::new()
  328. }
  329. }
  330. #[allow(dead_code)]
  331. impl EventWaitQueue {
  332. pub fn new() -> Self {
  333. Self {
  334. wait_list: SpinLock::new(Default::default()),
  335. }
  336. }
  337. /// ## 让当前进程在该队列上等待感兴趣的事件
  338. ///
  339. /// ### 参数
  340. /// - events: 进程感兴趣的事件,events最好是为位表示,一位表示一个事件
  341. ///
  342. /// 注意,使用前应该注意有可能其他地方定义了冲突的事件,可能会导致未定义行为
  343. pub fn sleep(&self, events: u64) {
  344. before_sleep_check(0);
  345. let mut guard = self.wait_list.lock_irqsave();
  346. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  347. panic!("sleep error: {:?}", e);
  348. });
  349. guard.push((events, ProcessManager::current_pcb()));
  350. drop(guard);
  351. schedule(SchedMode::SM_NONE);
  352. }
  353. pub unsafe fn sleep_without_schedule(&self, events: u64) {
  354. before_sleep_check(1);
  355. let mut guard = self.wait_list.lock_irqsave();
  356. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  357. panic!("sleep error: {:?}", e);
  358. });
  359. guard.push((events, ProcessManager::current_pcb()));
  360. drop(guard);
  361. }
  362. pub fn sleep_unlock_spinlock<T>(&self, events: u64, to_unlock: SpinLockGuard<T>) {
  363. before_sleep_check(1);
  364. let mut guard = self.wait_list.lock_irqsave();
  365. let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
  366. ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
  367. panic!("sleep error: {:?}", e);
  368. });
  369. drop(irq_guard);
  370. guard.push((events, ProcessManager::current_pcb()));
  371. drop(to_unlock);
  372. drop(guard);
  373. schedule(SchedMode::SM_NONE);
  374. }
  375. /// ### 唤醒该队列上等待events的进程
  376. ///
  377. /// ### 参数
  378. /// - events: 发生的事件
  379. ///
  380. /// 需要注意的是,只要触发了events中的任意一件事件,进程都会被唤醒
  381. pub fn wakeup_any(&self, events: u64) -> usize {
  382. let mut ret = 0;
  383. let mut wq_guard = self.wait_list.lock_irqsave();
  384. wq_guard.retain(|(es, pcb)| {
  385. if *es & events > 0 {
  386. // 有感兴趣的事件
  387. if ProcessManager::wakeup(pcb).is_ok() {
  388. ret += 1;
  389. return false;
  390. } else {
  391. return true;
  392. }
  393. } else {
  394. return true;
  395. }
  396. });
  397. ret
  398. }
  399. /// ### 唤醒该队列上等待events的进程
  400. ///
  401. /// ### 参数
  402. /// - events: 发生的事件
  403. ///
  404. /// 需要注意的是,只有满足所有事件的进程才会被唤醒
  405. pub fn wakeup(&self, events: u64) -> usize {
  406. let mut ret = 0;
  407. let mut wq_guard = self.wait_list.lock_irqsave();
  408. wq_guard.retain(|(es, pcb)| {
  409. if *es == events {
  410. // 有感兴趣的事件
  411. if ProcessManager::wakeup(pcb).is_ok() {
  412. ret += 1;
  413. return false;
  414. } else {
  415. return true;
  416. }
  417. } else {
  418. return true;
  419. }
  420. });
  421. ret
  422. }
  423. pub fn wakeup_all(&self) {
  424. self.wakeup_any(u64::MAX);
  425. }
  426. }