pipe.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. use core::sync::atomic::compiler_fence;
  2. use crate::{
  3. arch::ipc::signal::{SigCode, Signal},
  4. filesystem::vfs::{
  5. core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
  6. FileType, IndexNode, Metadata,
  7. },
  8. libs::{
  9. spinlock::{SpinLock, SpinLockGuard},
  10. wait_queue::WaitQueue,
  11. },
  12. net::event_poll::{EPollEventType, EPollItem, EventPoll},
  13. process::{ProcessFlags, ProcessManager, ProcessState},
  14. sched::SchedMode,
  15. time::PosixTimeSpec,
  16. };
  17. use alloc::{
  18. collections::LinkedList,
  19. sync::{Arc, Weak},
  20. vec::Vec,
  21. };
  22. use system_error::SystemError;
  23. use super::signal_types::{SigInfo, SigType};
  24. /// 我们设定pipe_buff的总大小为1024字节
  25. const PIPE_BUFF_SIZE: usize = 1024;
  26. #[derive(Debug, Clone)]
  27. pub struct PipeFsPrivateData {
  28. mode: FileMode,
  29. }
  30. impl PipeFsPrivateData {
  31. pub fn new(mode: FileMode) -> Self {
  32. return PipeFsPrivateData { mode };
  33. }
  34. pub fn set_mode(&mut self, mode: FileMode) {
  35. self.mode = mode;
  36. }
  37. }
  38. /// @brief 管道文件i节点(锁)
  39. #[derive(Debug)]
  40. pub struct LockedPipeInode {
  41. inner: SpinLock<InnerPipeInode>,
  42. read_wait_queue: WaitQueue,
  43. write_wait_queue: WaitQueue,
  44. epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
  45. }
  46. /// @brief 管道文件i节点(无锁)
  47. #[derive(Debug)]
  48. pub struct InnerPipeInode {
  49. self_ref: Weak<LockedPipeInode>,
  50. /// 管道内可读的数据数
  51. valid_cnt: i32,
  52. read_pos: i32,
  53. write_pos: i32,
  54. data: [u8; PIPE_BUFF_SIZE],
  55. /// INode 元数据
  56. metadata: Metadata,
  57. reader: u32,
  58. writer: u32,
  59. had_reader: bool,
  60. }
  61. impl InnerPipeInode {
  62. pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
  63. let mut events = EPollEventType::empty();
  64. let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
  65. mode
  66. } else {
  67. return Err(SystemError::EBADFD);
  68. };
  69. if mode.contains(FileMode::O_RDONLY) {
  70. if self.valid_cnt != 0 {
  71. // 有数据可读
  72. events.insert(EPollEventType::EPOLLIN | EPollEventType::EPOLLRDNORM);
  73. }
  74. // 没有写者
  75. if self.writer == 0 {
  76. events.insert(EPollEventType::EPOLLHUP)
  77. }
  78. }
  79. if mode.contains(FileMode::O_WRONLY) {
  80. // 管道内数据未满
  81. if self.valid_cnt as usize != PIPE_BUFF_SIZE {
  82. events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
  83. }
  84. // 没有读者
  85. if self.reader == 0 {
  86. events.insert(EPollEventType::EPOLLERR);
  87. }
  88. }
  89. Ok(events.bits() as usize)
  90. }
  91. fn buf_full(&self) -> bool {
  92. return self.valid_cnt as usize == PIPE_BUFF_SIZE;
  93. }
  94. }
  95. impl LockedPipeInode {
  96. pub fn new() -> Arc<Self> {
  97. let inner = InnerPipeInode {
  98. self_ref: Weak::default(),
  99. valid_cnt: 0,
  100. read_pos: 0,
  101. write_pos: 0,
  102. had_reader: false,
  103. data: [0; PIPE_BUFF_SIZE],
  104. metadata: Metadata {
  105. dev_id: 0,
  106. inode_id: generate_inode_id(),
  107. size: PIPE_BUFF_SIZE as i64,
  108. blk_size: 0,
  109. blocks: 0,
  110. atime: PosixTimeSpec::default(),
  111. mtime: PosixTimeSpec::default(),
  112. ctime: PosixTimeSpec::default(),
  113. file_type: FileType::Pipe,
  114. mode: ModeType::from_bits_truncate(0o666),
  115. nlinks: 1,
  116. uid: 0,
  117. gid: 0,
  118. raw_dev: Default::default(),
  119. },
  120. reader: 0,
  121. writer: 0,
  122. };
  123. let result = Arc::new(Self {
  124. inner: SpinLock::new(inner),
  125. read_wait_queue: WaitQueue::default(),
  126. write_wait_queue: WaitQueue::default(),
  127. epitems: SpinLock::new(LinkedList::new()),
  128. });
  129. let mut guard = result.inner.lock();
  130. guard.self_ref = Arc::downgrade(&result);
  131. // 释放锁
  132. drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
  133. return result;
  134. }
  135. pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
  136. &self.inner
  137. }
  138. fn readable(&self) -> bool {
  139. let inode = self.inner.lock();
  140. return inode.valid_cnt > 0 || inode.writer == 0;
  141. }
  142. fn writeable(&self) -> bool {
  143. let inode = self.inner.lock();
  144. return !inode.buf_full() || inode.reader == 0;
  145. }
  146. pub fn add_epoll(&self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
  147. self.epitems.lock().push_back(epitem);
  148. Ok(())
  149. }
  150. pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
  151. let is_remove = !self
  152. .epitems
  153. .lock_irqsave()
  154. .extract_if(|x| x.epoll().ptr_eq(epoll))
  155. .collect::<Vec<_>>()
  156. .is_empty();
  157. if is_remove {
  158. return Ok(());
  159. }
  160. Err(SystemError::ENOENT)
  161. }
  162. }
  163. impl IndexNode for LockedPipeInode {
  164. fn read_at(
  165. &self,
  166. _offset: usize,
  167. len: usize,
  168. buf: &mut [u8],
  169. data_guard: SpinLockGuard<FilePrivateData>,
  170. ) -> Result<usize, SystemError> {
  171. let data = data_guard.clone();
  172. drop(data_guard);
  173. // 获取mode
  174. let mode: FileMode;
  175. if let FilePrivateData::Pipefs(pdata) = &data {
  176. mode = pdata.mode;
  177. } else {
  178. return Err(SystemError::EBADF);
  179. }
  180. if buf.len() < len {
  181. return Err(SystemError::EINVAL);
  182. }
  183. // log::debug!("pipe mode: {:?}", mode);
  184. // 加锁
  185. let mut inner_guard = self.inner.lock();
  186. // 如果管道里面没有数据,则唤醒写端,
  187. while inner_guard.valid_cnt == 0 {
  188. // 如果当前管道写者数为0,则返回EOF
  189. if inner_guard.writer == 0 {
  190. return Ok(0);
  191. }
  192. self.write_wait_queue
  193. .wakeup(Some(ProcessState::Blocked(true)));
  194. // 如果为非阻塞管道,直接返回错误
  195. if mode.contains(FileMode::O_NONBLOCK) {
  196. drop(inner_guard);
  197. return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
  198. }
  199. // 否则在读等待队列中睡眠,并释放锁
  200. drop(inner_guard);
  201. let r = wq_wait_event_interruptible!(self.read_wait_queue, self.readable(), {});
  202. if r.is_err() {
  203. ProcessManager::current_pcb()
  204. .flags()
  205. .insert(ProcessFlags::HAS_PENDING_SIGNAL);
  206. return Err(SystemError::ERESTARTSYS);
  207. }
  208. inner_guard = self.inner.lock();
  209. }
  210. let mut num = inner_guard.valid_cnt as usize;
  211. //决定要输出的字节
  212. let start = inner_guard.read_pos as usize;
  213. //如果读端希望读取的字节数大于有效字节数,则输出有效字节
  214. let mut end =
  215. (inner_guard.valid_cnt as usize + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
  216. //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
  217. if len < inner_guard.valid_cnt as usize {
  218. end = (len + inner_guard.read_pos as usize) % PIPE_BUFF_SIZE;
  219. num = len;
  220. }
  221. // 从管道拷贝数据到用户的缓冲区
  222. if end < start {
  223. buf[0..(PIPE_BUFF_SIZE - start)]
  224. .copy_from_slice(&inner_guard.data[start..PIPE_BUFF_SIZE]);
  225. buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inner_guard.data[0..end]);
  226. } else {
  227. buf[0..num].copy_from_slice(&inner_guard.data[start..end]);
  228. }
  229. //更新读位置以及valid_cnt
  230. inner_guard.read_pos = (inner_guard.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
  231. inner_guard.valid_cnt -= num as i32;
  232. // 读完以后如果未读完,则唤醒下一个读者
  233. if inner_guard.valid_cnt > 0 {
  234. self.read_wait_queue
  235. .wakeup(Some(ProcessState::Blocked(true)));
  236. }
  237. //读完后解锁并唤醒等待在写等待队列中的进程
  238. self.write_wait_queue
  239. .wakeup(Some(ProcessState::Blocked(true)));
  240. let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
  241. drop(inner_guard);
  242. // 唤醒epoll中等待的进程
  243. EventPoll::wakeup_epoll(&self.epitems, Some(pollflag))?;
  244. //返回读取的字节数
  245. return Ok(num);
  246. }
  247. fn open(
  248. &self,
  249. mut data: SpinLockGuard<FilePrivateData>,
  250. mode: &crate::filesystem::vfs::file::FileMode,
  251. ) -> Result<(), SystemError> {
  252. let accmode = mode.accmode();
  253. let mut guard = self.inner.lock();
  254. // 不能以读写方式打开管道
  255. if accmode == FileMode::O_RDWR.bits() {
  256. return Err(SystemError::EACCES);
  257. } else if accmode == FileMode::O_RDONLY.bits() {
  258. guard.reader += 1;
  259. guard.had_reader = true;
  260. // println!(
  261. // "FIFO: pipe try open in read mode with reader pid:{:?}",
  262. // ProcessManager::current_pid()
  263. // );
  264. } else if accmode == FileMode::O_WRONLY.bits() {
  265. // println!(
  266. // "FIFO: pipe try open in write mode with {} reader, writer pid:{:?}",
  267. // guard.reader,
  268. // ProcessManager::current_pid()
  269. // );
  270. if guard.reader == 0 && mode.contains(FileMode::O_NONBLOCK) {
  271. return Err(SystemError::ENXIO);
  272. }
  273. guard.writer += 1;
  274. }
  275. // 设置mode
  276. *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
  277. return Ok(());
  278. }
  279. fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
  280. let inode = self.inner.lock();
  281. let mut metadata = inode.metadata.clone();
  282. metadata.size = inode.data.len() as i64;
  283. return Ok(metadata);
  284. }
  285. fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
  286. let mode: FileMode;
  287. if let FilePrivateData::Pipefs(pipe_data) = &*data {
  288. mode = pipe_data.mode;
  289. } else {
  290. return Err(SystemError::EBADF);
  291. }
  292. let accmode = mode.accmode();
  293. let mut guard = self.inner.lock();
  294. // 写端关闭
  295. if accmode == FileMode::O_WRONLY.bits() {
  296. assert!(guard.writer > 0);
  297. guard.writer -= 1;
  298. // 如果已经没有写端了,则唤醒读端
  299. if guard.writer == 0 {
  300. self.read_wait_queue
  301. .wakeup_all(Some(ProcessState::Blocked(true)));
  302. }
  303. }
  304. // 读端关闭
  305. if accmode == FileMode::O_RDONLY.bits() {
  306. assert!(guard.reader > 0);
  307. guard.reader -= 1;
  308. // 如果已经没有写端了,则唤醒读端
  309. if guard.reader == 0 {
  310. self.write_wait_queue
  311. .wakeup_all(Some(ProcessState::Blocked(true)));
  312. }
  313. }
  314. return Ok(());
  315. }
  316. fn write_at(
  317. &self,
  318. _offset: usize,
  319. len: usize,
  320. buf: &[u8],
  321. data: SpinLockGuard<FilePrivateData>,
  322. ) -> Result<usize, SystemError> {
  323. // 获取mode
  324. let mode: FileMode;
  325. if let FilePrivateData::Pipefs(pdata) = &*data {
  326. mode = pdata.mode;
  327. } else {
  328. return Err(SystemError::EBADF);
  329. }
  330. if buf.len() < len || len > PIPE_BUFF_SIZE {
  331. return Err(SystemError::EINVAL);
  332. }
  333. // 加锁
  334. let mut inner_guard = self.inner.lock();
  335. if inner_guard.reader == 0 {
  336. if !inner_guard.had_reader {
  337. // 如果从未有读端,直接返回 ENXIO,无论是否阻塞模式
  338. return Err(SystemError::ENXIO);
  339. } else {
  340. // 如果曾经有读端,现在已关闭
  341. match mode.contains(FileMode::O_NONBLOCK) {
  342. true => {
  343. // 非阻塞模式,直接返回 EPIPE
  344. return Err(SystemError::EPIPE);
  345. }
  346. false => {
  347. let sig = Signal::SIGPIPE;
  348. let mut info = SigInfo::new(
  349. sig,
  350. 0,
  351. SigCode::Kernel,
  352. SigType::Kill(ProcessManager::current_pid()),
  353. );
  354. compiler_fence(core::sync::atomic::Ordering::SeqCst);
  355. let _retval = sig
  356. .send_signal_info(Some(&mut info), ProcessManager::current_pid())
  357. .map(|x| x as usize);
  358. compiler_fence(core::sync::atomic::Ordering::SeqCst);
  359. return Err(SystemError::EPIPE);
  360. }
  361. }
  362. }
  363. }
  364. // 如果管道空间不够
  365. while len + inner_guard.valid_cnt as usize > PIPE_BUFF_SIZE {
  366. // 唤醒读端
  367. self.read_wait_queue
  368. .wakeup(Some(ProcessState::Blocked(true)));
  369. // 如果为非阻塞管道,直接返回错误
  370. if mode.contains(FileMode::O_NONBLOCK) {
  371. drop(inner_guard);
  372. return Err(SystemError::ENOMEM);
  373. }
  374. // 解锁并睡眠
  375. drop(inner_guard);
  376. let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
  377. if r.is_err() {
  378. return Err(SystemError::ERESTARTSYS);
  379. }
  380. inner_guard = self.inner.lock();
  381. }
  382. // 决定要输入的字节
  383. let start = inner_guard.write_pos as usize;
  384. let end = (inner_guard.write_pos as usize + len) % PIPE_BUFF_SIZE;
  385. // 从用户的缓冲区拷贝数据到管道
  386. if end < start {
  387. inner_guard.data[start..PIPE_BUFF_SIZE]
  388. .copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
  389. inner_guard.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
  390. } else {
  391. inner_guard.data[start..end].copy_from_slice(&buf[0..len]);
  392. }
  393. // 更新写位置以及valid_cnt
  394. inner_guard.write_pos = (inner_guard.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
  395. inner_guard.valid_cnt += len as i32;
  396. // 写完后还有位置,则唤醒下一个写者
  397. if (inner_guard.valid_cnt as usize) < PIPE_BUFF_SIZE {
  398. self.write_wait_queue
  399. .wakeup(Some(ProcessState::Blocked(true)));
  400. }
  401. // 读完后解锁并唤醒等待在读等待队列中的进程
  402. self.read_wait_queue
  403. .wakeup(Some(ProcessState::Blocked(true)));
  404. let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
  405. drop(inner_guard);
  406. // 唤醒epoll中等待的进程
  407. EventPoll::wakeup_epoll(&self.epitems, Some(pollflag))?;
  408. // 返回写入的字节数
  409. return Ok(len);
  410. }
  411. fn as_any_ref(&self) -> &dyn core::any::Any {
  412. self
  413. }
  414. fn get_entry_name_and_metadata(
  415. &self,
  416. ino: crate::filesystem::vfs::InodeId,
  417. ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
  418. // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
  419. let name = self.get_entry_name(ino)?;
  420. let entry = self.find(&name)?;
  421. return Ok((name, entry.metadata()?));
  422. }
  423. fn fs(&self) -> Arc<(dyn FileSystem)> {
  424. todo!()
  425. }
  426. fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
  427. return Err(SystemError::ENOSYS);
  428. }
  429. fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
  430. return self.inner.lock().poll(private_data);
  431. }
  432. }