Browse Source

匿名管道重构&增加IrqArch trait以及IrqFlags及其守卫 (#253)

* 实现匿名管道

* 增加IrqArch trait以及IrqFlags及其守卫

---------

Co-authored-by: longjin <[email protected]>
hanjiezhou 1 year ago
parent
commit
f678331a33

+ 3 - 2
.vscode/settings.json

@@ -173,9 +173,10 @@
         "user_namespace.h": "c",
         "sleep.h": "c",
         "net.h": "c",
-        "lz4.h": "c"
+        "lz4.h": "c",
+        "cmd_test.h": "c"
     },
-    "C_Cpp.errorSquiggles": "Enabled",
+    "C_Cpp.errorSquiggles": "enabled",
     "esbonio.sphinx.confDir": "",
     "rust-analyzer.cargo.target": "x86_64-unknown-none",
     "rust-analyzer.checkOnSave.allTargets": false,

+ 44 - 1
kernel/src/arch/x86_64/interrupt/mod.rs

@@ -1,5 +1,12 @@
 #![allow(dead_code)]
-use core::arch::asm;
+use core::{
+    arch::asm,
+    sync::atomic::{compiler_fence, Ordering},
+};
+
+use crate::exception::{InterruptArch, IrqFlags, IrqFlagsGuard};
+
+use super::asm::irqflags::{local_irq_restore, local_irq_save};
 
 /// @brief 关闭中断
 #[inline]
@@ -16,3 +23,39 @@ pub fn sti() {
         asm!("sti");
     }
 }
+
+pub struct X86_64InterruptArch;
+
+impl InterruptArch for X86_64InterruptArch {
+    unsafe fn interrupt_enable() {
+        sti();
+    }
+
+    unsafe fn interrupt_disable() {
+        cli();
+    }
+
+    fn is_irq_enabled() -> bool {
+        let rflags: u64;
+        unsafe {
+            asm!("pushfq; pop {}", out(reg) rflags);
+        }
+        return rflags & (1 << 9) != 0;
+    }
+
+    unsafe fn save_and_disable_irq() -> IrqFlagsGuard {
+        compiler_fence(Ordering::SeqCst);
+        let mut rflags: u64 = 0;
+        local_irq_save(&mut rflags);
+        let flags = IrqFlags::new(rflags);
+        let guard = IrqFlagsGuard::new(flags);
+        compiler_fence(Ordering::SeqCst);
+        return guard;
+    }
+
+    unsafe fn restore_irq(flags: IrqFlags) {
+        compiler_fence(Ordering::SeqCst);
+        local_irq_restore(&flags.flags());
+        compiler_fence(Ordering::SeqCst);
+    }
+}

+ 2 - 0
kernel/src/arch/x86_64/mod.rs

@@ -8,3 +8,5 @@ pub mod mm;
 pub mod pci;
 pub mod rand;
 pub mod sched;
+
+pub use interrupt::X86_64InterruptArch as CurrentIrqArch;

+ 72 - 0
kernel/src/exception/mod.rs

@@ -1 +1,73 @@
+use crate::arch::CurrentIrqArch;
+
 pub mod softirq;
+
+/// @brief 中断相关的操作
+pub trait InterruptArch: Send + Sync {
+    /// @brief 使能中断
+    unsafe fn interrupt_enable();
+    /// @brief 禁止中断
+    unsafe fn interrupt_disable();
+    /// @brief 检查中断是否被禁止
+    fn is_irq_enabled() -> bool;
+
+    /// @brief 保存当前中断状态,并且禁止中断
+    unsafe fn save_and_disable_irq() -> IrqFlagsGuard;
+    unsafe fn restore_irq(flags: IrqFlags);
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct IrqFlags {
+    flags: u64,
+}
+
+impl IrqFlags {
+    pub fn new(flags: u64) -> Self {
+        IrqFlags { flags }
+    }
+
+    pub fn flags(&self) -> u64 {
+        self.flags
+    }
+}
+
+/// @brief 当前中断状态的保护器,当该对象被drop时,会恢复之前的中断状态
+///
+/// # Example
+///
+/// ```
+/// use crate::arch::CurrentIrqArch;
+///
+/// // disable irq and save irq state (这是唯一的获取IrqFlagsGuard的方法)
+/// let guard = unsafe{CurrentIrqArch::save_and_disable_irq()};
+///
+/// // do something
+///
+/// // 销毁guard时,会恢复之前的中断状态
+/// drop(guard);
+///
+/// ```
+#[derive(Debug)]
+pub struct IrqFlagsGuard {
+    flags: IrqFlags,
+}
+
+impl IrqFlagsGuard {
+    /// @brief 创建IrqFlagsGuard对象
+    ///
+    /// # Safety
+    ///
+    /// 该函数不安全,因为它不会检查flags是否是一个有效的IrqFlags对象, 而当它被drop时,会恢复flags中的中断状态
+    ///
+    /// 该函数只应被`CurrentIrqArch::save_and_disable_irq`调用
+    pub unsafe fn new(flags: IrqFlags) -> Self {
+        IrqFlagsGuard { flags }
+    }
+}
+impl Drop for IrqFlagsGuard {
+    fn drop(&mut self) {
+        unsafe {
+            CurrentIrqArch::restore_irq(self.flags);
+        }
+    }
+}

+ 2 - 0
kernel/src/ipc/mod.rs

@@ -1,2 +1,4 @@
+pub mod pipe;
 pub mod signal;
 pub mod signal_types;
+pub mod syscall;

+ 230 - 0
kernel/src/ipc/pipe.rs

@@ -0,0 +1,230 @@
+use crate::{
+    arch::{sched::sched, CurrentIrqArch},
+    exception::InterruptArch,
+    filesystem::vfs::{
+        core::generate_inode_id, FilePrivateData, FileSystem, FileType, IndexNode, Metadata,
+        PollStatus,
+    },
+    include::bindings::bindings::PROC_INTERRUPTIBLE,
+    libs::{spinlock::SpinLock, wait_queue::WaitQueue},
+    syscall::SystemError,
+    time::TimeSpec,
+};
+
+use alloc::sync::{Arc, Weak};
+
+/// 我们设定pipe_buff的总大小为1024字节
+const PIPE_BUFF_SIZE: usize = 1024;
+
+/// @brief 管道文件i节点(锁)
+#[derive(Debug)]
+pub struct LockedPipeInode(SpinLock<InnerPipeInode>);
+
+/// @brief 管道文件i节点(无锁)
+#[derive(Debug)]
+pub struct InnerPipeInode {
+    self_ref: Weak<LockedPipeInode>,
+    valid_cnt: i32,
+    read_pos: i32,
+    write_pos: i32,
+    read_wait_queue: WaitQueue,
+    write_wait_queue: WaitQueue,
+    data: [u8; PIPE_BUFF_SIZE],
+    /// INode 元数据
+    metadata: Metadata,
+}
+
+impl LockedPipeInode {
+    pub fn new() -> Arc<Self> {
+        let inner = InnerPipeInode {
+            self_ref: Weak::default(),
+            valid_cnt: 0,
+            read_pos: 0,
+            write_pos: 0,
+            read_wait_queue: WaitQueue::INIT,
+            write_wait_queue: WaitQueue::INIT,
+            data: [0; PIPE_BUFF_SIZE],
+
+            metadata: Metadata {
+                dev_id: 0,
+                inode_id: generate_inode_id(),
+                size: 0,
+                blk_size: 0,
+                blocks: 0,
+                atime: TimeSpec::default(),
+                mtime: TimeSpec::default(),
+                ctime: TimeSpec::default(),
+                file_type: FileType::Pipe,
+                mode: 0o666,
+                nlinks: 1,
+                uid: 0,
+                gid: 0,
+                raw_dev: 0,
+            },
+        };
+        let result = Arc::new(Self(SpinLock::new(inner)));
+        let mut guard = result.0.lock();
+        guard.self_ref = Arc::downgrade(&result);
+        // 释放锁
+        drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
+        return result;
+    }
+}
+
+impl IndexNode for LockedPipeInode {
+    fn read_at(
+        &self,
+        _offset: usize,
+        len: usize,
+        buf: &mut [u8],
+        _data: &mut FilePrivateData,
+    ) -> Result<usize, crate::syscall::SystemError> {
+        if buf.len() < len {
+            return Err(SystemError::EINVAL);
+        }
+        // 加锁
+        let mut inode = self.0.lock();
+
+        //如果管道里面没有数据,则唤醒写端,
+        while inode.valid_cnt == 0 {
+            inode.write_wait_queue.wakeup(PROC_INTERRUPTIBLE.into());
+
+            // 在读等待队列中睡眠,并释放锁
+            unsafe {
+                let irq_guard = CurrentIrqArch::save_and_disable_irq();
+                inode.read_wait_queue.sleep_without_schedule();
+                drop(inode);
+                
+                drop(irq_guard);
+            }
+            sched();
+            inode = self.0.lock();
+        }
+
+        let mut num = inode.valid_cnt as usize;
+        //决定要输出的字节
+        let start = inode.read_pos as usize;
+        //如果读端希望读取的字节数大于有效字节数,则输出有效字节
+        let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
+        //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
+        if len < inode.valid_cnt as usize {
+            end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
+            num = len;
+        }
+
+        // 从管道拷贝数据到用户的缓冲区
+
+        if end < start {
+            buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
+            buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
+        } else {
+            buf[0..num].copy_from_slice(&inode.data[start..end]);
+        }
+
+        //更新读位置以及valid_cnt
+        inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
+        inode.valid_cnt -= num as i32;
+
+        //读完后解锁并唤醒等待在写等待队列中的进程
+        inode.write_wait_queue.wakeup(PROC_INTERRUPTIBLE.into());
+        //返回读取的字节数
+        return Ok(num);
+    }
+
+    fn open(
+        &self,
+        _data: &mut FilePrivateData,
+        _mode: &crate::filesystem::vfs::file::FileMode,
+    ) -> Result<(), SystemError> {
+        return Ok(());
+    }
+
+    fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
+        let inode = self.0.lock();
+        let mut metadata = inode.metadata.clone();
+        metadata.size = inode.data.len() as i64;
+
+        return Ok(metadata);
+    }
+
+    fn close(&self, _data: &mut FilePrivateData) -> Result<(), SystemError> {
+        return Ok(());
+    }
+
+    fn write_at(
+        &self,
+        _offset: usize,
+        len: usize,
+        buf: &[u8],
+        _data: &mut FilePrivateData,
+    ) -> Result<usize, crate::syscall::SystemError> {
+        if buf.len() < len || len > PIPE_BUFF_SIZE {
+            return Err(SystemError::EINVAL);
+        }
+        // 加锁
+
+        let mut inode = self.0.lock();
+
+        // 如果管道空间不够
+
+        while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
+            // 唤醒读端
+            inode.read_wait_queue.wakeup(PROC_INTERRUPTIBLE.into());
+            // 解锁并睡眠
+            unsafe {
+                let irq_guard = CurrentIrqArch::save_and_disable_irq();
+                inode.write_wait_queue.sleep_without_schedule();
+                drop(inode);
+                drop(irq_guard);
+            }
+            sched();
+            inode = self.0.lock();
+        }
+
+        // 决定要输入的字节
+        let start = inode.write_pos as usize;
+        let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
+        // 从用户的缓冲区拷贝数据到管道
+
+        if end < start {
+            inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
+            inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
+        } else {
+            inode.data[start..end].copy_from_slice(&buf[0..len]);
+        }
+        // 更新写位置以及valid_cnt
+        inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
+        inode.valid_cnt += len as i32;
+
+        // 读完后解锁并唤醒等待在读等待队列中的进程
+        inode.read_wait_queue.wakeup(PROC_INTERRUPTIBLE.into());
+        // 返回写入的字节数
+        return Ok(len);
+    }
+
+    fn poll(&self) -> Result<PollStatus, crate::syscall::SystemError> {
+        return Ok(PollStatus::READ | PollStatus::WRITE);
+    }
+
+    fn as_any_ref(&self) -> &dyn core::any::Any {
+        self
+    }
+
+    fn get_entry_name_and_metadata(
+        &self,
+        ino: crate::filesystem::vfs::InodeId,
+    ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
+        // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
+        let name = self.get_entry_name(ino)?;
+        let entry = self.find(&name)?;
+        return Ok((name, entry.metadata()?));
+    }
+
+    fn fs(&self) -> Arc<(dyn FileSystem)> {
+        todo!()
+    }
+
+    fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
+        return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
+    }
+}

+ 38 - 0
kernel/src/ipc/syscall.rs

@@ -0,0 +1,38 @@
+use crate::{
+    arch::asm::current::current_pcb,
+    filesystem::vfs::file::{File, FileMode},
+    include::bindings::bindings::pt_regs,
+    syscall::SystemError,
+};
+
+use super::pipe::LockedPipeInode;
+
+#[no_mangle]
+/// @brief 调用匿名管道
+pub extern "C" fn sys_pipe(regs: &pt_regs) -> u64 {
+    let fd: *mut i32 = regs.r8 as *mut i32;
+    return do_pipe(fd)
+        .map(|x| x as u64)
+        .unwrap_or_else(|e| e.to_posix_errno() as u64);
+}
+
+pub fn do_pipe(fd: *mut i32) -> Result<i64, SystemError> {
+    let pipe_ptr = LockedPipeInode::new();
+    let read_file = File::new(pipe_ptr.clone(), FileMode::O_RDONLY);
+    let write_file = File::new(pipe_ptr.clone(), FileMode::O_WRONLY);
+
+    let read_fd = current_pcb().alloc_fd(read_file.unwrap(), None);
+    if !read_fd.is_ok() {
+        return Err(read_fd.unwrap_err());
+    }
+
+    let write_fd = current_pcb().alloc_fd(write_file.unwrap(), None);
+    if !write_fd.is_ok() {
+        return Err(write_fd.unwrap_err());
+    }
+    unsafe {
+        *fd.offset(0) = read_fd.unwrap();
+        *fd.offset(1) = write_fd.unwrap();
+    }
+    return Ok(0);
+}

+ 38 - 1
kernel/src/libs/wait_queue.rs

@@ -2,7 +2,8 @@
 use alloc::{collections::LinkedList, vec::Vec};
 
 use crate::{
-    arch::{asm::current::current_pcb, sched::sched},
+    arch::{asm::current::current_pcb, sched::sched, CurrentIrqArch},
+    exception::InterruptArch,
     include::bindings::bindings::{
         process_control_block, process_wakeup, wait_queue_head_t, PROC_INTERRUPTIBLE,
         PROC_UNINTERRUPTIBLE,
@@ -45,9 +46,45 @@ impl WaitQueue {
         current_pcb().state = PROC_INTERRUPTIBLE as u64;
         guard.wait_list.push_back(current_pcb());
         drop(guard);
+
         sched();
     }
 
+    /// @brief 让当前进程在等待队列上进行等待,并且,在释放waitqueue的锁之前,执行f函数闭包
+    pub fn sleep_with_func<F>(&self, f: F)
+    where
+        F: FnOnce(),
+    {
+        let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
+        current_pcb().state = PROC_INTERRUPTIBLE as u64;
+        guard.wait_list.push_back(current_pcb());
+        f();
+        drop(guard);
+        sched();
+    }
+
+    /// @brief 让当前进程在等待队列上进行等待. 但是,在释放waitqueue的锁之后,不会调用调度函数。
+    /// 这样的设计,是为了让调用者可以在执行本函数之后,执行一些操作,然后再【手动调用调度函数】。
+    ///
+    /// 执行本函数前,需要确保处于【中断禁止】状态。
+    ///
+    /// 尽管sleep_with_func和sleep_without_schedule都可以实现这个功能,但是,sleep_with_func会在释放锁之前,执行f函数闭包。
+    ///
+    /// 考虑这样一个场景:
+    /// 等待队列位于某个自旋锁保护的数据结构A中,我们希望在进程睡眠的同时,释放数据结构A的锁。
+    /// 在这种情况下,如果使用sleep_with_func,所有权系统不会允许我们这么做。
+    /// 因此,sleep_without_schedule的设计,正是为了解决这个问题。
+    ///
+    /// 由于sleep_without_schedule不会调用调度函数,因此,如果开发者忘记在执行本函数之后,手动调用调度函数,
+    /// 由于时钟中断到来或者‘其他cpu kick了当前cpu’,可能会导致一些未定义的行为。
+    pub unsafe fn sleep_without_schedule(&self) {
+        // 安全检查:确保当前处于中断禁止状态
+        assert!(CurrentIrqArch::is_irq_enabled() == false);
+        let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
+        current_pcb().state = PROC_INTERRUPTIBLE as u64;
+        guard.wait_list.push_back(current_pcb());
+        drop(guard);
+    }
     /// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断
     pub fn sleep_uninterruptible(&self) {
         let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();

+ 1 - 1
kernel/src/process/process.c

@@ -623,7 +623,7 @@ ul initial_kernel_thread(ul arg)
                          "m"(current_pcb->thread->rsp), "m"(current_pcb->thread->rip), "S"("/bin/shell.elf"), "c"(NULL),
                          "d"(NULL)
                          : "memory");
-
+       
     return 1;
 }
 #pragma GCC pop_options

+ 2 - 7
kernel/src/syscall/syscall.c

@@ -37,7 +37,8 @@ extern uint64_t sys_shutdown(struct pt_regs *regs);
 extern uint64_t sys_accept(struct pt_regs *regs);
 extern uint64_t sys_getsockname(struct pt_regs *regs);
 extern uint64_t sys_getpeername(struct pt_regs *regs);
-
+extern uint64_t sys_pipe(struct pt_regs *regs);
+extern uint64_t sys_mkdir(struct pt_regs *regs);
 /**
  * @brief 关闭文件系统调用
  *
@@ -400,12 +401,6 @@ void do_syscall_int(struct pt_regs *regs, unsigned long error_code)
     ul ret = system_call_table[regs->rax](regs);
     regs->rax = ret; // 返回码
 }
-uint64_t sys_pipe(struct pt_regs *regs)
-{
-    return -ENOTSUP;
-}
-
-extern uint64_t sys_mkdir(struct pt_regs *regs);
 
 system_call_t system_call_table[MAX_SYSTEM_CALL_NUM] = {
     [0] = system_call_not_exists,

+ 0 - 8
kernel/src/syscall/syscall.h

@@ -82,14 +82,6 @@ uint64_t sys_sbrk(struct pt_regs *regs);
  */
 uint64_t sys_mkdir(struct pt_regs *regs);
 
-/**
- * @brief 创建管道
- * 在pipe.c中实现
- * @param fd(r8) 文件句柄指针
- * @param num(r9) 文件句柄个数
- * @return uint64_t
- */
-uint64_t sys_pipe(struct pt_regs *regs);
 
 ul sys_ahci_end_req(struct pt_regs *regs);
 

+ 68 - 17
user/apps/shell/cmd_test.c

@@ -1,32 +1,83 @@
 #include "cmd_test.h"
+#include <math.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <time.h>
 #include <unistd.h>
 
+#define buf_SIZE 256 // 定义消息的最大长度
 int shell_pipe_test(int argc, char **argv)
 {
-    int ret = -1;
-    int fd[2];
-    pid_t pid;
-    char buf[512] = {0};
-    char *msg = "hello world";
+    int fd[2], i, n;
 
-    ret = pipe(fd);
-    if (-1 == ret) {
-        printf("failed to create pipe\n");
-        return -1;
+    pid_t pid;
+    int ret = pipe(fd); // 创建一个管道
+    if (ret < 0)
+    {
+        printf("pipe error");
+        exit(1);
+    }
+    pid = fork(); // 创建一个子进程
+    if (pid < 0)
+    {
+        printf("fork error");
+        exit(1);
     }
-    pid = fork();
-    if (0 == pid) { 
-        // close(fd[0]);
-        ret = write(fd[1], msg, strlen(msg)); 
+    if (pid == 0)
+    {                 // 子进程
+        close(fd[1]); // 关闭管道的写端
+        for (i = 0; i < 3; i++)
+        { // 循环三次
+            char buf[buf_SIZE] = {0};
+            n = read(fd[0], buf, buf_SIZE); // 从管道的读端读取一条消息
+            if (n > 0)
+            {
+
+                printf("Child process received message: %s\n", buf); // 打印收到的消息
+                if (strcmp(buf, "quit") == 0)
+                {                                     // 如果收到的消息是"quit"
+                    printf("Child process exits.\n"); // 打印退出信息
+                    break;                            // 跳出循环
+                }
+                else
+                {                                                    // 如果收到的消息不是"quit"
+                    printf("Child process is doing something...\n"); // 模拟子进程做一些操作
+                    usleep(100);
+                }
+            }
+        }
+        close(fd[0]); // 关闭管道的读端
         exit(0);
-    } else {          
-        // close(fd[1]);
-        ret = read(fd[0], buf, sizeof(buf));
-        printf("parent read %d bytes data: %s\n", ret, buf);
     }
+    else
+    {                 // 父进程
+        close(fd[0]); // 关闭管道的读端
+        for (i = 0; i < 3; i++)
+        { // 循环三次
+            char *msg = "hello world";
+            if (i == 1)
+            {
+                msg = "how are you";
+                usleep(1000);
+            }
+            if (i == 2)
+            {
+                msg = "quit";
+                usleep(1000);
+            }
+            n = strlen(msg);
+            printf("Parent process send:%s\n", msg);
 
+            write(fd[1], msg, n); // 向管道的写端写入一条消息
+            if (strcmp(msg, "quit") == 0)
+            {                                      // 如果发送的消息是"quit"
+                printf("Parent process exits.\n"); // 打印退出信息
+                break;                             // 跳出循环
+            }
+        }
+        close(fd[1]); // 关闭管道的写端
+        wait(NULL);   // 等待子进程结束
+    }
     return 0;
 }

+ 0 - 5
user/libs/libc/src/sys/stat.c

@@ -16,8 +16,3 @@ int mstat(struct mstat_t *stat)
 {
     return syscall_invoke(SYS_MSTAT, (uint64_t)stat, 0, 0, 0, 0, 0, 0, 0);
 }
-
-int pipe(int *fd)
-{
-    return syscall_invoke(SYS_PIPE, (uint64_t)fd, 0, 0,0,0,0,0,0);
-}

+ 8 - 1
user/libs/libc/src/unistd.c

@@ -65,7 +65,14 @@ pid_t fork(void)
 {
     return (pid_t)syscall_invoke(SYS_FORK, 0, 0, 0, 0, 0, 0, 0, 0);
 }
-
+/**
+ * @brief 调用匿名管道
+ *
+ * @return int 如果失败返回负数
+ */
+int pipe(int fd[2]){
+    return (int)syscall_invoke(SYS_PIPE, fd, 0, 0, 0, 0, 0, 0, 0);
+}
 /**
  * @brief fork当前进程,但是与父进程共享VM、flags、fd
  *