浏览代码

feat: 增加tokio异步运行时支持 (#894)

* fix the EventFdFlags error

* feat: support tokio (Single thread version)

Fix deadlock issue on closing file.
Add function for PipeInode and EventFdInode.
linfeng 7 月之前
父节点
当前提交
4afc5b7b7b

+ 45 - 6
kernel/src/filesystem/eventfd.rs

@@ -3,11 +3,13 @@ use crate::filesystem::vfs::syscall::ModeType;
 use crate::filesystem::vfs::{FilePrivateData, FileSystem, FileType, IndexNode, Metadata};
 use crate::libs::spinlock::{SpinLock, SpinLockGuard};
 use crate::libs::wait_queue::WaitQueue;
-use crate::net::event_poll::EPollEventType;
+use crate::net::event_poll::{EPollEventType, EPollItem, EventPoll, KernelIoctlData};
 use crate::process::ProcessManager;
 use crate::syscall::Syscall;
+use alloc::collections::LinkedList;
 use alloc::string::String;
 use alloc::sync::Arc;
+use alloc::sync::Weak;
 use alloc::vec::Vec;
 use core::any::Any;
 use ida::IdAllocator;
@@ -19,14 +21,14 @@ bitflags! {
     pub struct EventFdFlags: u32{
         /// Provide semaphore-like semantics for reads from the new
         /// file descriptor.
-        const EFD_SEMAPHORE = 1;
+        const EFD_SEMAPHORE = 0o1;
         /// Set the close-on-exec (FD_CLOEXEC) flag on the new file
         /// descriptor
-        const EFD_CLOEXEC = 2;
+        const EFD_CLOEXEC = 0o2000000;
         /// Set the O_NONBLOCK file status flag on the open file
         /// description (see open(2)) referred to by the new file
         /// descriptor
-        const EFD_NONBLOCK = 4;
+        const EFD_NONBLOCK = 0o0004000;
     }
 }
 
@@ -48,6 +50,7 @@ impl EventFd {
 pub struct EventFdInode {
     eventfd: SpinLock<EventFd>,
     wait_queue: WaitQueue,
+    epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
 }
 
 impl EventFdInode {
@@ -55,8 +58,23 @@ impl EventFdInode {
         EventFdInode {
             eventfd: SpinLock::new(eventfd),
             wait_queue: WaitQueue::default(),
+            epitems: SpinLock::new(LinkedList::new()),
         }
     }
+    pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
+        let is_remove = !self
+            .epitems
+            .lock_irqsave()
+            .extract_if(|x| x.epoll().ptr_eq(epoll))
+            .collect::<Vec<_>>()
+            .is_empty();
+
+        if is_remove {
+            return Ok(());
+        }
+
+        Err(SystemError::ENOENT)
+    }
 }
 
 impl IndexNode for EventFdInode {
@@ -85,7 +103,7 @@ impl IndexNode for EventFdInode {
         _offset: usize,
         len: usize,
         buf: &mut [u8],
-        _data: SpinLockGuard<FilePrivateData>,
+        data: SpinLockGuard<FilePrivateData>,
     ) -> Result<usize, SystemError> {
         if len < 8 {
             return Err(SystemError::EINVAL);
@@ -115,6 +133,11 @@ impl IndexNode for EventFdInode {
         }
         let val_bytes = val.to_ne_bytes();
         buf[..8].copy_from_slice(&val_bytes);
+
+        let pollflag = EPollEventType::from_bits_truncate(self.poll(&data)? as u32);
+        // 唤醒epoll中等待的进程
+        EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
+
         return Ok(8);
     }
 
@@ -131,7 +154,7 @@ impl IndexNode for EventFdInode {
         _offset: usize,
         len: usize,
         buf: &[u8],
-        _data: SpinLockGuard<FilePrivateData>,
+        data: SpinLockGuard<FilePrivateData>,
     ) -> Result<usize, SystemError> {
         if len < 8 {
             return Err(SystemError::EINVAL);
@@ -157,6 +180,10 @@ impl IndexNode for EventFdInode {
         let mut eventfd = self.eventfd.lock();
         eventfd.count += val;
         self.wait_queue.wakeup_all(None);
+
+        let pollflag = EPollEventType::from_bits_truncate(self.poll(&data)? as u32);
+        // 唤醒epoll中等待的进程
+        EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
         return Ok(8);
     }
 
@@ -187,6 +214,18 @@ impl IndexNode for EventFdInode {
     fn resize(&self, _len: usize) -> Result<(), SystemError> {
         Ok(())
     }
+    fn kernel_ioctl(
+        &self,
+        arg: Arc<dyn KernelIoctlData>,
+        _data: &FilePrivateData,
+    ) -> Result<usize, SystemError> {
+        let epitem = arg
+            .arc_any()
+            .downcast::<EPollItem>()
+            .map_err(|_| SystemError::EFAULT)?;
+        self.epitems.lock().push_back(epitem);
+        Ok(0)
+    }
     fn fs(&self) -> Arc<dyn FileSystem> {
         panic!("EventFd does not have a filesystem")
     }

+ 16 - 6
kernel/src/filesystem/vfs/file.rs

@@ -8,6 +8,8 @@ use alloc::{
 use log::error;
 use system_error::SystemError;
 
+use super::{Dirent, FileType, IndexNode, InodeId, Metadata, SpecialNodeData};
+use crate::filesystem::eventfd::EventFdInode;
 use crate::{
     driver::{
         base::{block::SeekFrom, device::DevicePrivateData},
@@ -23,8 +25,6 @@ use crate::{
     process::{cred::Cred, ProcessManager},
 };
 
-use super::{Dirent, FileType, IndexNode, InodeId, Metadata, SpecialNodeData};
-
 /// 文件私有信息的枚举类型
 #[derive(Debug, Clone)]
 #[allow(dead_code)]
@@ -513,9 +513,19 @@ impl File {
                 let inode = self.inode.downcast_ref::<SocketInode>().unwrap();
                 let mut socket = inode.inner();
 
-                return socket.remove_epoll(epoll);
+                socket.remove_epoll(epoll)
+            }
+            FileType::Pipe => {
+                let inode = self.inode.downcast_ref::<LockedPipeInode>().unwrap();
+                inode.inner().lock().remove_epoll(epoll)
+            }
+            _ => {
+                let inode = self
+                    .inode
+                    .downcast_ref::<EventFdInode>()
+                    .ok_or(SystemError::ENOSYS)?;
+                inode.remove_epoll(epoll)
             }
-            _ => return Err(SystemError::ENOSYS),
         }
     }
 
@@ -643,14 +653,14 @@ impl FileDescriptorVec {
     /// ## 参数
     ///
     /// - `fd` 文件描述符序号
-    pub fn drop_fd(&mut self, fd: i32) -> Result<(), SystemError> {
+    pub fn drop_fd(&mut self, fd: i32) -> Result<Arc<File>, SystemError> {
         self.get_file_by_fd(fd).ok_or(SystemError::EBADF)?;
 
         // 把文件描述符数组对应位置设置为空
         let file = self.fds[fd as usize].take().unwrap();
 
         assert!(Arc::strong_count(&file) == 1);
-        return Ok(());
+        return Ok(file);
     }
 
     #[allow(dead_code)]

+ 3 - 2
kernel/src/filesystem/vfs/syscall.rs

@@ -522,8 +522,9 @@ impl Syscall {
     pub fn close(fd: usize) -> Result<usize, SystemError> {
         let binding = ProcessManager::current_pcb().fd_table();
         let mut fd_table_guard = binding.write();
-
-        fd_table_guard.drop_fd(fd as i32).map(|_| 0)
+        let _file = fd_table_guard.drop_fd(fd as i32)?;
+        drop(fd_table_guard);
+        Ok(0)
     }
 
     /// @brief 发送命令到文件描述符对应的设备,

+ 16 - 0
kernel/src/ipc/pipe.rs

@@ -18,6 +18,7 @@ use crate::{
 use alloc::{
     collections::LinkedList,
     sync::{Arc, Weak},
+    vec::Vec,
 };
 use system_error::SystemError;
 
@@ -104,6 +105,21 @@ impl InnerPipeInode {
         self.epitems.lock().push_back(epitem);
         Ok(())
     }
+
+    pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
+        let is_remove = !self
+            .epitems
+            .lock_irqsave()
+            .extract_if(|x| x.epoll().ptr_eq(epoll))
+            .collect::<Vec<_>>()
+            .is_empty();
+
+        if is_remove {
+            return Ok(());
+        }
+
+        Err(SystemError::ENOENT)
+    }
 }
 
 impl LockedPipeInode {

+ 3 - 0
user/apps/test_tokio/.gitignore

@@ -0,0 +1,3 @@
+/target
+Cargo.lock
+/install/

+ 13 - 0
user/apps/test_tokio/Cargo.toml

@@ -0,0 +1,13 @@
+[package]
+name = "test_tokio"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+tokio = { version = "1.25", features = [
+    "macros",
+    "rt",
+    "rt-multi-thread",
+    "net",
+    "signal",
+] }

+ 56 - 0
user/apps/test_tokio/Makefile

@@ -0,0 +1,56 @@
+TOOLCHAIN="+nightly-2024-07-23-x86_64-unknown-linux-gnu"
+RUSTFLAGS+=""
+
+ifdef DADK_CURRENT_BUILD_DIR
+# 如果是在dadk中编译,那么安装到dadk的安装目录中
+	INSTALL_DIR = $(DADK_CURRENT_BUILD_DIR)
+else
+# 如果是在本地编译,那么安装到当前目录下的install目录中
+	INSTALL_DIR = ./install
+endif
+
+ifeq ($(ARCH), x86_64)
+	export RUST_TARGET=x86_64-unknown-linux-musl
+else ifeq ($(ARCH), riscv64)
+	export RUST_TARGET=riscv64gc-unknown-linux-gnu
+else 
+# 默认为x86_86,用于本地编译
+	export RUST_TARGET=x86_64-unknown-linux-musl
+endif
+
+run:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET)
+
+build:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET)
+
+clean:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET)
+
+test:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET)
+
+doc:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) doc --target $(RUST_TARGET)
+
+fmt:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt
+
+fmt-check:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt --check
+
+run-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET) --release
+
+build-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET) --release
+
+clean-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET) --release
+
+test-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET) --release
+
+.PHONY: install
+install:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) install --target $(RUST_TARGET) --path . --no-track --root $(INSTALL_DIR) --force

+ 17 - 0
user/apps/test_tokio/src/main.rs

@@ -0,0 +1,17 @@
+use tokio::signal;
+
+async fn say_world() {
+    println!("world");
+}
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() {
+    // Calling `say_world()` does not execute the body of `say_world()`.
+    let op = say_world();
+
+    // This println! comes first
+    println!("hello");
+
+    // Calling `.await` on `op` starts executing `say_world`.
+    op.await;
+}

+ 23 - 0
user/dadk/config/test_tokio-0.1.0.dadk

@@ -0,0 +1,23 @@
+{
+  "name": "test_tokio",
+  "version": "0.1.0",
+  "description": "测试tokio",
+  "task_type": {
+    "BuildFromSource": {
+      "Local": {
+        "path": "apps/test_tokio"
+      }
+    }
+  },
+  "depends": [],
+  "build": {
+    "build_command": "make install"
+  },
+  "clean": {
+    "clean_command": "make clean"
+  },
+  "install": {
+    "in_dragonos_path": "/"
+  },
+  "target_arch": ["x86_64"]
+}