inner.rs 17 KB


  1. use core::sync::atomic::AtomicUsize;
  2. use crate::libs::rwlock::RwLock;
  3. use crate::net::socket::EPollEventType;
  4. use crate::net::socket::{self, inet::Types};
  5. use alloc::boxed::Box;
  6. use alloc::vec::Vec;
  7. use smoltcp;
  8. use system_error::SystemError::{self, *};
  // pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;

  /// Size in bytes (512 KiB) of the receive buffer allocated for each new smoltcp TCP socket.
  pub const DEFAULT_RX_BUF_SIZE: usize = 512 << 10;

  /// Size in bytes (512 KiB) of the transmit buffer allocated for each new smoltcp TCP socket.
  pub const DEFAULT_TX_BUF_SIZE: usize = 512 << 10;
  12. fn new_smoltcp_socket() -> smoltcp::socket::tcp::Socket<'static> {
  13. let rx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0; DEFAULT_RX_BUF_SIZE]);
  14. let tx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0; DEFAULT_TX_BUF_SIZE]);
  15. smoltcp::socket::tcp::Socket::new(rx_buffer, tx_buffer)
  16. }
  17. fn new_listen_smoltcp_socket<T>(local_endpoint: T) -> smoltcp::socket::tcp::Socket<'static>
  18. where
  19. T: Into<smoltcp::wire::IpListenEndpoint>,
  20. {
  21. let mut socket = new_smoltcp_socket();
  22. socket.listen(local_endpoint).unwrap();
  23. socket
  24. }
  25. #[derive(Debug)]
  26. pub enum Init {
  27. Unbound(
  28. (
  29. Box<smoltcp::socket::tcp::Socket<'static>>,
  30. smoltcp::wire::IpVersion,
  31. ),
  32. ),
  33. Bound((socket::inet::BoundInner, smoltcp::wire::IpEndpoint)),
  34. }
  35. impl Init {
  36. pub(super) fn new(ver: smoltcp::wire::IpVersion) -> Self {
  37. Init::Unbound((Box::new(new_smoltcp_socket()), ver))
  38. }
  39. /// 传入一个已经绑定的socket
  40. pub(super) fn new_bound(inner: socket::inet::BoundInner) -> Self {
  41. let endpoint = inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  42. socket
  43. .local_endpoint()
  44. .expect("A Bound Socket Must Have A Local Endpoint")
  45. });
  46. Init::Bound((inner, endpoint))
  47. }
  48. pub(super) fn bind(
  49. self,
  50. local_endpoint: smoltcp::wire::IpEndpoint,
  51. ) -> Result<Self, SystemError> {
  52. match self {
  53. Init::Unbound((socket, _)) => {
  54. let bound = socket::inet::BoundInner::bind(*socket, &local_endpoint.addr)?;
  55. bound
  56. .port_manager()
  57. .bind_port(Types::Tcp, local_endpoint.port)?;
  58. // bound.iface().common().bind_socket()
  59. Ok(Init::Bound((bound, local_endpoint)))
  60. }
  61. Init::Bound(_) => {
  62. log::debug!("Already Bound");
  63. Err(EINVAL)
  64. }
  65. }
  66. }
  67. pub(super) fn bind_to_ephemeral(
  68. self,
  69. remote_endpoint: smoltcp::wire::IpEndpoint,
  70. ) -> Result<(socket::inet::BoundInner, smoltcp::wire::IpEndpoint), (Self, SystemError)> {
  71. match self {
  72. Init::Unbound((socket, ver)) => {
  73. let (bound, address) =
  74. socket::inet::BoundInner::bind_ephemeral(*socket, remote_endpoint.addr)
  75. .map_err(|err| (Self::new(ver), err))?;
  76. let bound_port = bound
  77. .port_manager()
  78. .bind_ephemeral_port(Types::Tcp)
  79. .map_err(|err| (Self::new(ver), err))?;
  80. let endpoint = smoltcp::wire::IpEndpoint::new(address, bound_port);
  81. Ok((bound, endpoint))
  82. }
  83. Init::Bound(_) => Err((self, EINVAL)),
  84. }
  85. }
  86. pub(super) fn connect(
  87. self,
  88. remote_endpoint: smoltcp::wire::IpEndpoint,
  89. ) -> Result<Connecting, (Self, SystemError)> {
  90. let (inner, local) = match self {
  91. Init::Unbound(_) => self.bind_to_ephemeral(remote_endpoint)?,
  92. Init::Bound(inner) => inner,
  93. };
  94. if local.addr.is_unspecified() {
  95. return Err((Init::Bound((inner, local)), EINVAL));
  96. }
  97. let result = inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  98. socket
  99. .connect(
  100. inner.iface().smol_iface().lock().context(),
  101. remote_endpoint,
  102. local,
  103. )
  104. .map_err(|_| ECONNREFUSED)
  105. });
  106. match result {
  107. Ok(_) => Ok(Connecting::new(inner)),
  108. Err(err) => Err((Init::Bound((inner, local)), err)),
  109. }
  110. }
  111. /// # `listen`
  112. pub(super) fn listen(self, backlog: usize) -> Result<Listening, (Self, SystemError)> {
  113. let (inner, local) = match self {
  114. Init::Unbound(_) => {
  115. return Err((self, EINVAL));
  116. }
  117. Init::Bound(inner) => inner,
  118. };
  119. let listen_addr = if local.addr.is_unspecified() {
  120. smoltcp::wire::IpListenEndpoint::from(local.port)
  121. } else {
  122. smoltcp::wire::IpListenEndpoint::from(local)
  123. };
  124. log::debug!("listen at {:?}", listen_addr);
  125. let mut inners = Vec::new();
  126. if let Err(err) = || -> Result<(), SystemError> {
  127. for _ in 0..(backlog - 1) {
  128. // -1 because the first one is already bound
  129. let new_listen = socket::inet::BoundInner::bind(
  130. new_listen_smoltcp_socket(listen_addr),
  131. listen_addr
  132. .addr
  133. .as_ref()
  134. .unwrap_or(&smoltcp::wire::IpAddress::from(
  135. smoltcp::wire::Ipv4Address::UNSPECIFIED,
  136. )),
  137. )?;
  138. inners.push(new_listen);
  139. }
  140. Ok(())
  141. }() {
  142. return Err((Init::Bound((inner, local)), err));
  143. }
  144. if let Err(err) = inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  145. socket.listen(listen_addr).map_err(|_| ECONNREFUSED)
  146. }) {
  147. return Err((Init::Bound((inner, local)), err));
  148. }
  149. inners.push(inner);
  150. return Ok(Listening {
  151. inners,
  152. connect: AtomicUsize::new(0),
  153. listen_addr,
  154. });
  155. }
  156. pub(super) fn close(&self) {
  157. match self {
  158. Init::Unbound(_) => {}
  159. Init::Bound((inner, endpoint)) => {
  160. inner.port_manager().unbind_port(Types::Tcp, endpoint.port);
  161. inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  162. }
  163. }
  164. }
  165. }
  /// Outcome of an outgoing connection attempt, as tracked by the `Connecting` state.
  #[derive(Clone, Copy, Debug, Default)]
  enum ConnectResult {
      /// The connection has been established.
      Connected,
      /// The attempt is still in progress; this is the default value.
      #[default]
      Connecting,
      /// The attempt failed.
      Refused,
  }
  173. #[derive(Debug)]
  174. pub struct Connecting {
  175. inner: socket::inet::BoundInner,
  176. result: RwLock<ConnectResult>,
  177. }
  impl Connecting {
      /// Wraps `inner` as an attempt that is still in progress.
      fn new(inner: socket::inet::BoundInner) -> Self {
          let result = RwLock::new(ConnectResult::Connecting);
          Connecting { inner, result }
      }

      /// Runs `f` with mutable access to the underlying smoltcp socket and
      /// returns whatever `f` returns.
      pub fn with_mut<R, F>(&self, f: F) -> R
      where
          F: FnMut(&mut smoltcp::socket::tcp::Socket<'static>) -> R,
      {
          self.inner.with_mut(f)
      }

      /// Consumes the attempt and converts it into the next socket state,
      /// based on the outcome recorded so far:
      ///
      /// * still connecting: the same `Connecting` state and `EAGAIN_OR_EWOULDBLOCK`;
      /// * connected: an `Established` state and `Ok(())`;
      /// * refused: a bound `Init` state and `ECONNREFUSED`.
      pub fn into_result(self) -> (Inner, Result<(), SystemError>) {
          // Copy the outcome out first so the read guard is released before
          // `self` is moved below.
          let outcome = *self.result.read_irqsave();
          match outcome {
              ConnectResult::Connecting => (Inner::Connecting(self), Err(EAGAIN_OR_EWOULDBLOCK)),
              ConnectResult::Connected => {
                  let established = Established { inner: self.inner };
                  (Inner::Established(established), Ok(()))
              }
              ConnectResult::Refused => {
                  let init = Init::new_bound(self.inner);
                  (Inner::Init(init), Err(ECONNREFUSED))
              }
          }
      }

      /// Converts the attempt into an `Established` socket without consulting
      /// the recorded outcome.
      ///
      /// # Safety
      ///
      /// No state check is performed here.
      /// NOTE(review): presumably the caller must guarantee that the connection
      /// has actually been established — confirm against the call sites.
      pub unsafe fn into_established(self) -> Established {
          let Connecting { inner, .. } = self;
          Established { inner }
      }

      /// Re-evaluates the connection attempt against the smoltcp socket state.
      ///
      /// Returns `true` when the outcome has just become final (connected or
      /// refused), which tells the caller to invoke `into_result()` soon.
      /// Once a final outcome is recorded every later call returns `false`, so
      /// `true` is reported exactly once; the caller must not miss it.
      #[must_use]
      pub(super) fn update_io_events(&self) -> bool {
          // if matches!(*self.result.read_irqsave(), ConnectResult::Connecting) {
          // return false;
          // }
          self.inner
              .with_mut(|socket: &mut smoltcp::socket::tcp::Socket| {
                  let mut outcome = self.result.write_irqsave();
                  let current = *outcome;
                  match current {
                      // Already connected or refused
                      ConnectResult::Refused | ConnectResult::Connected => false,
                      ConnectResult::Connecting => {
                          if socket.can_send() {
                              // Connected
                              log::debug!("can send");
                              *outcome = ConnectResult::Connected;
                              true
                          } else if socket.is_open() {
                              // Connecting
                              log::debug!("connecting");
                              *outcome = ConnectResult::Connecting;
                              false
                          } else {
                              // Refused
                              *outcome = ConnectResult::Refused;
                              true
                          }
                      }
                  }
              })
      }

      /// Returns the local endpoint of the connecting socket.
      ///
      /// # Panics
      ///
      /// Panics if the smoltcp socket has no local endpoint.
      pub fn get_name(&self) -> smoltcp::wire::IpEndpoint {
          self.inner
              .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
                  let local = socket.local_endpoint();
                  local.expect("A Connecting Tcp With No Local Endpoint")
              })
      }

      /// Returns the remote endpoint the socket is connecting to.
      ///
      /// # Panics
      ///
      /// Panics if the smoltcp socket has no remote endpoint.
      pub fn get_peer_name(&self) -> smoltcp::wire::IpEndpoint {
          self.inner
              .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
                  let remote = socket.remote_endpoint();
                  remote.expect("A Connecting Tcp With No Remote Endpoint")
              })
      }
  }
  256. #[derive(Debug)]
  257. pub struct Listening {
  258. inners: Vec<socket::inet::BoundInner>,
  259. connect: AtomicUsize,
  260. listen_addr: smoltcp::wire::IpListenEndpoint,
  261. }
  262. impl Listening {
  263. pub fn accept(&mut self) -> Result<(Established, smoltcp::wire::IpEndpoint), SystemError> {
  264. let connected: &mut socket::inet::BoundInner = self
  265. .inners
  266. .get_mut(self.connect.load(core::sync::atomic::Ordering::Relaxed))
  267. .unwrap();
  268. if connected.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| !socket.is_active()) {
  269. return Err(EAGAIN_OR_EWOULDBLOCK);
  270. }
  271. let remote_endpoint = connected.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  272. socket
  273. .remote_endpoint()
  274. .expect("A Connected Tcp With No Remote Endpoint")
  275. });
  276. // log::debug!("local at {:?}", local_endpoint);
  277. let mut new_listen = socket::inet::BoundInner::bind(
  278. new_listen_smoltcp_socket(self.listen_addr),
  279. self.listen_addr
  280. .addr
  281. .as_ref()
  282. .unwrap_or(&smoltcp::wire::IpAddress::from(
  283. smoltcp::wire::Ipv4Address::UNSPECIFIED,
  284. )),
  285. )?;
  286. // swap the connected socket with the new_listen socket
  287. // TODO is smoltcp socket swappable?
  288. core::mem::swap(&mut new_listen, connected);
  289. return Ok((Established { inner: new_listen }, remote_endpoint));
  290. }
  291. pub fn update_io_events(&self, pollee: &AtomicUsize) {
  292. let position = self.inners.iter().position(|inner| {
  293. inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.is_active())
  294. });
  295. if let Some(position) = position {
  296. self.connect
  297. .store(position, core::sync::atomic::Ordering::Relaxed);
  298. pollee.fetch_or(
  299. EPollEventType::EPOLLIN.bits() as usize,
  300. core::sync::atomic::Ordering::Relaxed,
  301. );
  302. } else {
  303. pollee.fetch_and(
  304. !EPollEventType::EPOLLIN.bits() as usize,
  305. core::sync::atomic::Ordering::Relaxed,
  306. );
  307. }
  308. }
  309. pub fn get_name(&self) -> smoltcp::wire::IpEndpoint {
  310. smoltcp::wire::IpEndpoint::new(
  311. self.listen_addr
  312. .addr
  313. .unwrap_or(smoltcp::wire::IpAddress::from(
  314. smoltcp::wire::Ipv4Address::UNSPECIFIED,
  315. )),
  316. self.listen_addr.port,
  317. )
  318. }
  319. pub fn close(&self) {
  320. log::debug!("Close Listening Socket");
  321. let port = self.get_name().port;
  322. for inner in self.inners.iter() {
  323. inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  324. }
  325. self.inners[0]
  326. .iface()
  327. .port_manager()
  328. .unbind_port(Types::Tcp, port);
  329. }
  330. pub fn release(&self) {
  331. // log::debug!("Release Listening Socket");
  332. for inner in self.inners.iter() {
  333. inner.release();
  334. }
  335. }
  336. }
  337. #[derive(Debug)]
  338. pub struct Established {
  339. inner: socket::inet::BoundInner,
  340. }
  341. impl Established {
  342. pub fn with_mut<R, F: FnMut(&mut smoltcp::socket::tcp::Socket<'static>) -> R>(
  343. &self,
  344. f: F,
  345. ) -> R {
  346. self.inner.with_mut(f)
  347. }
  348. pub fn close(&self) {
  349. self.inner
  350. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  351. self.inner.iface().poll();
  352. }
  353. pub fn release(&self) {
  354. self.inner.release();
  355. }
  356. pub fn get_name(&self) -> smoltcp::wire::IpEndpoint {
  357. self.inner
  358. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.local_endpoint())
  359. .unwrap()
  360. }
  361. pub fn get_peer_name(&self) -> smoltcp::wire::IpEndpoint {
  362. self.inner
  363. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.remote_endpoint().unwrap())
  364. }
  365. pub fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
  366. self.inner
  367. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  368. use smoltcp::socket::tcp::RecvError::*;
  369. if socket.can_send() {
  370. match socket.recv_slice(buf) {
  371. Ok(size) => Ok(size),
  372. Err(InvalidState) => {
  373. log::error!("TcpSocket::try_recv: InvalidState");
  374. Err(ENOTCONN)
  375. }
  376. Err(Finished) => Ok(0),
  377. }
  378. } else {
  379. Err(ENOBUFS)
  380. }
  381. })
  382. }
  383. pub fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
  384. self.inner
  385. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  386. if socket.can_send() {
  387. socket.send_slice(buf).map_err(|_| ECONNABORTED)
  388. } else {
  389. Err(ENOBUFS)
  390. }
  391. })
  392. }
  393. pub fn update_io_events(&self, pollee: &AtomicUsize) {
  394. self.inner
  395. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  396. if socket.can_send() {
  397. pollee.fetch_or(
  398. EPollEventType::EPOLLOUT.bits() as usize,
  399. core::sync::atomic::Ordering::Relaxed,
  400. );
  401. } else {
  402. pollee.fetch_and(
  403. !EPollEventType::EPOLLOUT.bits() as usize,
  404. core::sync::atomic::Ordering::Relaxed,
  405. );
  406. }
  407. if socket.can_recv() {
  408. pollee.fetch_or(
  409. EPollEventType::EPOLLIN.bits() as usize,
  410. core::sync::atomic::Ordering::Relaxed,
  411. );
  412. } else {
  413. pollee.fetch_and(
  414. !EPollEventType::EPOLLIN.bits() as usize,
  415. core::sync::atomic::Ordering::Relaxed,
  416. );
  417. }
  418. })
  419. }
  420. }
  421. #[derive(Debug)]
  422. pub enum Inner {
  423. Init(Init),
  424. Connecting(Connecting),
  425. Listening(Listening),
  426. Established(Established),
  427. }
  428. impl Inner {
  429. pub fn send_buffer_size(&self) -> usize {
  430. match self {
  431. Inner::Init(_) => DEFAULT_TX_BUF_SIZE,
  432. Inner::Connecting(conn) => conn.with_mut(|socket| socket.send_capacity()),
  433. // only the first socket in the list is used for sending
  434. Inner::Listening(listen) => listen.inners[0]
  435. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.send_capacity()),
  436. Inner::Established(est) => est.with_mut(|socket| socket.send_capacity()),
  437. }
  438. }
  439. pub fn recv_buffer_size(&self) -> usize {
  440. match self {
  441. Inner::Init(_) => DEFAULT_RX_BUF_SIZE,
  442. Inner::Connecting(conn) => conn.with_mut(|socket| socket.recv_capacity()),
  443. // only the first socket in the list is used for receiving
  444. Inner::Listening(listen) => listen.inners[0]
  445. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.recv_capacity()),
  446. Inner::Established(est) => est.with_mut(|socket| socket.recv_capacity()),
  447. }
  448. }
  449. pub fn iface(&self) -> Option<&alloc::sync::Arc<dyn crate::driver::net::Iface>> {
  450. match self {
  451. Inner::Init(_) => None,
  452. Inner::Connecting(conn) => Some(conn.inner.iface()),
  453. Inner::Listening(listen) => Some(listen.inners[0].iface()),
  454. Inner::Established(est) => Some(est.inner.iface()),
  455. }
  456. }
  457. }