mod.rs 19 KB


  1. use crate::sched::SchedMode;
  2. use alloc::{
  3. string::String,
  4. sync::{Arc, Weak},
  5. };
  6. use inner::{Connected, Init, Inner, Listener};
  7. use log::debug;
  8. use system_error::SystemError;
  9. use unix::{
  10. ns::abs::{remove_abs_addr, ABS_INODE_MAP},
  11. INODE_MAP,
  12. };
  13. use crate::{
  14. libs::rwlock::RwLock,
  15. net::socket::{self, *},
  16. };
  17. type EP = EPollEventType;
  18. pub mod inner;
  19. #[derive(Debug)]
  20. pub struct StreamSocket {
  21. inner: RwLock<Inner>,
  22. shutdown: Shutdown,
  23. _epitems: EPollItems,
  24. wait_queue: WaitQueue,
  25. self_ref: Weak<Self>,
  26. }
  27. impl StreamSocket {
  28. /// 默认的元数据缓冲区大小
  29. #[allow(dead_code)]
  30. pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
  31. /// 默认的缓冲区大小
  32. pub const DEFAULT_BUF_SIZE: usize = 64 * 1024;
  33. pub fn new() -> Arc<Self> {
  34. Arc::new_cyclic(|me| Self {
  35. inner: RwLock::new(Inner::Init(Init::new())),
  36. shutdown: Shutdown::new(),
  37. _epitems: EPollItems::default(),
  38. wait_queue: WaitQueue::default(),
  39. self_ref: me.clone(),
  40. })
  41. }
  42. pub fn new_pairs() -> Result<(Arc<Inode>, Arc<Inode>), SystemError> {
  43. let socket0 = StreamSocket::new();
  44. let socket1 = StreamSocket::new();
  45. let inode0 = Inode::new(socket0.clone());
  46. let inode1 = Inode::new(socket1.clone());
  47. let (conn_0, conn_1) = Connected::new_pair(
  48. Some(Endpoint::Inode((inode0.clone(), String::from("")))),
  49. Some(Endpoint::Inode((inode1.clone(), String::from("")))),
  50. );
  51. *socket0.inner.write() = Inner::Connected(conn_0);
  52. *socket1.inner.write() = Inner::Connected(conn_1);
  53. return Ok((inode0, inode1));
  54. }
  55. #[allow(dead_code)]
  56. pub fn new_connected(connected: Connected) -> Arc<Self> {
  57. Arc::new_cyclic(|me| Self {
  58. inner: RwLock::new(Inner::Connected(connected)),
  59. shutdown: Shutdown::new(),
  60. _epitems: EPollItems::default(),
  61. wait_queue: WaitQueue::default(),
  62. self_ref: me.clone(),
  63. })
  64. }
  65. pub fn new_inode() -> Result<Arc<Inode>, SystemError> {
  66. let socket = StreamSocket::new();
  67. let inode = Inode::new(socket.clone());
  68. let _ = match &mut *socket.inner.write() {
  69. Inner::Init(init) => init.bind(Endpoint::Inode((inode.clone(), String::from("")))),
  70. _ => return Err(SystemError::EINVAL),
  71. };
  72. return Ok(inode);
  73. }
  74. fn is_acceptable(&self) -> bool {
  75. match &*self.inner.read() {
  76. Inner::Listener(listener) => listener.is_acceptable(),
  77. _ => {
  78. panic!("the socket is not listening");
  79. }
  80. }
  81. }
  82. pub fn try_accept(&self) -> Result<(Arc<Inode>, Endpoint), SystemError> {
  83. match &*self.inner.read() {
  84. Inner::Listener(listener) => listener.try_accept() as _,
  85. _ => {
  86. log::error!("the socket is not listening");
  87. return Err(SystemError::EINVAL);
  88. }
  89. }
  90. }
  91. fn is_peer_shutdown(&self) -> Result<bool, SystemError> {
  92. let peer_shutdown = match self.get_peer_name()? {
  93. Endpoint::Inode((inode, _)) => Arc::downcast::<StreamSocket>(inode.inner())
  94. .map_err(|_| SystemError::EINVAL)?
  95. .shutdown
  96. .get()
  97. .is_both_shutdown(),
  98. _ => return Err(SystemError::EINVAL),
  99. };
  100. Ok(peer_shutdown)
  101. }
  102. fn can_recv(&self) -> Result<bool, SystemError> {
  103. let can = match &*self.inner.read() {
  104. Inner::Connected(connected) => connected.can_recv(),
  105. _ => return Err(SystemError::ENOTCONN),
  106. };
  107. Ok(can)
  108. }
  109. }
  110. impl Socket for StreamSocket {
  111. fn connect(&self, server_endpoint: Endpoint) -> Result<(), SystemError> {
  112. //获取客户端地址
  113. let client_endpoint = match &mut *self.inner.write() {
  114. Inner::Init(init) => match init.endpoint().cloned() {
  115. Some(endpoint) => {
  116. debug!("bind when connected");
  117. Some(endpoint)
  118. }
  119. None => {
  120. debug!("not bind when connected");
  121. let inode = Inode::new(self.self_ref.upgrade().unwrap().clone());
  122. let epoint = Endpoint::Inode((inode.clone(), String::from("")));
  123. let _ = init.bind(epoint.clone());
  124. Some(epoint)
  125. }
  126. },
  127. Inner::Connected(_) => return Err(SystemError::EISCONN),
  128. Inner::Listener(_) => return Err(SystemError::EINVAL),
  129. };
  130. //获取服务端地址
  131. // let peer_inode = match server_endpoint.clone() {
  132. // Endpoint::Inode(socket) => socket,
  133. // _ => return Err(SystemError::EINVAL),
  134. // };
  135. //找到对端socket
  136. let (peer_inode, sun_path) = match server_endpoint {
  137. Endpoint::Inode((inode, path)) => (inode, path),
  138. Endpoint::Unixpath((inode_id, path)) => {
  139. let inode_guard = INODE_MAP.read_irqsave();
  140. let inode = inode_guard.get(&inode_id).unwrap();
  141. match inode {
  142. Endpoint::Inode((inode, _)) => (inode.clone(), path),
  143. _ => return Err(SystemError::EINVAL),
  144. }
  145. }
  146. Endpoint::Abspath((abs_addr, path)) => {
  147. let inode_guard = ABS_INODE_MAP.lock_irqsave();
  148. let inode = match inode_guard.get(&abs_addr.name()) {
  149. Some(inode) => inode,
  150. None => {
  151. log::debug!("can not find inode from absInodeMap");
  152. return Err(SystemError::EINVAL);
  153. }
  154. };
  155. match inode {
  156. Endpoint::Inode((inode, _)) => (inode.clone(), path),
  157. _ => {
  158. debug!("when connect, find inode failed!");
  159. return Err(SystemError::EINVAL);
  160. }
  161. }
  162. }
  163. _ => return Err(SystemError::EINVAL),
  164. };
  165. let remote_socket: Arc<StreamSocket> =
  166. Arc::downcast::<StreamSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
  167. //创建新的对端socket
  168. let new_server_socket = StreamSocket::new();
  169. let new_server_inode = Inode::new(new_server_socket.clone());
  170. let new_server_endpoint = Some(Endpoint::Inode((new_server_inode.clone(), sun_path)));
  171. //获取connect pair
  172. let (client_conn, server_conn) =
  173. Connected::new_pair(client_endpoint, new_server_endpoint.clone());
  174. *new_server_socket.inner.write() = Inner::Connected(server_conn);
  175. //查看remote_socket是否处于监听状态
  176. let remote_listener = remote_socket.inner.write();
  177. match &*remote_listener {
  178. Inner::Listener(listener) => {
  179. //往服务端socket的连接队列中添加connected
  180. listener.push_incoming(new_server_inode)?;
  181. *self.inner.write() = Inner::Connected(client_conn);
  182. remote_socket.wait_queue.wakeup(None);
  183. }
  184. _ => return Err(SystemError::EINVAL),
  185. }
  186. return Ok(());
  187. }
  188. fn bind(&self, endpoint: Endpoint) -> Result<(), SystemError> {
  189. match endpoint {
  190. Endpoint::Unixpath((inodeid, path)) => {
  191. let inode = match &mut *self.inner.write() {
  192. Inner::Init(init) => init.bind_path(path)?,
  193. _ => {
  194. log::error!("socket has listen or connected");
  195. return Err(SystemError::EINVAL);
  196. }
  197. };
  198. INODE_MAP.write_irqsave().insert(inodeid, inode);
  199. Ok(())
  200. }
  201. Endpoint::Abspath((abshandle, path)) => {
  202. let inode = match &mut *self.inner.write() {
  203. Inner::Init(init) => init.bind_path(path)?,
  204. _ => {
  205. log::error!("socket has listen or connected");
  206. return Err(SystemError::EINVAL);
  207. }
  208. };
  209. ABS_INODE_MAP.lock_irqsave().insert(abshandle.name(), inode);
  210. Ok(())
  211. }
  212. _ => return Err(SystemError::EINVAL),
  213. }
  214. }
  215. fn shutdown(&self, _stype: ShutdownTemp) -> Result<(), SystemError> {
  216. todo!();
  217. }
  218. fn listen(&self, backlog: usize) -> Result<(), SystemError> {
  219. let mut inner = self.inner.write();
  220. let epoint = match &*inner {
  221. Inner::Init(init) => init.endpoint().ok_or(SystemError::EINVAL)?.clone(),
  222. Inner::Connected(_) => {
  223. return Err(SystemError::EINVAL);
  224. }
  225. Inner::Listener(listener) => {
  226. return listener.listen(backlog);
  227. }
  228. };
  229. let listener = Listener::new(Some(epoint), backlog);
  230. *inner = Inner::Listener(listener);
  231. return Ok(());
  232. }
  233. fn accept(&self) -> Result<(Arc<socket::Inode>, Endpoint), SystemError> {
  234. debug!("stream server begin accept");
  235. //目前只实现了阻塞式实现
  236. loop {
  237. wq_wait_event_interruptible!(self.wait_queue, self.is_acceptable(), {})?;
  238. match self.try_accept() {
  239. Ok((socket, endpoint)) => {
  240. debug!("server accept!:{:?}", endpoint);
  241. return Ok((socket, endpoint));
  242. }
  243. Err(_) => continue,
  244. }
  245. }
  246. }
  247. fn set_option(&self, _level: PSOL, _optname: usize, _optval: &[u8]) -> Result<(), SystemError> {
  248. log::warn!("setsockopt is not implemented");
  249. Ok(())
  250. }
  251. fn wait_queue(&self) -> &WaitQueue {
  252. return &self.wait_queue;
  253. }
  254. fn poll(&self) -> usize {
  255. let mut mask = EP::empty();
  256. let shutdown = self.shutdown.get();
  257. // 参考linux的unix_poll https://code.dragonos.org.cn/xref/linux-6.1.9/net/unix/af_unix.c#3152
  258. // 用关闭读写端表示连接断开
  259. if shutdown.is_both_shutdown() || self.is_peer_shutdown().unwrap() {
  260. mask |= EP::EPOLLHUP;
  261. }
  262. if shutdown.is_recv_shutdown() {
  263. mask |= EP::EPOLLRDHUP | EP::EPOLLIN | EP::EPOLLRDNORM;
  264. }
  265. match &*self.inner.read() {
  266. Inner::Connected(connected) => {
  267. if connected.can_recv() {
  268. mask |= EP::EPOLLIN | EP::EPOLLRDNORM;
  269. }
  270. // if (sk_is_readable(sk))
  271. // mask |= EPOLLIN | EPOLLRDNORM;
  272. // TODO:处理紧急情况 EPOLLPRI
  273. // TODO:处理连接是否关闭 EPOLLHUP
  274. if !shutdown.is_send_shutdown() {
  275. if connected.can_send().unwrap() {
  276. mask |= EP::EPOLLOUT | EP::EPOLLWRNORM | EP::EPOLLWRBAND;
  277. } else {
  278. todo!("poll: buffer space not enough");
  279. }
  280. }
  281. }
  282. Inner::Listener(_) => mask |= EP::EPOLLIN,
  283. Inner::Init(_) => mask |= EP::EPOLLOUT,
  284. }
  285. mask.bits() as usize
  286. }
  287. fn close(&self) -> Result<(), SystemError> {
  288. self.shutdown.recv_shutdown();
  289. self.shutdown.send_shutdown();
  290. let endpoint = self.get_name()?;
  291. let path = match &endpoint {
  292. Endpoint::Inode((_, path)) => path,
  293. Endpoint::Unixpath((_, path)) => path,
  294. Endpoint::Abspath((_, path)) => path,
  295. _ => return Err(SystemError::EINVAL),
  296. };
  297. if path.is_empty() {
  298. return Ok(());
  299. }
  300. match &endpoint {
  301. Endpoint::Unixpath((inode_id, _)) => {
  302. let mut inode_guard = INODE_MAP.write_irqsave();
  303. inode_guard.remove(inode_id);
  304. }
  305. Endpoint::Inode((current_inode, current_path)) => {
  306. let mut inode_guard = INODE_MAP.write_irqsave();
  307. // 遍历查找匹配的条目
  308. let target_entry = inode_guard
  309. .iter()
  310. .find(|(_, ep)| {
  311. if let Endpoint::Inode((map_inode, map_path)) = ep {
  312. // 通过指针相等性比较确保是同一对象
  313. Arc::ptr_eq(map_inode, current_inode) && map_path == current_path
  314. } else {
  315. log::debug!("not match");
  316. false
  317. }
  318. })
  319. .map(|(id, _)| *id);
  320. if let Some(id) = target_entry {
  321. inode_guard.remove(&id).ok_or(SystemError::EINVAL)?;
  322. }
  323. }
  324. Endpoint::Abspath((abshandle, _)) => {
  325. let mut abs_inode_map = ABS_INODE_MAP.lock_irqsave();
  326. abs_inode_map.remove(&abshandle.name());
  327. }
  328. _ => {
  329. log::error!("invalid endpoint type");
  330. return Err(SystemError::EINVAL);
  331. }
  332. }
  333. *self.inner.write() = Inner::Init(Init::new());
  334. self.wait_queue.wakeup(None);
  335. let _ = remove_abs_addr(path);
  336. Ok(())
  337. }
  338. fn get_peer_name(&self) -> Result<Endpoint, SystemError> {
  339. //获取对端地址
  340. let endpoint = match &*self.inner.read() {
  341. Inner::Connected(connected) => connected.peer_endpoint().cloned(),
  342. _ => return Err(SystemError::ENOTCONN),
  343. };
  344. if let Some(endpoint) = endpoint {
  345. return Ok(endpoint);
  346. } else {
  347. return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
  348. }
  349. }
  350. fn get_name(&self) -> Result<Endpoint, SystemError> {
  351. //获取本端地址
  352. let endpoint = match &*self.inner.read() {
  353. Inner::Init(init) => init.endpoint().cloned(),
  354. Inner::Connected(connected) => connected.endpoint().cloned(),
  355. Inner::Listener(listener) => listener.endpoint().cloned(),
  356. };
  357. if let Some(endpoint) = endpoint {
  358. return Ok(endpoint);
  359. } else {
  360. return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
  361. }
  362. }
  363. fn get_option(
  364. &self,
  365. _level: PSOL,
  366. _name: usize,
  367. _value: &mut [u8],
  368. ) -> Result<usize, SystemError> {
  369. log::warn!("getsockopt is not implemented");
  370. Ok(0)
  371. }
  372. fn read(&self, buffer: &mut [u8]) -> Result<usize, SystemError> {
  373. self.recv(buffer, socket::PMSG::empty())
  374. }
  375. fn recv(&self, buffer: &mut [u8], flags: socket::PMSG) -> Result<usize, SystemError> {
  376. if !flags.contains(PMSG::DONTWAIT) {
  377. loop {
  378. log::debug!("socket try recv");
  379. wq_wait_event_interruptible!(
  380. self.wait_queue,
  381. self.can_recv()? || self.is_peer_shutdown()?,
  382. {}
  383. )?;
  384. // connect锁和flag判断顺序不正确,应该先判断在
  385. match &*self.inner.write() {
  386. Inner::Connected(connected) => match connected.try_recv(buffer) {
  387. Ok(usize) => {
  388. log::debug!("recv successfully");
  389. return Ok(usize);
  390. }
  391. Err(_) => continue,
  392. },
  393. _ => {
  394. log::error!("the socket is not connected");
  395. return Err(SystemError::ENOTCONN);
  396. }
  397. }
  398. }
  399. } else {
  400. unimplemented!("unimplemented non_block")
  401. }
  402. }
  403. fn recv_from(
  404. &self,
  405. buffer: &mut [u8],
  406. flags: socket::PMSG,
  407. _address: Option<Endpoint>,
  408. ) -> Result<(usize, Endpoint), SystemError> {
  409. if flags.contains(PMSG::OOB) {
  410. return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
  411. }
  412. if !flags.contains(PMSG::DONTWAIT) {
  413. loop {
  414. log::debug!("socket try recv from");
  415. wq_wait_event_interruptible!(
  416. self.wait_queue,
  417. self.can_recv()? || self.is_peer_shutdown()?,
  418. {}
  419. )?;
  420. // connect锁和flag判断顺序不正确,应该先判断在
  421. log::debug!("try recv");
  422. match &*self.inner.write() {
  423. Inner::Connected(connected) => match connected.try_recv(buffer) {
  424. Ok(usize) => {
  425. log::debug!("recvs from successfully");
  426. return Ok((usize, connected.peer_endpoint().unwrap().clone()));
  427. }
  428. Err(_) => continue,
  429. },
  430. _ => {
  431. log::error!("the socket is not connected");
  432. return Err(SystemError::ENOTCONN);
  433. }
  434. }
  435. }
  436. } else {
  437. unimplemented!("unimplemented non_block")
  438. }
  439. }
  440. fn recv_msg(
  441. &self,
  442. _msg: &mut crate::net::syscall::MsgHdr,
  443. _flags: socket::PMSG,
  444. ) -> Result<usize, SystemError> {
  445. Err(SystemError::ENOSYS)
  446. }
  447. fn send(&self, buffer: &[u8], flags: socket::PMSG) -> Result<usize, SystemError> {
  448. if self.is_peer_shutdown()? {
  449. return Err(SystemError::EPIPE);
  450. }
  451. if !flags.contains(PMSG::DONTWAIT) {
  452. loop {
  453. match &*self.inner.write() {
  454. Inner::Connected(connected) => match connected.try_send(buffer) {
  455. Ok(usize) => {
  456. log::debug!("send successfully");
  457. return Ok(usize);
  458. }
  459. Err(_) => continue,
  460. },
  461. _ => {
  462. log::error!("the socket is not connected");
  463. return Err(SystemError::ENOTCONN);
  464. }
  465. }
  466. }
  467. } else {
  468. unimplemented!("unimplemented non_block")
  469. }
  470. }
  471. fn send_msg(
  472. &self,
  473. _msg: &crate::net::syscall::MsgHdr,
  474. _flags: socket::PMSG,
  475. ) -> Result<usize, SystemError> {
  476. todo!()
  477. }
  478. fn send_to(
  479. &self,
  480. _buffer: &[u8],
  481. _flags: socket::PMSG,
  482. _address: Endpoint,
  483. ) -> Result<usize, SystemError> {
  484. Err(SystemError::ENOSYS)
  485. }
  486. fn write(&self, buffer: &[u8]) -> Result<usize, SystemError> {
  487. self.send(buffer, socket::PMSG::empty())
  488. }
  489. fn send_buffer_size(&self) -> usize {
  490. log::warn!("using default buffer size");
  491. StreamSocket::DEFAULT_BUF_SIZE
  492. }
  493. fn recv_buffer_size(&self) -> usize {
  494. log::warn!("using default buffer size");
  495. StreamSocket::DEFAULT_BUF_SIZE
  496. }
  497. }