Browse Source

Patch socketpair (#576)

* 将sockets分成inet和unix域
- 添加File端点
- 添加SocketPair trait并将Socket trait中的pair相关方法移动
- 添加对SockAddrUn的处理

* 精简SocketHandleItem

* 重构socketpair相关逻辑
- 将File端点换成Inode端点
- 尝试使用SocketInode进行socketpair(未成功)


* 将SocketPair trait合并到Socket trait中,去除downcast
裕依 11 months ago
parent
commit
6046f77591

+ 1 - 0
kernel/src/filesystem/fat/entry.rs

@@ -1119,6 +1119,7 @@ impl LongDirEntry {
                 | '^' | '#' | '&' => {}
                 '+' | ',' | ';' | '=' | '[' | ']' | '.' | ' ' => {}
                 _ => {
+                    kdebug!("error char: {}", c);
                     return Err(SystemError::EILSEQ);
                 }
             }

+ 1 - 1
kernel/src/filesystem/vfs/file.rs

@@ -610,7 +610,7 @@ impl FileDescriptorVec {
         if !FileDescriptorVec::validate_fd(fd) {
             return None;
         }
-        return self.fds[fd as usize].clone();
+        self.fds[fd as usize].clone()
     }
 
     /// 释放文件描述符,同时关闭文件。

+ 1 - 1
kernel/src/filesystem/vfs/open.rs

@@ -76,7 +76,7 @@ fn do_sys_openat2(
     how: OpenHow,
     follow_symlink: bool,
 ) -> Result<usize, SystemError> {
-    // kdebug!("open: path: {}, mode: {:?}", path, mode);
+    // kdebug!("open path: {}, how: {:?}", path, how);
     let path = path.trim();
 
     let (inode_begin, path) = user_path_at(&ProcessManager::current_pcb(), dirfd, path)?;

+ 1 - 3
kernel/src/filesystem/vfs/syscall.rs

@@ -463,9 +463,7 @@ impl Syscall {
         let binding = ProcessManager::current_pcb().fd_table();
         let mut fd_table_guard = binding.write();
 
-        let res = fd_table_guard.drop_fd(fd as i32).map(|_| 0);
-
-        return res;
+        fd_table_guard.drop_fd(fd as i32).map(|_| 0)
     }
 
     /// @brief 发送命令到文件描述符对应的设备,

+ 4 - 2
kernel/src/net/mod.rs

@@ -8,6 +8,8 @@ use alloc::{collections::BTreeMap, sync::Arc};
 use crate::{driver::net::NetDriver, libs::rwlock::RwLock};
 use smoltcp::wire::IpEndpoint;
 
+use self::socket::SocketInode;
+
 pub mod event_poll;
 pub mod net_core;
 pub mod socket;
@@ -41,8 +43,8 @@ pub enum Endpoint {
     LinkLayer(LinkLayerEndpoint),
     /// 网络层端点
     Ip(Option<IpEndpoint>),
-    /// 不需要端点
-    Unused,
+    /// inode端点
+    Inode(Option<Arc<SocketInode>>),
     // todo: 增加NetLink机制后,增加NetLink端点
 }
 

+ 1 - 1
kernel/src/net/net_core.rs

@@ -12,7 +12,7 @@ use crate::{
 
 use super::{
     event_poll::{EPollEventType, EventPoll},
-    socket::{sockets::TcpSocket, HANDLE_MAP, SOCKET_SET},
+    socket::{inet::TcpSocket, HANDLE_MAP, SOCKET_SET},
 };
 
 /// The network poll function, which will be called by timer.

+ 34 - 148
kernel/src/net/socket/sockets.rs → kernel/src/net/socket/inet.rs

@@ -9,7 +9,7 @@ use system_error::SystemError;
 use crate::{
     driver::net::NetDriver,
     kerror, kwarn,
-    libs::{rwlock::RwLock, spinlock::SpinLock},
+    libs::rwlock::RwLock,
     net::{
         event_poll::EPollEventType, net_core::poll_ifaces, Endpoint, Protocol, ShutdownType,
         NET_DRIVERS,
@@ -18,7 +18,7 @@ use crate::{
 
 use super::{
     GlobalSocketHandle, Socket, SocketHandleItem, SocketMetadata, SocketOptions, SocketPollMethod,
-    SocketType, SocketpairOps, HANDLE_MAP, PORT_MANAGER, SOCKET_SET,
+    SocketType, HANDLE_MAP, PORT_MANAGER, SOCKET_SET,
 };
 
 /// @brief 表示原始的socket。原始套接字绕过传输层协议(如 TCP 或 UDP)并提供对网络层协议(如 IP)的直接访问。
@@ -87,15 +87,7 @@ impl RawSocket {
 }
 
 impl Socket for RawSocket {
-    fn as_any_ref(&self) -> &dyn core::any::Any {
-        self
-    }
-
-    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
-        self
-    }
-
-    fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
         poll_ifaces();
         loop {
             // 如何优化这里?
@@ -208,17 +200,25 @@ impl Socket for RawSocket {
         Ok(())
     }
 
-    fn metadata(&self) -> Result<SocketMetadata, SystemError> {
-        Ok(self.metadata.clone())
+    fn metadata(&self) -> SocketMetadata {
+        self.metadata.clone()
     }
 
     fn box_clone(&self) -> Box<dyn Socket> {
-        return Box::new(self.clone());
+        Box::new(self.clone())
     }
 
     fn socket_handle(&self) -> SocketHandle {
         self.handle.0
     }
+
+    fn as_any_ref(&self) -> &dyn core::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
+        self
+    }
 }
 
 /// @brief 表示udp socket
@@ -296,16 +296,8 @@ impl UdpSocket {
 }
 
 impl Socket for UdpSocket {
-    fn as_any_ref(&self) -> &dyn core::any::Any {
-        self
-    }
-
-    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
-        self
-    }
-
     /// @brief 在read函数执行之前,请先bind到本地的指定端口
-    fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
         loop {
             // kdebug!("Wait22 to Read");
             poll_ifaces();
@@ -413,10 +405,10 @@ impl Socket for UdpSocket {
     fn connect(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
         if let Endpoint::Ip(_) = endpoint {
             self.remote_endpoint = Some(endpoint);
-            return Ok(());
+            Ok(())
         } else {
-            return Err(SystemError::EINVAL);
-        };
+            Err(SystemError::EINVAL)
+        }
     }
 
     fn ioctl(
@@ -429,8 +421,8 @@ impl Socket for UdpSocket {
         todo!()
     }
 
-    fn metadata(&self) -> Result<SocketMetadata, SystemError> {
-        Ok(self.metadata.clone())
+    fn metadata(&self) -> SocketMetadata {
+        self.metadata.clone()
     }
 
     fn box_clone(&self) -> Box<dyn Socket> {
@@ -465,6 +457,14 @@ impl Socket for UdpSocket {
     fn socket_handle(&self) -> SocketHandle {
         self.handle.0
     }
+
+    fn as_any_ref(&self) -> &dyn core::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
+        self
+    }
 }
 
 /// @brief 表示 tcp socket
@@ -519,6 +519,7 @@ impl TcpSocket {
             metadata,
         };
     }
+
     fn do_listen(
         &mut self,
         socket: &mut tcp::Socket,
@@ -548,15 +549,7 @@ impl TcpSocket {
 }
 
 impl Socket for TcpSocket {
-    fn as_any_ref(&self) -> &dyn core::any::Any {
-        self
-    }
-
-    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
-        self
-    }
-
-    fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
         if HANDLE_MAP
             .read_irqsave()
             .get(&self.socket_handle())
@@ -836,7 +829,7 @@ impl Socket for TcpSocket {
                     let item = handle_guard.remove(&old_handle.0).unwrap();
                     // 按照smoltcp行为,将新的handle绑定到原来的item
                     handle_guard.insert(new_handle.0, item);
-                    let new_item = SocketHandleItem::from_socket(new_socket.as_ref());
+                    let new_item = SocketHandleItem::new();
                     // 插入新的item
                     handle_guard.insert(old_handle.0, new_item);
 
@@ -877,66 +870,18 @@ impl Socket for TcpSocket {
         return socket.remote_endpoint().map(|x| Endpoint::Ip(Some(x)));
     }
 
-    fn metadata(&self) -> Result<SocketMetadata, SystemError> {
-        Ok(self.metadata.clone())
+    fn metadata(&self) -> SocketMetadata {
+        self.metadata.clone()
     }
 
     fn box_clone(&self) -> Box<dyn Socket> {
-        return Box::new(self.clone());
+        Box::new(self.clone())
     }
 
     fn socket_handle(&self) -> SocketHandle {
         self.handle.0
     }
-}
-
-/// # 表示 seqpacket socket
-#[derive(Debug, Clone)]
-#[cast_to(Socket)]
-pub struct SeqpacketSocket {
-    metadata: SocketMetadata,
-    buffer: Arc<SpinLock<Vec<u8>>>,
-    peer_buffer: Option<Arc<SpinLock<Vec<u8>>>>,
-}
-
-impl SeqpacketSocket {
-    /// 默认的元数据缓冲区大小
-    pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
-    /// 默认的缓冲区大小
-    pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;
-
-    /// # 创建一个seqpacket的socket
-    ///
-    /// ## 参数
-    /// - `options`: socket的选项
-    pub fn new(options: SocketOptions) -> Self {
-        let buffer = Vec::with_capacity(Self::DEFAULT_BUF_SIZE);
-
-        let metadata = SocketMetadata::new(
-            SocketType::Seqpacket,
-            Self::DEFAULT_BUF_SIZE,
-            0,
-            Self::DEFAULT_METADATA_BUF_SIZE,
-            options,
-        );
-
-        return Self {
-            metadata,
-            buffer: Arc::new(SpinLock::new(buffer)),
-            peer_buffer: None,
-        };
-    }
-
-    fn buffer(&self) -> Arc<SpinLock<Vec<u8>>> {
-        self.buffer.clone()
-    }
-
-    fn set_peer_buffer(&mut self, peer_buffer: Arc<SpinLock<Vec<u8>>>) {
-        self.peer_buffer = Some(peer_buffer);
-    }
-}
 
-impl Socket for SeqpacketSocket {
     fn as_any_ref(&self) -> &dyn core::any::Any {
         self
     }
@@ -944,63 +889,4 @@ impl Socket for SeqpacketSocket {
     fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
         self
     }
-
-    fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
-        let buffer = self.buffer.lock_irqsave();
-
-        let len = core::cmp::min(buf.len(), buffer.len());
-        buf[..len].copy_from_slice(&buffer[..len]);
-
-        (Ok(len), Endpoint::Unused)
-    }
-
-    fn write(&self, buf: &[u8], _to: Option<Endpoint>) -> Result<usize, SystemError> {
-        if self.peer_buffer.is_none() {
-            kwarn!("SeqpacketSocket is now just for socketpair");
-            return Err(SystemError::ENOSYS);
-        }
-
-        let binding = self.peer_buffer.clone().unwrap();
-        let mut peer_buffer = binding.lock_irqsave();
-
-        let len = buf.len();
-        if peer_buffer.capacity() - peer_buffer.len() < len {
-            return Err(SystemError::ENOBUFS);
-        }
-        peer_buffer[..len].copy_from_slice(buf);
-
-        Ok(len)
-    }
-
-    fn socketpair_ops(&self) -> Option<&'static dyn SocketpairOps> {
-        Some(&SeqpacketSocketpairOps)
-    }
-
-    fn metadata(&self) -> Result<SocketMetadata, SystemError> {
-        Ok(self.metadata.clone())
-    }
-
-    fn box_clone(&self) -> Box<dyn Socket> {
-        Box::new(self.clone())
-    }
-}
-
-struct SeqpacketSocketpairOps;
-
-impl SocketpairOps for SeqpacketSocketpairOps {
-    fn socketpair(&self, socket0: &mut Box<dyn Socket>, socket1: &mut Box<dyn Socket>) {
-        let pair0 = socket0
-            .as_mut()
-            .as_any_mut()
-            .downcast_mut::<SeqpacketSocket>()
-            .unwrap();
-
-        let pair1 = socket1
-            .as_mut()
-            .as_any_mut()
-            .downcast_mut::<SeqpacketSocket>()
-            .unwrap();
-        pair0.set_peer_buffer(pair1.buffer());
-        pair1.set_peer_buffer(pair0.buffer());
-    }
 }

+ 36 - 53
kernel/src/net/socket/mod.rs

@@ -27,7 +27,10 @@ use crate::{
     },
 };
 
-use self::sockets::{RawSocket, SeqpacketSocket, TcpSocket, UdpSocket};
+use self::{
+    inet::{RawSocket, TcpSocket, UdpSocket},
+    unix::{SeqpacketSocket, StreamSocket},
+};
 
 use super::{
     event_poll::{EPollEventType, EPollItem, EventPoll},
@@ -35,7 +38,8 @@ use super::{
     Endpoint, Protocol, ShutdownType,
 };
 
-pub mod sockets;
+pub mod inet;
+pub mod unix;
 
 lazy_static! {
     /// 所有socket的集合
@@ -60,9 +64,7 @@ pub(super) fn new_socket(
 ) -> Result<Box<dyn Socket>, SystemError> {
     let socket: Box<dyn Socket> = match address_family {
         AddressFamily::Unix => match socket_type {
-            PosixSocketType::Stream => Box::new(TcpSocket::new(SocketOptions::default())),
-            PosixSocketType::Datagram => Box::new(UdpSocket::new(SocketOptions::default())),
-            PosixSocketType::Raw => Box::new(RawSocket::new(protocol, SocketOptions::default())),
+            PosixSocketType::Stream => Box::new(StreamSocket::new(SocketOptions::default())),
             PosixSocketType::SeqPacket => Box::new(SeqpacketSocket::new(SocketOptions::default())),
             _ => {
                 return Err(SystemError::EINVAL);
@@ -84,16 +86,13 @@ pub(super) fn new_socket(
 }
 
 pub trait Socket: Sync + Send + Debug + Any {
-    fn as_any_ref(&self) -> &dyn Any;
-
-    fn as_any_mut(&mut self) -> &mut dyn Any;
     /// @brief 从socket中读取数据,如果socket是阻塞的,那么直到读取到数据才返回
     ///
     /// @param buf 读取到的数据存放的缓冲区
     ///
     /// @return - 成功:(返回读取的数据的长度,读取数据的端点).
     ///         - 失败:错误码
-    fn read(&mut self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint);
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint);
 
     /// @brief 向socket中写入数据。如果socket是阻塞的,那么直到写入的数据全部写入socket中才返回
     ///
@@ -113,9 +112,7 @@ pub trait Socket: Sync + Send + Debug + Any {
     /// @param endpoint 要连接的端点
     ///
     /// @return 返回连接是否成功
-    fn connect(&mut self, _endpoint: Endpoint) -> Result<(), SystemError> {
-        return Err(SystemError::ENOSYS);
-    }
+    fn connect(&mut self, _endpoint: Endpoint) -> Result<(), SystemError>;
 
     /// @brief 对应于POSIX的bind函数,用于绑定到本机指定的端点
     ///
@@ -125,7 +122,7 @@ pub trait Socket: Sync + Send + Debug + Any {
     ///
     /// @return 返回绑定是否成功
     fn bind(&mut self, _endpoint: Endpoint) -> Result<(), SystemError> {
-        return Err(SystemError::ENOSYS);
+        Err(SystemError::ENOSYS)
     }
 
     /// @brief 对应于 POSIX 的 shutdown 函数,用于关闭socket。
@@ -136,7 +133,7 @@ pub trait Socket: Sync + Send + Debug + Any {
     ///
     /// @return 返回是否成功关闭
     fn shutdown(&mut self, _type: ShutdownType) -> Result<(), SystemError> {
-        return Err(SystemError::ENOSYS);
+        Err(SystemError::ENOSYS)
     }
 
     /// @brief 对应于POSIX的listen函数,用于监听端点
@@ -145,7 +142,7 @@ pub trait Socket: Sync + Send + Debug + Any {
     ///
     /// @return 返回监听是否成功
     fn listen(&mut self, _backlog: usize) -> Result<(), SystemError> {
-        return Err(SystemError::ENOSYS);
+        Err(SystemError::ENOSYS)
     }
 
     /// @brief 对应于POSIX的accept函数,用于接受连接
@@ -154,24 +151,20 @@ pub trait Socket: Sync + Send + Debug + Any {
     ///
     /// @return 返回接受连接是否成功
     fn accept(&mut self) -> Result<(Box<dyn Socket>, Endpoint), SystemError> {
-        return Err(SystemError::ENOSYS);
+        Err(SystemError::ENOSYS)
     }
 
     /// @brief 获取socket的端点
     ///
     /// @return 返回socket的端点
     fn endpoint(&self) -> Option<Endpoint> {
-        return None;
+        None
     }
 
     /// @brief 获取socket的对端端点
     ///
     /// @return 返回socket的对端端点
     fn peer_endpoint(&self) -> Option<Endpoint> {
-        return None;
-    }
-
-    fn socketpair_ops(&self) -> Option<&'static dyn SocketpairOps> {
         None
     }
 
@@ -187,7 +180,7 @@ pub trait Socket: Sync + Send + Debug + Any {
     ///     The third boolean value indicates whether the socket has encountered an error condition. If it is true, then the socket is in an error state and should be closed or reset
     ///
     fn poll(&self) -> EPollEventType {
-        return EPollEventType::empty();
+        EPollEventType::empty()
     }
 
     /// @brief socket的ioctl函数
@@ -205,11 +198,11 @@ pub trait Socket: Sync + Send + Debug + Any {
         _arg1: usize,
         _arg2: usize,
     ) -> Result<usize, SystemError> {
-        return Ok(0);
+        Ok(0)
     }
 
     /// @brief 获取socket的元数据
-    fn metadata(&self) -> Result<SocketMetadata, SystemError>;
+    fn metadata(&self) -> SocketMetadata;
 
     fn box_clone(&self) -> Box<dyn Socket>;
 
@@ -227,13 +220,21 @@ pub trait Socket: Sync + Send + Debug + Any {
         _optval: &[u8],
     ) -> Result<(), SystemError> {
         kwarn!("setsockopt is not implemented");
-        return Ok(());
+        Ok(())
     }
 
     fn socket_handle(&self) -> SocketHandle {
         todo!()
     }
 
+    fn write_buffer(&self, _buf: &[u8]) -> Result<usize, SystemError> {
+        todo!()
+    }
+
+    fn as_any_ref(&self) -> &dyn Any;
+
+    fn as_any_mut(&mut self) -> &mut dyn Any;
+
     fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
         HANDLE_MAP
             .write_irqsave()
@@ -278,11 +279,6 @@ impl Clone for Box<dyn Socket> {
     }
 }
 
-pub trait SocketpairOps {
-    /// 执行socketpair
-    fn socketpair(&self, socket0: &mut Box<dyn Socket>, socket1: &mut Box<dyn Socket>);
-}
-
 /// # Socket在文件系统中的inode封装
 #[derive(Debug)]
 pub struct SocketInode(SpinLock<Box<dyn Socket>>, AtomicUsize);
@@ -294,11 +290,11 @@ impl SocketInode {
 
     #[inline]
     pub fn inner(&self) -> SpinLockGuard<Box<dyn Socket>> {
-        return self.0.lock();
+        self.0.lock()
     }
 
     pub unsafe fn inner_no_preempt(&self) -> SpinLockGuard<Box<dyn Socket>> {
-        return self.0.lock_no_preempt();
+        self.0.lock_no_preempt()
     }
 }
 
@@ -314,12 +310,12 @@ impl IndexNode for SocketInode {
             // 最后一次关闭,需要释放
             let mut socket = self.0.lock_irqsave();
 
-            if socket.metadata().unwrap().socket_type == SocketType::Seqpacket {
+            if socket.metadata().socket_type == SocketType::Unix {
                 return Ok(());
             }
 
             if let Some(Endpoint::Ip(Some(ip))) = socket.endpoint() {
-                PORT_MANAGER.unbind_port(socket.metadata().unwrap().socket_type, ip.port)?;
+                PORT_MANAGER.unbind_port(socket.metadata().socket_type, ip.port)?;
             }
 
             socket.clear_epoll()?;
@@ -339,7 +335,7 @@ impl IndexNode for SocketInode {
         buf: &mut [u8],
         _data: &mut FilePrivateData,
     ) -> Result<usize, SystemError> {
-        return self.0.lock_no_preempt().read(&mut buf[0..len]).0;
+        self.0.lock_no_preempt().read(&mut buf[0..len]).0
     }
 
     fn write_at(
@@ -349,7 +345,7 @@ impl IndexNode for SocketInode {
         buf: &[u8],
         _data: &mut FilePrivateData,
     ) -> Result<usize, SystemError> {
-        return self.0.lock_no_preempt().write(&buf[0..len], None);
+        self.0.lock_no_preempt().write(&buf[0..len], None)
     }
 
     fn poll(&self, _private_data: &FilePrivateData) -> Result<usize, SystemError> {
@@ -384,11 +380,8 @@ impl IndexNode for SocketInode {
     }
 }
 
-#[allow(dead_code)]
 #[derive(Debug)]
 pub struct SocketHandleItem {
-    /// socket元数据
-    metadata: SocketMetadata,
     /// shutdown状态
     pub shutdown_type: RwLock<ShutdownType>,
     /// socket的waitqueue
@@ -398,25 +391,15 @@ pub struct SocketHandleItem {
 }
 
 impl SocketHandleItem {
-    pub fn new(socket: &dyn Socket) -> Self {
-        Self {
-            metadata: socket.metadata().unwrap(),
-            shutdown_type: RwLock::new(ShutdownType::empty()),
-            wait_queue: EventWaitQueue::new(),
-            epitems: SpinLock::new(LinkedList::new()),
-        }
-    }
-
-    pub fn from_socket<A: Socket>(socket: &A) -> Self {
+    pub fn new() -> Self {
         Self {
-            metadata: socket.metadata().unwrap(),
             shutdown_type: RwLock::new(ShutdownType::empty()),
             wait_queue: EventWaitQueue::new(),
             epitems: SpinLock::new(LinkedList::new()),
         }
     }
 
-    /// ### 在socket的等待队列上睡眠
+    /// ## 在socket的等待队列上睡眠
     pub fn sleep(
         socket_handle: SocketHandle,
         events: u64,
@@ -589,8 +572,8 @@ pub enum SocketType {
     Tcp,
     /// 用于Udp通信的 Socket
     Udp,
-    /// 用于进程间通信的 Socket
-    Seqpacket,
+    /// unix域的 Socket
+    Unix,
 }
 
 bitflags! {

+ 206 - 0
kernel/src/net/socket/unix.rs

@@ -0,0 +1,206 @@
+use alloc::{boxed::Box, sync::Arc, vec::Vec};
+use system_error::SystemError;
+
+use crate::{libs::spinlock::SpinLock, net::Endpoint};
+
+use super::{Socket, SocketInode, SocketMetadata, SocketOptions, SocketType};
+
+#[derive(Debug, Clone)]
+pub struct StreamSocket {
+    metadata: SocketMetadata,
+    buffer: Arc<SpinLock<Vec<u8>>>,
+    peer_inode: Option<Arc<SocketInode>>,
+}
+
+impl StreamSocket {
+    /// 默认的元数据缓冲区大小
+    pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
+    /// 默认的缓冲区大小
+    pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;
+
+    /// # 创建一个 Stream Socket
+    ///
+    /// ## 参数
+    /// - `options`: socket选项
+    pub fn new(options: SocketOptions) -> Self {
+        let buffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));
+
+        let metadata = SocketMetadata::new(
+            SocketType::Unix,
+            Self::DEFAULT_BUF_SIZE,
+            Self::DEFAULT_BUF_SIZE,
+            Self::DEFAULT_METADATA_BUF_SIZE,
+            options,
+        );
+
+        Self {
+            metadata,
+            buffer,
+            peer_inode: None,
+        }
+    }
+}
+
+impl Socket for StreamSocket {
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
+        let mut buffer = self.buffer.lock_irqsave();
+
+        let len = core::cmp::min(buf.len(), buffer.len());
+        buf[..len].copy_from_slice(&buffer[..len]);
+
+        let _ = buffer.split_off(len);
+
+        (Ok(len), Endpoint::Inode(self.peer_inode.clone()))
+    }
+
+    fn write(&self, buf: &[u8], _to: Option<Endpoint>) -> Result<usize, SystemError> {
+        if self.peer_inode.is_none() {
+            return Err(SystemError::ENOTCONN);
+        }
+
+        let peer_inode = self.peer_inode.clone().unwrap();
+        let len = peer_inode.inner().write_buffer(buf)?;
+        Ok(len)
+    }
+
+    fn connect(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
+        if self.peer_inode.is_some() {
+            return Err(SystemError::EISCONN);
+        }
+
+        if let Endpoint::Inode(inode) = endpoint {
+            self.peer_inode = inode;
+            Ok(())
+        } else {
+            Err(SystemError::EINVAL)
+        }
+    }
+
+    fn write_buffer(&self, buf: &[u8]) -> Result<usize, SystemError> {
+        let mut buffer = self.buffer.lock_irqsave();
+
+        let len = buf.len();
+        if buffer.capacity() - buffer.len() < len {
+            return Err(SystemError::ENOBUFS);
+        }
+        buffer.extend_from_slice(buf);
+
+        Ok(len)
+    }
+
+    fn metadata(&self) -> SocketMetadata {
+        self.metadata.clone()
+    }
+
+    fn box_clone(&self) -> Box<dyn Socket> {
+        Box::new(self.clone())
+    }
+
+    fn as_any_ref(&self) -> &dyn core::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
+        self
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SeqpacketSocket {
+    metadata: SocketMetadata,
+    buffer: Arc<SpinLock<Vec<u8>>>,
+    peer_inode: Option<Arc<SocketInode>>,
+}
+
+impl SeqpacketSocket {
+    /// 默认的元数据缓冲区大小
+    pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
+    /// 默认的缓冲区大小
+    pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;
+
+    /// # 创建一个 Seqpacket Socket
+    ///
+    /// ## 参数
+    /// - `options`: socket选项
+    pub fn new(options: SocketOptions) -> Self {
+        let buffer = Arc::new(SpinLock::new(Vec::with_capacity(Self::DEFAULT_BUF_SIZE)));
+
+        let metadata = SocketMetadata::new(
+            SocketType::Unix,
+            Self::DEFAULT_BUF_SIZE,
+            Self::DEFAULT_BUF_SIZE,
+            Self::DEFAULT_METADATA_BUF_SIZE,
+            options,
+        );
+
+        Self {
+            metadata,
+            buffer,
+            peer_inode: None,
+        }
+    }
+}
+
+impl Socket for SeqpacketSocket {
+    fn read(&self, buf: &mut [u8]) -> (Result<usize, SystemError>, Endpoint) {
+        let mut buffer = self.buffer.lock_irqsave();
+
+        let len = core::cmp::min(buf.len(), buffer.len());
+        buf[..len].copy_from_slice(&buffer[..len]);
+
+        let _ = buffer.split_off(len);
+
+        (Ok(len), Endpoint::Inode(self.peer_inode.clone()))
+    }
+
+    fn write(&self, buf: &[u8], _to: Option<Endpoint>) -> Result<usize, SystemError> {
+        if self.peer_inode.is_none() {
+            return Err(SystemError::ENOTCONN);
+        }
+
+        let peer_inode = self.peer_inode.clone().unwrap();
+        let len = peer_inode.inner().write_buffer(buf)?;
+        Ok(len)
+    }
+
+    fn connect(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
+        if self.peer_inode.is_some() {
+            return Err(SystemError::EISCONN);
+        }
+
+        if let Endpoint::Inode(inode) = endpoint {
+            self.peer_inode = inode;
+            Ok(())
+        } else {
+            Err(SystemError::EINVAL)
+        }
+    }
+
+    fn write_buffer(&self, buf: &[u8]) -> Result<usize, SystemError> {
+        let mut buffer = self.buffer.lock_irqsave();
+
+        let len = buf.len();
+        if buffer.capacity() - buffer.len() < len {
+            return Err(SystemError::ENOBUFS);
+        }
+        buffer.extend_from_slice(buf);
+
+        Ok(len)
+    }
+
+    fn metadata(&self) -> SocketMetadata {
+        self.metadata.clone()
+    }
+
+    fn box_clone(&self) -> Box<dyn Socket> {
+        Box::new(self.clone())
+    }
+
+    fn as_any_ref(&self) -> &dyn core::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn core::any::Any {
+        self
+    }
+}

+ 57 - 33
kernel/src/net/syscall.rs

@@ -1,4 +1,4 @@
-use core::cmp::min;
+use core::{cmp::min, ffi::CStr};
 
 use alloc::{boxed::Box, sync::Arc};
 use num_traits::{FromPrimitive, ToPrimitive};
@@ -9,6 +9,7 @@ use crate::{
     filesystem::vfs::{
         file::{File, FileMode},
         syscall::{IoVec, IoVecs},
+        FileType,
     },
     libs::spinlock::SpinLockGuard,
     mm::{verify_area, VirtAddr},
@@ -43,10 +44,12 @@ impl Syscall {
 
         let socket = new_socket(address_family, socket_type, protocol)?;
 
-        let handle_item = SocketHandleItem::new(socket.as_ref());
-        HANDLE_MAP
-            .write_irqsave()
-            .insert(socket.socket_handle(), handle_item);
+        if address_family != AddressFamily::Unix {
+            let handle_item = SocketHandleItem::new();
+            HANDLE_MAP
+                .write_irqsave()
+                .insert(socket.socket_handle(), handle_item);
+        }
 
         let socketinode: Arc<SocketInode> = SocketInode::new(socket);
         let f = File::new(socketinode, FileMode::O_RDWR)?;
@@ -75,25 +78,25 @@ impl Syscall {
         let socket_type = PosixSocketType::try_from((socket_type & 0xf) as u8)?;
         let protocol = Protocol::from(protocol as u8);
 
-        let mut socket0 = new_socket(address_family, socket_type, protocol)?;
-        let mut socket1 = new_socket(address_family, socket_type, protocol)?;
-
-        socket0
-            .socketpair_ops()
-            .unwrap()
-            .socketpair(&mut socket0, &mut socket1);
-
         let binding = ProcessManager::current_pcb().fd_table();
         let mut fd_table_guard = binding.write();
 
-        let mut alloc_fd = |socket: Box<dyn Socket>| -> Result<i32, SystemError> {
-            let socketinode = SocketInode::new(socket);
-            let file = File::new(socketinode, FileMode::O_RDWR)?;
-            fd_table_guard.alloc_fd(file, None)
-        };
+        // 创建一对socket
+        let inode0 = SocketInode::new(new_socket(address_family, socket_type, protocol)?);
+        let inode1 = SocketInode::new(new_socket(address_family, socket_type, protocol)?);
 
-        fds[0] = alloc_fd(socket0)?;
-        fds[1] = alloc_fd(socket1)?;
+        // 进行pair
+        unsafe {
+            inode0
+                .inner_no_preempt()
+                .connect(Endpoint::Inode(Some(inode1.clone())))?;
+            inode1
+                .inner_no_preempt()
+                .connect(Endpoint::Inode(Some(inode0.clone())))?;
+        }
+
+        fds[0] = fd_table_guard.alloc_fd(File::new(inode0, FileMode::O_RDWR)?, None)?;
+        fds[1] = fd_table_guard.alloc_fd(File::new(inode1, FileMode::O_RDWR)?, None)?;
 
         drop(fd_table_guard);
         Ok(0)
@@ -150,7 +153,7 @@ impl Syscall {
                 PosixSocketOption::SO_SNDBUF => {
                     // 返回发送缓冲区大小
                     unsafe {
-                        *optval = socket.metadata()?.tx_buf_size as u32;
+                        *optval = socket.metadata().tx_buf_size as u32;
                         *optlen = core::mem::size_of::<u32>() as u32;
                     }
                     return Ok(0);
@@ -158,7 +161,7 @@ impl Syscall {
                 PosixSocketOption::SO_RCVBUF => {
                     // 返回默认的接收缓冲区大小
                     unsafe {
-                        *optval = socket.metadata()?.rx_buf_size as u32;
+                        *optval = socket.metadata().rx_buf_size as u32;
                         *optlen = core::mem::size_of::<u32>() as u32;
                     }
                     return Ok(0);
@@ -204,9 +207,8 @@ impl Syscall {
             .get_socket(fd as i32)
             .ok_or(SystemError::EBADF)?;
         let mut socket = unsafe { socket.inner_no_preempt() };
-        // kdebug!("connect to {:?}...", endpoint);
         socket.connect(endpoint)?;
-        return Ok(0);
+        Ok(0)
     }
 
     /// @brief sys_bind系统调用的实际执行函数
@@ -223,7 +225,7 @@ impl Syscall {
             .ok_or(SystemError::EBADF)?;
         let mut socket = unsafe { socket.inner_no_preempt() };
         socket.bind(endpoint)?;
-        return Ok(0);
+        Ok(0)
     }
 
     /// @brief sys_sendto系统调用的实际执行函数
@@ -274,7 +276,7 @@ impl Syscall {
         let socket: Arc<SocketInode> = ProcessManager::current_pcb()
             .get_socket(fd as i32)
             .ok_or(SystemError::EBADF)?;
-        let mut socket = unsafe { socket.inner_no_preempt() };
+        let socket = unsafe { socket.inner_no_preempt() };
 
         let (n, endpoint) = socket.read(buf);
         drop(socket);
@@ -305,7 +307,7 @@ impl Syscall {
         let socket: Arc<SocketInode> = ProcessManager::current_pcb()
             .get_socket(fd as i32)
             .ok_or(SystemError::EBADF)?;
-        let mut socket = unsafe { socket.inner_no_preempt() };
+        let socket = unsafe { socket.inner_no_preempt() };
 
         let mut buf = iovs.new_buf(true);
         // 从socket中读取数据
@@ -572,12 +574,13 @@ impl SockAddr {
         .map_err(|_| SystemError::EFAULT)?;
 
         let addr = unsafe { addr.as_ref() }.ok_or(SystemError::EFAULT)?;
-        if len < addr.len()? {
-            return Err(SystemError::EINVAL);
-        }
         unsafe {
             match AddressFamily::try_from(addr.family)? {
                 AddressFamily::INet => {
+                    if len < addr.len()? {
+                        return Err(SystemError::EINVAL);
+                    }
+
                     let addr_in: SockAddrIn = addr.addr_in;
 
                     let ip: wire::IpAddress = wire::IpAddress::from(wire::Ipv4Address::from_bytes(
@@ -587,6 +590,29 @@ impl SockAddr {
 
                     return Ok(Endpoint::Ip(Some(wire::IpEndpoint::new(ip, port))));
                 }
+                AddressFamily::Unix => {
+                    let addr_un: SockAddrUn = addr.addr_un;
+
+                    let path = CStr::from_bytes_until_nul(&addr_un.sun_path)
+                        .map_err(|_| SystemError::EINVAL)?
+                        .to_str()
+                        .map_err(|_| SystemError::EINVAL)?;
+
+                    let fd = Syscall::open(path.as_ptr(), FileMode::O_RDWR.bits(), 0o755, true)?;
+
+                    let binding = ProcessManager::current_pcb().fd_table();
+                    let fd_table_guard = binding.read();
+
+                    let binding = fd_table_guard.get_file_by_fd(fd as i32).unwrap();
+                    let file = binding.lock();
+                    if file.file_type() != FileType::Socket {
+                        return Err(SystemError::ENOTSOCK);
+                    }
+                    let inode = file.inode();
+                    let socketinode = inode.as_any_ref().downcast_ref::<Arc<SocketInode>>();
+
+                    return Ok(Endpoint::Inode(socketinode.cloned()));
+                }
                 AddressFamily::Packet => {
                     // TODO: support packet socket
                     return Err(SystemError::EINVAL);
@@ -595,9 +621,6 @@ impl SockAddr {
                     // TODO: support netlink socket
                     return Err(SystemError::EINVAL);
                 }
-                AddressFamily::Unix => {
-                    return Err(SystemError::EINVAL);
-                }
                 _ => {
                     return Err(SystemError::EINVAL);
                 }
@@ -705,6 +728,7 @@ impl From<Endpoint> for SockAddr {
 
                 return SockAddr { addr_ll };
             }
+
             _ => {
                 // todo: support other endpoint, like Netlink...
                 unimplemented!("not support {value:?}");

+ 3 - 0
user/apps/test_socket/.gitignore

@@ -0,0 +1,3 @@
+/target
+Cargo.lock
+/install/

+ 4 - 0
user/apps/test_socket/Cargo.toml

@@ -0,0 +1,4 @@
+[package]
+name = "test_socket"
+version = "0.1.0"
+edition = "2021"

+ 56 - 0
user/apps/test_socket/Makefile

@@ -0,0 +1,56 @@
+TOOLCHAIN="+nightly-2023-08-15-x86_64-unknown-linux-gnu"
+RUSTFLAGS+=""
+
+ifdef DADK_CURRENT_BUILD_DIR
+# 如果是在dadk中编译,那么安装到dadk的安装目录中
+	INSTALL_DIR = $(DADK_CURRENT_BUILD_DIR)
+else
+# 如果是在本地编译,那么安装到当前目录下的install目录中
+	INSTALL_DIR = ./install
+endif
+
+ifeq ($(ARCH), x86_64)
+	export RUST_TARGET=x86_64-unknown-linux-musl
+else ifeq ($(ARCH), riscv64)
+	export RUST_TARGET=riscv64gc-unknown-linux-gnu
+else 
+# 默认为x86_86,用于本地编译
+	export RUST_TARGET=x86_64-unknown-linux-musl
+endif
+
+run:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET)
+
+build:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET)
+
+clean:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET)
+
+test:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET)
+
+doc:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) doc --target $(RUST_TARGET)
+
+fmt:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt
+
+fmt-check:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt --check
+
+run-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET) --release
+
+build-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET) --release
+
+clean-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET) --release
+
+test-release:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET) --release
+
+.PHONY: install
+install:
+	RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) install --target $(RUST_TARGET) --path . --no-track --root $(INSTALL_DIR) --force

+ 21 - 0
user/apps/test_socket/src/main.rs

@@ -0,0 +1,21 @@
+mod test_unix_stream;
+mod test_unix_stream_pair;
+
+use test_unix_stream::test_unix_stream;
+use test_unix_stream_pair::test_unix_stream_pair;
+
+fn main() -> std::io::Result<()> {
+    if let Err(e) = test_unix_stream() {
+        println!("[ fault ] test_unix_stream, err: {}", e);
+    } else {
+        println!("[success] test_unix_stream");
+    }
+
+    if let Err(e) = test_unix_stream_pair() {
+        println!("[ fault ] test_unix_stream_pair, err: {}", e);
+    } else {
+        println!("[success] test_unix_stream_pair");
+    }
+
+    Ok(())
+}

+ 42 - 0
user/apps/test_socket/src/test_unix_stream.rs

@@ -0,0 +1,42 @@
+use std::io::{Error, Read, Write};
+use std::os::unix::net::{UnixListener, UnixStream};
+use std::thread;
+use std::{fs, str};
+
+const SOCKET_PATH: &str = "/test.socket";
+const MSG: &str = "Hello, unix stream socket!";
+
+fn client() -> std::io::Result<()> {
+    // 连接到服务器
+    let mut stream = UnixStream::connect(SOCKET_PATH)?;
+    // 发送消息到服务器
+    stream.write_all(MSG.as_bytes())?;
+    Ok(())
+}
+
+pub fn test_unix_stream() -> std::io::Result<()> {
+    println!("unix stream socket path: {}", SOCKET_PATH);
+    // 删除可能已存在的socket文件
+    fs::remove_file(&SOCKET_PATH).ok();
+    // 创建Unix域监听socket
+    let listener = UnixListener::bind(SOCKET_PATH)?;
+
+    let client_thread = thread::spawn(move || client());
+
+    // 监听并接受连接
+    let (mut stream, _) = listener.accept().expect("listen error");
+
+    let mut buffer = [0; 1024];
+    let nbytes = stream.read(&mut buffer).expect("read error");
+    let received_msg = str::from_utf8(&buffer[..nbytes]).unwrap();
+
+    client_thread.join().ok();
+
+    fs::remove_file(&SOCKET_PATH).ok();
+
+    if received_msg == MSG {
+        Ok(())
+    } else {
+        Err(Error::from_raw_os_error(-1))
+    }
+}

+ 21 - 0
user/apps/test_socket/src/test_unix_stream_pair.rs

@@ -0,0 +1,21 @@
+use std::io::{Error, Read, Write};
+use std::os::unix::net::UnixStream;
+use std::str;
+
+const MSG: &str = "Hello, unix stream socketpair!";
+
+pub fn test_unix_stream_pair() -> std::io::Result<()> {
+    let (mut sock0, mut sock1) = UnixStream::pair()?;
+
+    sock1.write_all(MSG.as_bytes())?;
+
+    let mut buffer = [0; 1024];
+    let nbytes = sock0.read(&mut buffer).expect("read error");
+    let received_msg = str::from_utf8(&buffer[..nbytes]).unwrap();
+
+    if received_msg == MSG {
+        Ok(())
+    } else {
+        Err(Error::from_raw_os_error(-1))
+    }
+}

+ 22 - 0
user/dadk/config/test_socket-0.1.0.dadk

@@ -0,0 +1,22 @@
+{
+  "name": "test_socket",
+  "version": "0.1.0",
+  "description": "测试socket",
+  "task_type": {
+    "BuildFromSource": {
+      "Local": {
+        "path": "apps/test_socket"
+      }
+    }
+  },
+  "depends": [],
+  "build": {
+    "build_command": "make install"
+  },
+  "clean": {
+    "clean_command": "make clean"
+  },
+  "install": {
+    "in_dragonos_path": "/"
+  }
+}