pipe.rs 16 KB


  1. use core::sync::atomic::compiler_fence;
  2. use crate::{
  3. arch::ipc::signal::{SigCode, Signal},
  4. filesystem::{
  5. epoll::{event_poll::EventPoll, EPollEventType, EPollItem},
  6. vfs::{
  7. file::FileMode, syscall::ModeType, vcore::generate_inode_id, FilePrivateData,
  8. FileSystem, FileType, IndexNode, Metadata, PollableInode,
  9. },
  10. },
  11. libs::{
  12. spinlock::{SpinLock, SpinLockGuard},
  13. wait_queue::WaitQueue,
  14. },
  15. process::{ProcessFlags, ProcessManager, ProcessState},
  16. sched::SchedMode,
  17. time::PosixTimeSpec,
  18. };
  19. use alloc::{
  20. collections::LinkedList,
  21. sync::{Arc, Weak},
  22. };
  23. use system_error::SystemError;
  24. use super::signal_types::{SigInfo, SigType};
  25. /// 我们设定pipe_buff的总大小为1024字节
  26. const PIPE_BUFF_SIZE: usize = 1024;
  27. #[derive(Debug, Clone)]
  28. pub struct PipeFsPrivateData {
  29. mode: FileMode,
  30. }
  31. impl PipeFsPrivateData {
  32. pub fn new(mode: FileMode) -> Self {
  33. return PipeFsPrivateData { mode };
  34. }
  35. pub fn set_mode(&mut self, mode: FileMode) {
  36. self.mode = mode;
  37. }
  38. }
  39. /// @brief 管道文件i节点(锁)
  40. #[derive(Debug)]
  41. pub struct LockedPipeInode {
  42. inner: SpinLock<InnerPipeInode>,
  43. read_wait_queue: WaitQueue,
  44. write_wait_queue: WaitQueue,
  45. epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
  46. }
  47. /// @brief 管道文件i节点(无锁)
  48. #[derive(Debug)]
  49. pub struct InnerPipeInode {
  50. self_ref: Weak<LockedPipeInode>,
  51. /// 管道内可读的数据数
  52. valid_cnt: i32,
  53. read_pos: i32,
  54. write_pos: i32,
  55. data: [u8; PIPE_BUFF_SIZE],
  56. /// INode 元数据
  57. metadata: Metadata,
  58. reader: u32,
  59. writer: u32,
  60. had_reader: bool,
  61. }
  62. impl InnerPipeInode {
  63. pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
  64. let mut events = EPollEventType::empty();
  65. let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
  66. mode
  67. } else {
  68. return Err(SystemError::EBADFD);
  69. };
  70. if mode.contains(FileMode::O_RDONLY) {
  71. if self.valid_cnt != 0 {
  72. // 有数据可读
  73. events.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
  74. }
  75. // 没有写者
  76. if self.writer == 0 {
  77. events.insert(EPollEventType::EPOLLHUP)
  78. }
  79. }
  80. if mode.contains(FileMode::O_WRONLY) {
  81. // 管道内数据未满
  82. if self.valid_cnt as usize != PIPE_BUFF_SIZE {
  83. events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
  84. }
  85. // 没有读者
  86. if self.reader == 0 {
  87. events.insert(EPollEventType::EPOLLERR);
  88. }
  89. }
  90. Ok(events.bits() as usize)
  91. }
  92. fn buf_full(&self) -> bool {
  93. return self.valid_cnt as usize == PIPE_BUFF_SIZE;
  94. }
  95. }
  96. impl LockedPipeInode {
  97. pub fn new() -> Arc<Self> {
  98. let inner = InnerPipeInode {
  99. self_ref: Weak::default(),
  100. valid_cnt: 0,
  101. read_pos: 0,
  102. write_pos: 0,
  103. had_reader: false,
  104. data: [0; PIPE_BUFF_SIZE],
  105. metadata: Metadata {
  106. dev_id: 0,
  107. inode_id: generate_inode_id(),
  108. size: PIPE_BUFF_SIZE as i64,
  109. blk_size: 0,
  110. blocks: 0,
  111. atime: PosixTimeSpec::default(),
  112. mtime: PosixTimeSpec::default(),
  113. ctime: PosixTimeSpec::default(),
  114. btime: PosixTimeSpec::default(),
  115. file_type: FileType::Pipe,
  116. mode: ModeType::from_bits_truncate(0o666),
  117. nlinks: 1,
  118. uid: 0,
  119. gid: 0,
  120. raw_dev: Default::default(),
  121. },
  122. reader: 0,
  123. writer: 0,
  124. };
  125. let result = Arc::new(Self {
  126. inner: SpinLock::new(inner),
  127. read_wait_queue: WaitQueue::default(),
  128. write_wait_queue: WaitQueue::default(),
  129. epitems: SpinLock::new(LinkedList::new()),
  130. });
  131. let mut guard = result.inner.lock();
  132. guard.self_ref = Arc::downgrade(&result);
  133. // 释放锁
  134. drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
  135. return result;
  136. }
  137. pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
  138. &self.inner
  139. }
  140. fn readable(&self) -> bool {
  141. let inode = self.inner.lock();
  142. return inode.valid_cnt > 0 || inode.writer == 0;
  143. }
  144. fn writeable(&self) -> bool {
  145. let inode = self.inner.lock();
  146. return !inode.buf_full() || inode.reader == 0;
  147. }
  148. }
  149. impl PollableInode for LockedPipeInode {
  150. fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
  151. self.inner.lock().poll(private_data)
  152. }
  153. fn add_epitem(
  154. &self,
  155. epitem: Arc<EPollItem>,
  156. _private_data: &FilePrivateData,
  157. ) -> Result<(), SystemError> {
  158. self.epitems.lock().push_back(epitem);
  159. Ok(())
  160. }
  161. fn remove_epitem(
  162. &self,
  163. epitem: &Arc<EPollItem>,
  164. _private_data: &FilePrivateData,
  165. ) -> Result<(), SystemError> {
  166. let mut guard = self.epitems.lock();
  167. let len = guard.len();
  168. guard.retain(|x| !Arc::ptr_eq(x, epitem));
  169. if len != guard.len() {
  170. return Ok(());
  171. }
  172. Err(SystemError::ENOENT)
  173. }
  174. }
  175. impl IndexNode for LockedPipeInode {
  176. fn read_at(
  177. &self,
  178. _offset: usize,
  179. len: usize,
  180. buf: &mut [u8],
  181. data_guard: SpinLockGuard<FilePrivateData>,
  182. ) -> Result<usize, SystemError> {
  183. let data = data_guard.clone();
  184. drop(data_guard);
  185. // 获取mode
  186. let mode: FileMode;
  187. if let FilePrivateData::Pipefs(pdata) = &data {
  188. mode = pdata.mode;
  189. } else {
  190. return Err(SystemError::EBADF);
  191. }
  192. if buf.len() < len {
  193. return Err(SystemError::EINVAL);
  194. }
  195. // log::debug!("pipe mode: {:?}", mode);
  196. // 加锁
  197. let mut inner_guard = self.inner.lock();
  198. // 如果管道里面没有数据,则唤醒写端,
  199. while inner_guard.valid_cnt == 0 {
  200. // 如果当前管道写者数为0,则返回EOF
  201. if inner_guard.writer == 0 {
  202. return Ok(0);
  203. }
  204. self.write_wait_queue
  205. .wakeup(Some(ProcessState::Blocked(true)));
  206. // 如果为非阻塞管道,直接返回错误
  207. if mode.contains(FileMode::O_NONBLOCK) {
  208. drop(inner_guard);
  209. return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
  210. }
  211. // 否则在读等待队列中睡眠,并释放锁
  212. drop(inner_guard);
  213. let r = wq_wait_event_interruptible!(self.read_wait_queue, self.readable(), {});
  214. if r.is_err() {
  215. ProcessManager::current_pcb()
  216. .flags()
  217. .insert(ProcessFlags::HAS_PENDING_SIGNAL);
  218. return Err(SystemError::ERESTARTSYS);
  219. }
  220. inner_guard = self.inner.lock();
  221. }
  222. let mut num = inner_guard.valid_cnt as usize;
  223. //决定要输出的字节
  224. let start = inner_guard.read_pos as usize;
  225. //如果读端希望读取的字节数大于有效字节数,则输出有效字节
  226. let mut end =
  227. (inner_guard.valid_cnt as usize + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
  228. //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
  229. if len < inner_guard.valid_cnt as usize {
  230. end = (len + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
  231. num = len;
  232. }
  233. // 从管道拷贝数据到用户的缓冲区
  234. if end < start {
  235. buf[0..(PIPE_BUFF_SIZE - start)]
  236. .copy_from_slice(&inner_guard.data[start..PIPE_BUFF_SIZE]);
  237. buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inner_guard.data[0..end]);
  238. } else {
  239. buf[0..num].copy_from_slice(&inner_guard.data[start..end]);
  240. }
  241. //更新读位置以及valid_cnt
  242. inner_guard.read_pos = (inner_guard.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
  243. inner_guard.valid_cnt -= num as i32;
  244. // 读完以后如果未读完,则唤醒下一个读者
  245. if inner_guard.valid_cnt > 0 {
  246. self.read_wait_queue
  247. .wakeup(Some(ProcessState::Blocked(true)));
  248. }
  249. //读完后解锁并唤醒等待在写等待队列中的进程
  250. self.write_wait_queue
  251. .wakeup(Some(ProcessState::Blocked(true)));
  252. let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
  253. drop(inner_guard);
  254. // 唤醒epoll中等待的进程
  255. EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
  256. //返回读取的字节数
  257. return Ok(num);
  258. }
  259. fn open(
  260. &self,
  261. mut data: SpinLockGuard<FilePrivateData>,
  262. mode: &crate::filesystem::vfs::file::FileMode,
  263. ) -> Result<(), SystemError> {
  264. let accmode = mode.accmode();
  265. let mut guard = self.inner.lock();
  266. // 不能以读写方式打开管道
  267. if accmode == FileMode::O_RDWR.bits() {
  268. return Err(SystemError::EACCES);
  269. } else if accmode == FileMode::O_RDONLY.bits() {
  270. guard.reader += 1;
  271. guard.had_reader = true;
  272. // println!(
  273. // "FIFO: pipe try open in read mode with reader pid:{:?}",
  274. // ProcessManager::current_pid()
  275. // );
  276. } else if accmode == FileMode::O_WRONLY.bits() {
  277. // println!(
  278. // "FIFO: pipe try open in write mode with {} reader, writer pid:{:?}",
  279. // guard.reader,
  280. // ProcessManager::current_pid()
  281. // );
  282. if guard.reader == 0 && mode.contains(FileMode::O_NONBLOCK) {
  283. return Err(SystemError::ENXIO);
  284. }
  285. guard.writer += 1;
  286. }
  287. // 设置mode
  288. *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
  289. return Ok(());
  290. }
  291. fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
  292. let inode = self.inner.lock();
  293. let mut metadata = inode.metadata.clone();
  294. metadata.size = inode.data.len() as i64;
  295. return Ok(metadata);
  296. }
  297. fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
  298. let mode: FileMode;
  299. if let FilePrivateData::Pipefs(pipe_data) = &*data {
  300. mode = pipe_data.mode;
  301. } else {
  302. return Err(SystemError::EBADF);
  303. }
  304. let accmode = mode.accmode();
  305. let mut guard = self.inner.lock();
  306. // 写端关闭
  307. if accmode == FileMode::O_WRONLY.bits() {
  308. assert!(guard.writer > 0);
  309. guard.writer -= 1;
  310. // 如果已经没有写端了,则唤醒读端
  311. if guard.writer == 0 {
  312. self.read_wait_queue
  313. .wakeup_all(Some(ProcessState::Blocked(true)));
  314. }
  315. }
  316. // 读端关闭
  317. if accmode == FileMode::O_RDONLY.bits() {
  318. assert!(guard.reader > 0);
  319. guard.reader -= 1;
  320. // 如果已经没有写端了,则唤醒读端
  321. if guard.reader == 0 {
  322. self.write_wait_queue
  323. .wakeup_all(Some(ProcessState::Blocked(true)));
  324. }
  325. }
  326. return Ok(());
  327. }
  328. fn write_at(
  329. &self,
  330. _offset: usize,
  331. len: usize,
  332. buf: &[u8],
  333. data: SpinLockGuard<FilePrivateData>,
  334. ) -> Result<usize, SystemError> {
  335. // 获取mode
  336. let mode: FileMode;
  337. if let FilePrivateData::Pipefs(pdata) = &*data {
  338. mode = pdata.mode;
  339. } else {
  340. return Err(SystemError::EBADF);
  341. }
  342. if buf.len() < len || len > PIPE_BUFF_SIZE {
  343. return Err(SystemError::EINVAL);
  344. }
  345. // 加锁
  346. let mut inner_guard = self.inner.lock();
  347. if inner_guard.reader == 0 {
  348. if !inner_guard.had_reader {
  349. // 如果从未有读端,直接返回 ENXIO,无论是否阻塞模式
  350. return Err(SystemError::ENXIO);
  351. } else {
  352. // 如果曾经有读端,现在已关闭
  353. match mode.contains(FileMode::O_NONBLOCK) {
  354. true => {
  355. // 非阻塞模式,直接返回 EPIPE
  356. return Err(SystemError::EPIPE);
  357. }
  358. false => {
  359. let sig = Signal::SIGPIPE;
  360. let mut info = SigInfo::new(
  361. sig,
  362. 0,
  363. SigCode::Kernel,
  364. SigType::Kill(ProcessManager::current_pcb().task_pid_vnr()),
  365. );
  366. compiler_fence(core::sync::atomic::Ordering::SeqCst);
  367. let _retval = sig
  368. .send_signal_info(
  369. Some(&mut info),
  370. ProcessManager::current_pcb().task_pid_vnr(),
  371. )
  372. .map(|x| x as usize);
  373. compiler_fence(core::sync::atomic::Ordering::SeqCst);
  374. return Err(SystemError::EPIPE);
  375. }
  376. }
  377. }
  378. }
  379. // 如果管道空间不够
  380. while len + inner_guard.valid_cnt as usize > PIPE_BUFF_SIZE {
  381. // 唤醒读端
  382. self.read_wait_queue
  383. .wakeup(Some(ProcessState::Blocked(true)));
  384. // 如果为非阻塞管道,直接返回错误
  385. if mode.contains(FileMode::O_NONBLOCK) {
  386. drop(inner_guard);
  387. return Err(SystemError::ENOMEM);
  388. }
  389. // 解锁并睡眠
  390. drop(inner_guard);
  391. let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
  392. if r.is_err() {
  393. return Err(SystemError::ERESTARTSYS);
  394. }
  395. inner_guard = self.inner.lock();
  396. }
  397. // 决定要输入的字节
  398. let start = inner_guard.write_pos as usize;
  399. let end = (inner_guard.write_pos as usize + len) % PIPE_BUFF_SIZE;
  400. // 从用户的缓冲区拷贝数据到管道
  401. if end < start {
  402. inner_guard.data[start..PIPE_BUFF_SIZE]
  403. .copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
  404. inner_guard.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
  405. } else {
  406. inner_guard.data[start..end].copy_from_slice(&buf[0..len]);
  407. }
  408. // 更新写位置以及valid_cnt
  409. inner_guard.write_pos = (inner_guard.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
  410. inner_guard.valid_cnt += len as i32;
  411. // 写完后还有位置,则唤醒下一个写者
  412. if (inner_guard.valid_cnt as usize) < PIPE_BUFF_SIZE {
  413. self.write_wait_queue
  414. .wakeup(Some(ProcessState::Blocked(true)));
  415. }
  416. // 读完后解锁并唤醒等待在读等待队列中的进程
  417. self.read_wait_queue
  418. .wakeup(Some(ProcessState::Blocked(true)));
  419. let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
  420. drop(inner_guard);
  421. // 唤醒epoll中等待的进程
  422. EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
  423. // 返回写入的字节数
  424. return Ok(len);
  425. }
  426. fn as_any_ref(&self) -> &dyn core::any::Any {
  427. self
  428. }
  429. fn get_entry_name_and_metadata(
  430. &self,
  431. ino: crate::filesystem::vfs::InodeId,
  432. ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
  433. // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
  434. let name = self.get_entry_name(ino)?;
  435. let entry = self.find(&name)?;
  436. return Ok((name, entry.metadata()?));
  437. }
  438. fn fs(&self) -> Arc<(dyn FileSystem)> {
  439. todo!()
  440. }
  441. fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
  442. return Err(SystemError::ENOSYS);
  443. }
  444. fn as_pollable_inode(&self) -> Result<&dyn PollableInode, SystemError> {
  445. Ok(self)
  446. }
  447. }