inner.rs 7.2 KB


  1. use core::sync::atomic::{AtomicUsize, Ordering};
  2. use log::debug;
  3. use system_error::SystemError;
  4. use crate::libs::mutex::Mutex;
  5. use crate::net::socket::buffer::Buffer;
  6. use crate::net::socket::unix::stream::StreamSocket;
  7. use crate::net::socket::{Endpoint, Inode, ShutdownTemp};
  8. use alloc::collections::VecDeque;
  9. use alloc::{string::String, sync::Arc};
  10. #[derive(Debug)]
  11. pub enum Inner {
  12. Init(Init),
  13. Connected(Connected),
  14. Listener(Listener),
  15. }
  16. #[derive(Debug)]
  17. pub struct Init {
  18. addr: Option<Endpoint>,
  19. }
  20. impl Init {
  21. pub(super) fn new() -> Self {
  22. Self { addr: None }
  23. }
  24. pub(super) fn bind(&mut self, endpoint_to_bind: Endpoint) -> Result<(), SystemError> {
  25. if self.addr.is_some() {
  26. log::error!("the socket is already bound");
  27. return Err(SystemError::EINVAL);
  28. }
  29. match endpoint_to_bind {
  30. Endpoint::Inode(_) => self.addr = Some(endpoint_to_bind),
  31. _ => return Err(SystemError::EINVAL),
  32. }
  33. return Ok(());
  34. }
  35. pub fn bind_path(&mut self, sun_path: String) -> Result<Endpoint, SystemError> {
  36. if self.addr.is_none() {
  37. log::error!("the socket is not bound");
  38. return Err(SystemError::EINVAL);
  39. }
  40. if let Some(Endpoint::Inode((inode, mut path))) = self.addr.take() {
  41. path = sun_path;
  42. let epoint = Endpoint::Inode((inode, path.clone()));
  43. self.addr.replace(epoint.clone());
  44. log::debug!("bind path in inode : {:?}", path);
  45. return Ok(epoint);
  46. };
  47. return Err(SystemError::EINVAL);
  48. }
  49. pub(super) fn endpoint(&self) -> Option<&Endpoint> {
  50. self.addr.as_ref()
  51. }
  52. }
  53. #[derive(Debug, Clone)]
  54. pub struct Connected {
  55. addr: Option<Endpoint>,
  56. peer_addr: Option<Endpoint>,
  57. buffer: Arc<Buffer>,
  58. }
  59. impl Connected {
  60. pub fn new_pair(addr: Option<Endpoint>, peer_addr: Option<Endpoint>) -> (Self, Self) {
  61. let this = Connected {
  62. addr: addr.clone(),
  63. peer_addr: peer_addr.clone(),
  64. buffer: Buffer::new(),
  65. };
  66. let peer = Connected {
  67. addr: peer_addr,
  68. peer_addr: addr,
  69. buffer: Buffer::new(),
  70. };
  71. return (this, peer);
  72. }
  73. pub fn endpoint(&self) -> Option<&Endpoint> {
  74. self.addr.as_ref()
  75. }
  76. pub fn set_addr(&mut self, addr: Option<Endpoint>) {
  77. self.addr = addr;
  78. }
  79. pub fn peer_endpoint(&self) -> Option<&Endpoint> {
  80. self.peer_addr.as_ref()
  81. }
  82. pub fn set_peer_addr(&mut self, peer: Option<Endpoint>) {
  83. self.peer_addr = peer;
  84. }
  85. pub fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
  86. //写入对端buffer
  87. let peer_inode = match self.peer_addr.as_ref().unwrap() {
  88. Endpoint::Inode((inode, _)) => inode,
  89. _ => return Err(SystemError::EINVAL),
  90. };
  91. let peer_socket =
  92. Arc::downcast::<StreamSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
  93. let usize = match &*peer_socket.inner.read() {
  94. Inner::Connected(conntected) => {
  95. let usize = conntected.buffer.write_read_buffer(buf)?;
  96. usize
  97. }
  98. _ => {
  99. debug!("no! is not connested!");
  100. return Err(SystemError::EINVAL);
  101. }
  102. };
  103. peer_socket.wait_queue.wakeup(None);
  104. Ok(usize)
  105. }
  106. pub fn can_send(&self) -> Result<bool, SystemError> {
  107. //查看连接体里的buf是否非满
  108. let peer_inode = match self.peer_addr.as_ref().unwrap() {
  109. Endpoint::Inode((inode, _)) => inode,
  110. _ => return Err(SystemError::EINVAL),
  111. };
  112. let peer_socket =
  113. Arc::downcast::<StreamSocket>(peer_inode.inner()).map_err(|_| SystemError::EINVAL)?;
  114. let is_full = match &*peer_socket.inner.read() {
  115. Inner::Connected(connected) => connected.buffer.is_read_buf_full(),
  116. _ => return Err(SystemError::EINVAL),
  117. };
  118. debug!("can send? :{}", !is_full);
  119. Ok(!is_full)
  120. }
  121. pub fn can_recv(&self) -> bool {
  122. //查看连接体里的buf是否非空
  123. return !self.buffer.is_read_buf_empty();
  124. }
  125. pub fn try_send(&self, buf: &[u8]) -> Result<usize, SystemError> {
  126. if self.can_send()? {
  127. return self.send_slice(buf);
  128. } else {
  129. return Err(SystemError::ENOBUFS);
  130. }
  131. }
  132. fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
  133. return self.buffer.read_read_buffer(buf);
  134. }
  135. pub fn try_recv(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
  136. if self.can_recv() {
  137. return self.recv_slice(buf);
  138. } else {
  139. return Err(SystemError::EINVAL);
  140. }
  141. }
  142. pub fn shutdown(&self, how: ShutdownTemp) -> Result<(), SystemError> {
  143. if how.is_empty() {
  144. return Err(SystemError::EINVAL);
  145. } else if how.is_send_shutdown() {
  146. unimplemented!("unimplemented!");
  147. } else if how.is_recv_shutdown() {
  148. unimplemented!("unimplemented!");
  149. }
  150. Ok(())
  151. }
  152. }
  153. #[derive(Debug)]
  154. pub struct Listener {
  155. addr: Option<Endpoint>,
  156. incoming_connects: Mutex<VecDeque<Arc<Inode>>>,
  157. backlog: AtomicUsize,
  158. }
  159. impl Listener {
  160. pub fn new(addr: Option<Endpoint>, backlog: usize) -> Self {
  161. Self {
  162. addr,
  163. incoming_connects: Mutex::new(VecDeque::new()),
  164. backlog: AtomicUsize::new(backlog),
  165. }
  166. }
  167. pub fn listen(&self, backlog: usize) -> Result<(), SystemError> {
  168. self.backlog.store(backlog, Ordering::Relaxed);
  169. return Ok(());
  170. }
  171. pub fn push_incoming(&self, server_inode: Arc<Inode>) -> Result<(), SystemError> {
  172. let mut incoming_connects = self.incoming_connects.lock();
  173. if incoming_connects.len() >= self.backlog.load(Ordering::Relaxed) {
  174. debug!("unix stream listen socket connected queue is full!");
  175. return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
  176. }
  177. incoming_connects.push_back(server_inode);
  178. return Ok(());
  179. }
  180. pub fn pop_incoming(&self) -> Option<Arc<Inode>> {
  181. let mut incoming_connects = self.incoming_connects.lock();
  182. return incoming_connects.pop_front();
  183. }
  184. pub(super) fn endpoint(&self) -> Option<&Endpoint> {
  185. self.addr.as_ref()
  186. }
  187. pub(super) fn is_acceptable(&self) -> bool {
  188. return self.incoming_connects.lock().len() != 0;
  189. }
  190. pub(super) fn try_accept(&self) -> Result<(Arc<Inode>, Endpoint), SystemError> {
  191. let mut incoming_connecteds = self.incoming_connects.lock();
  192. debug!("incom len {}", incoming_connecteds.len());
  193. let connected = incoming_connecteds
  194. .pop_front()
  195. .ok_or(SystemError::EAGAIN_OR_EWOULDBLOCK)?;
  196. let socket =
  197. Arc::downcast::<StreamSocket>(connected.inner()).map_err(|_| SystemError::EINVAL)?;
  198. let peer = match &*socket.inner.read() {
  199. Inner::Connected(connected) => connected.peer_endpoint().unwrap().clone(),
  200. _ => return Err(SystemError::ENOTCONN),
  201. };
  202. debug!("server accept!");
  203. return Ok((Inode::new(socket), peer));
  204. }
  205. }