Browse Source

修正pipe逻辑,将pipe接入epoll。 (#478)

GnoCiYeH 1 year ago
parent
commit
5e948c5650

+ 15 - 2
kernel/src/filesystem/vfs/file.rs

@@ -11,7 +11,7 @@ use crate::{
         tty::TtyFilePrivateData,
     },
     filesystem::procfs::ProcfsFilePrivateData,
-    ipc::pipe::PipeFsPrivateData,
+    ipc::pipe::{LockedPipeInode, PipeFsPrivateData},
     kerror,
     libs::spinlock::SpinLock,
     net::{
@@ -47,6 +47,14 @@ impl Default for FilePrivateData {
     }
 }
 
+impl FilePrivateData {
+    pub fn update_mode(&mut self, mode: FileMode) {
+        if let FilePrivateData::Pipefs(pdata) = self {
+            pdata.set_mode(mode);
+        }
+    }
+}
+
 bitflags! {
     /// @brief 文件打开模式
     /// 其中,低2bit组合而成的数字的值,用于表示访问权限。其他的bit,才支持通过按位或的方式来表示参数
@@ -388,6 +396,7 @@ impl File {
 
         // 直接修改文件的打开模式
         self.mode = mode;
+        self.private_data.update_mode(mode);
         return Ok(());
     }
 
@@ -418,6 +427,10 @@ impl File {
 
                 return socket.add_epoll(epitem);
             }
+            FileType::Pipe => {
+                let inode = self.inode.downcast_ref::<LockedPipeInode>().unwrap();
+                return inode.inner().lock().add_epoll(epitem);
+            }
             _ => return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP),
         }
     }
@@ -436,7 +449,7 @@ impl File {
     }
 
     pub fn poll(&self) -> Result<usize, SystemError> {
-        self.inode.poll()
+        self.inode.poll(&self.private_data)
     }
 }
 

+ 1 - 1
kernel/src/filesystem/vfs/mod.rs

@@ -166,7 +166,7 @@ pub trait IndexNode: Any + Sync + Send + Debug {
     /// @brief 获取当前inode的状态。
     ///
     /// @return PollStatus结构体
-    fn poll(&self) -> Result<usize, SystemError> {
+    fn poll(&self, _private_data: &FilePrivateData) -> Result<usize, SystemError> {
         // 若文件系统没有实现此方法,则返回“不支持”
         return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
     }

+ 87 - 1
kernel/src/ipc/pipe.rs

@@ -6,11 +6,15 @@ use crate::{
         FileType, IndexNode, Metadata,
     },
     libs::{spinlock::SpinLock, wait_queue::WaitQueue},
+    net::event_poll::{EPollEventType, EPollItem, EventPoll},
     process::ProcessState,
     time::TimeSpec,
 };
 
-use alloc::sync::{Arc, Weak};
+use alloc::{
+    collections::LinkedList,
+    sync::{Arc, Weak},
+};
 use system_error::SystemError;
 
 /// 我们设定pipe_buff的总大小为1024字节
@@ -25,6 +29,10 @@ impl PipeFsPrivateData {
     pub fn new(mode: FileMode) -> Self {
         return PipeFsPrivateData { mode: mode };
     }
+
+    pub fn set_mode(&mut self, mode: FileMode) {
+        self.mode = mode;
+    }
 }
 
 /// @brief 管道文件i节点(锁)
@@ -35,6 +43,7 @@ pub struct LockedPipeInode(SpinLock<InnerPipeInode>);
 #[derive(Debug)]
 pub struct InnerPipeInode {
     self_ref: Weak<LockedPipeInode>,
+    /// 管道内可读的数据数
     valid_cnt: i32,
     read_pos: i32,
     write_pos: i32,
@@ -45,6 +54,50 @@ pub struct InnerPipeInode {
     metadata: Metadata,
     reader: u32,
     writer: u32,
+    epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
+}
+
+impl InnerPipeInode {
+    pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
+        let mut events = EPollEventType::empty();
+
+        let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
+            mode
+        } else {
+            return Err(SystemError::EBADFD);
+        };
+
+        if mode.contains(FileMode::O_RDONLY) {
+            if self.valid_cnt != 0 {
+                // 有数据可读
+                events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
+            }
+
+            // 没有写者
+            if self.writer == 0 {
+                events.insert(EPollEventType::EPOLLHUP)
+            }
+        }
+
+        if mode.contains(FileMode::O_WRONLY) {
+            // 管道内数据未满
+            if self.valid_cnt as usize != PIPE_BUFF_SIZE {
+                events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
+            }
+
+            // 没有读者
+            if self.reader == 0 {
+                events.insert(EPollEventType::EPOLLERR);
+            }
+        }
+
+        Ok(events.bits() as usize)
+    }
+
+    pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
+        self.epitems.lock().push_back(epitem);
+        Ok(())
+    }
 }
 
 impl LockedPipeInode {
@@ -76,6 +129,7 @@ impl LockedPipeInode {
             },
             reader: 0,
             writer: 0,
+            epitems: SpinLock::new(LinkedList::new()),
         };
         let result = Arc::new(Self(SpinLock::new(inner)));
         let mut guard = result.0.lock();
@@ -84,6 +138,10 @@ impl LockedPipeInode {
         drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
         return result;
     }
+
+    pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
+        &self.0
+    }
 }
 
 impl IndexNode for LockedPipeInode {
@@ -162,10 +220,22 @@ impl IndexNode for LockedPipeInode {
         inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
         inode.valid_cnt -= num as i32;
 
+        // 读完以后如果未读完,则唤醒下一个读者
+        if inode.valid_cnt > 0 {
+            inode
+                .read_wait_queue
+                .wakeup(Some(ProcessState::Blocked(true)));
+        }
+
         //读完后解锁并唤醒等待在写等待队列中的进程
         inode
             .write_wait_queue
             .wakeup(Some(ProcessState::Blocked(true)));
+
+        let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
+        // 唤醒epoll中等待的进程
+        EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?;
+
         //返回读取的字节数
         return Ok(num);
     }
@@ -302,10 +372,22 @@ impl IndexNode for LockedPipeInode {
         inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
         inode.valid_cnt += len as i32;
 
+        // 写完后还有位置,则唤醒下一个写者
+        if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
+            inode
+                .write_wait_queue
+                .wakeup(Some(ProcessState::Blocked(true)));
+        }
+
         // 读完后解锁并唤醒等待在读等待队列中的进程
         inode
             .read_wait_queue
             .wakeup(Some(ProcessState::Blocked(true)));
+
+        let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
+        // 唤醒epoll中等待的进程
+        EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?;
+
         // 返回写入的字节数
         return Ok(len);
     }
@@ -331,4 +413,8 @@ impl IndexNode for LockedPipeInode {
     fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
         return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
     }
+
+    fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
+        return self.0.lock().poll(private_data);
+    }
 }

+ 38 - 29
kernel/src/ipc/syscall.rs

@@ -33,37 +33,46 @@ impl Syscall {
     /// - `fd`: 用于返回文件描述符的数组
     /// - `flags`:设置管道的参数
     pub fn pipe2(fd: *mut i32, flags: FileMode) -> Result<usize, SystemError> {
-        if flags.contains(FileMode::O_NONBLOCK)
-            || flags.contains(FileMode::O_CLOEXEC)
-            || flags.contains(FileMode::O_RDONLY)
+        if !flags
+            .difference(FileMode::O_CLOEXEC | FileMode::O_NONBLOCK | FileMode::O_DIRECT)
+            .is_empty()
         {
-            let mut user_buffer =
-                UserBufferWriter::new(fd, core::mem::size_of::<[c_int; 2]>(), true)?;
-            let fd = user_buffer.buffer::<i32>(0)?;
-            let pipe_ptr = LockedPipeInode::new();
-            let mut read_file = File::new(pipe_ptr.clone(), FileMode::O_RDONLY)?;
-            read_file.private_data =
-                FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_RDONLY));
-            let mut write_file = File::new(pipe_ptr.clone(), FileMode::O_WRONLY)?;
-            write_file.private_data =
-                FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_WRONLY));
-            if flags.contains(FileMode::O_CLOEXEC) {
-                read_file.set_close_on_exec(true);
-                write_file.set_close_on_exec(true);
-            }
-            let fd_table_ptr = ProcessManager::current_pcb().fd_table();
-            let mut fd_table_guard = fd_table_ptr.write();
-            let read_fd = fd_table_guard.alloc_fd(read_file, None)?;
-            let write_fd = fd_table_guard.alloc_fd(write_file, None)?;
-
-            drop(fd_table_guard);
-
-            fd[0] = read_fd;
-            fd[1] = write_fd;
-            Ok(0)
-        } else {
-            Err(SystemError::EINVAL)
+            return Err(SystemError::EINVAL);
+        }
+
+        let mut user_buffer = UserBufferWriter::new(fd, core::mem::size_of::<[c_int; 2]>(), true)?;
+        let fd = user_buffer.buffer::<i32>(0)?;
+        let pipe_ptr = LockedPipeInode::new();
+
+        let mut read_file = File::new(
+            pipe_ptr.clone(),
+            FileMode::O_RDONLY | (flags & FileMode::O_NONBLOCK),
+        )?;
+        read_file.private_data =
+            FilePrivateData::Pipefs(PipeFsPrivateData::new(FileMode::O_RDONLY));
+
+        let mut write_file = File::new(
+            pipe_ptr.clone(),
+            FileMode::O_WRONLY | (flags & (FileMode::O_NONBLOCK | FileMode::O_DIRECT)),
+        )?;
+        write_file.private_data = FilePrivateData::Pipefs(PipeFsPrivateData::new(
+            FileMode::O_WRONLY | (flags & (FileMode::O_NONBLOCK | FileMode::O_DIRECT)),
+        ));
+
+        if flags.contains(FileMode::O_CLOEXEC) {
+            read_file.set_close_on_exec(true);
+            write_file.set_close_on_exec(true);
         }
+        let fd_table_ptr = ProcessManager::current_pcb().fd_table();
+        let mut fd_table_guard = fd_table_ptr.write();
+        let read_fd = fd_table_guard.alloc_fd(read_file, None)?;
+        let write_fd = fd_table_guard.alloc_fd(write_file, None)?;
+
+        drop(fd_table_guard);
+
+        fd[0] = read_fd;
+        fd[1] = write_fd;
+        Ok(0)
     }
 
     pub fn kill(pid: Pid, sig: c_int) -> Result<usize, SystemError> {

+ 43 - 1
kernel/src/net/event_poll/mod.rs

@@ -161,7 +161,7 @@ impl IndexNode for EPollInode {
         Err(SystemError::ENOSYS)
     }
 
-    fn poll(&self) -> Result<usize, SystemError> {
+    fn poll(&self, _private_data: &FilePrivateData) -> Result<usize, SystemError> {
         // 需要实现epoll嵌套epoll时,需要实现这里
         todo!()
     }
@@ -704,6 +704,48 @@ impl EventPoll {
     pub fn ep_wake_one(&self) {
         self.epoll_wq.wakeup(None);
     }
+
+    /// ### epoll的回调,支持epoll的文件有事件到来时直接调用该方法即可
+    pub fn wakeup_epoll(
+        epitems: &mut SpinLock<LinkedList<Arc<EPollItem>>>,
+        pollflags: EPollEventType,
+    ) -> Result<(), SystemError> {
+        let mut epitems_guard = epitems.try_lock_irqsave()?;
+        // 一次只取一个,因为一次也只有一个进程能拿到对应文件的🔓
+        if let Some(epitem) = epitems_guard.pop_front() {
+            let epoll = epitem.epoll().upgrade().unwrap();
+            let mut epoll_guard = epoll.try_lock()?;
+            let binding = epitem.clone();
+            let event_guard = binding.event().read();
+            let ep_events = EPollEventType::from_bits_truncate(event_guard.events());
+
+            // 检查事件合理性以及是否有感兴趣的事件
+            if !(ep_events
+                .difference(EPollEventType::EP_PRIVATE_BITS)
+                .is_empty()
+                || pollflags.difference(ep_events).is_empty())
+            {
+                // TODO: 未处理pm相关
+
+                // 首先将就绪的epitem加入等待队列
+                epoll_guard.ep_add_ready(epitem.clone());
+
+                if epoll_guard.ep_has_waiter() {
+                    if ep_events.contains(EPollEventType::EPOLLEXCLUSIVE)
+                        && !pollflags.contains(EPollEventType::POLLFREE)
+                    {
+                        // 避免惊群
+                        epoll_guard.ep_wake_one();
+                    } else {
+                        epoll_guard.ep_wake_all();
+                    }
+                }
+            }
+
+            epitems_guard.push_back(epitem);
+        }
+        Ok(())
+    }
 }
 
 /// 与C兼容的Epoll事件结构体

+ 7 - 5
kernel/src/net/mod.rs

@@ -231,11 +231,13 @@ pub trait Socket: Sync + Send + Debug {
 
         for epitem in handle_item.epitems.lock_irqsave().iter() {
             let epoll = epitem.epoll();
-            let _ = EventPoll::ep_remove(
-                &mut epoll.upgrade().unwrap().lock_irqsave(),
-                epitem.fd(),
-                None,
-            );
+            if epoll.upgrade().is_some() {
+                let _ = EventPoll::ep_remove(
+                    &mut epoll.upgrade().unwrap().lock_irqsave(),
+                    epitem.fd(),
+                    None,
+                );
+            }
         }
 
         Ok(())

+ 8 - 48
kernel/src/net/net_core.rs

@@ -1,5 +1,5 @@
 use alloc::{boxed::Box, collections::BTreeMap, sync::Arc};
-use smoltcp::{iface::SocketHandle, socket::dhcpv4, wire};
+use smoltcp::{socket::dhcpv4, wire};
 use system_error::SystemError;
 
 use crate::{
@@ -11,7 +11,7 @@ use crate::{
 };
 
 use super::{
-    event_poll::EPollEventType,
+    event_poll::{EPollEventType, EventPoll},
     socket::{TcpSocket, HANDLE_MAP, SOCKET_SET},
 };
 
@@ -228,7 +228,12 @@ fn send_event(sockets: &smoltcp::iface::SocketSet) -> Result<(), SystemError> {
             smoltcp::socket::Socket::Dns(_) => unimplemented!("Dns socket hasn't unimplemented"),
         }
         drop(handle_guard);
-        wakeup_epoll(handle, events as u32)?;
+        let mut handle_guard = HANDLE_MAP.write_irqsave();
+        let handle_item = handle_guard.get_mut(&handle).unwrap();
+        EventPoll::wakeup_epoll(
+            &mut handle_item.epitems,
+            EPollEventType::from_bits_truncate(events as u32),
+        )?;
         // crate::kdebug!(
         //     "{} send_event {:?}",
         //     handle,
@@ -237,48 +242,3 @@ fn send_event(sockets: &smoltcp::iface::SocketSet) -> Result<(), SystemError> {
     }
     Ok(())
 }
-
-/// ### 处理epoll
-fn wakeup_epoll(handle: SocketHandle, events: u32) -> Result<(), SystemError> {
-    let mut handle_guard = HANDLE_MAP.write_irqsave();
-    let handle_item = handle_guard.get_mut(&handle).unwrap();
-    let mut epitems_guard = handle_item.epitems.try_lock_irqsave()?;
-
-    // 从events拿到epoll相关事件
-    let pollflags = EPollEventType::from_bits_truncate(events);
-
-    // 一次只取一个,因为一次也只有一个进程能拿到对应文件的🔓
-    if let Some(epitem) = epitems_guard.pop_front() {
-        let epoll = epitem.epoll().upgrade().unwrap();
-        let mut epoll_guard = epoll.try_lock_irqsave()?;
-        let binding = epitem.clone();
-        let event_guard = binding.event().read_irqsave();
-        let ep_events = EPollEventType::from_bits_truncate(event_guard.events());
-
-        // 检查事件合理性以及是否有感兴趣的事件
-        if !(ep_events
-            .difference(EPollEventType::EP_PRIVATE_BITS)
-            .is_empty()
-            || pollflags.difference(ep_events).is_empty())
-        {
-            // TODO: 未处理pm相关
-
-            // 首先将就绪的epitem加入等待队列
-            epoll_guard.ep_add_ready(epitem.clone());
-
-            if epoll_guard.ep_has_waiter() {
-                if ep_events.contains(EPollEventType::EPOLLEXCLUSIVE)
-                    && !pollflags.contains(EPollEventType::POLLFREE)
-                {
-                    // 避免惊群
-                    epoll_guard.ep_wake_one();
-                } else {
-                    epoll_guard.ep_wake_all();
-                }
-            }
-        }
-
-        epitems_guard.push_back(epitem);
-    }
-    Ok(())
-}

+ 2 - 2
kernel/src/net/socket.rs

@@ -20,7 +20,7 @@ use system_error::SystemError;
 use crate::{
     arch::{rand::rand, sched::sched},
     driver::net::NetDriver,
-    filesystem::vfs::{syscall::ModeType, FileType, IndexNode, Metadata},
+    filesystem::vfs::{syscall::ModeType, FilePrivateData, FileType, IndexNode, Metadata},
     kerror, kwarn,
     libs::{
         rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
@@ -1351,7 +1351,7 @@ impl IndexNode for SocketInode {
         return self.0.lock_no_preempt().write(&buf[0..len], None);
     }
 
-    fn poll(&self) -> Result<usize, SystemError> {
+    fn poll(&self, _private_data: &FilePrivateData) -> Result<usize, SystemError> {
         let events = self.0.lock_irqsave().poll();
         return Ok(events.bits() as usize);
     }