inner.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. use core::sync::atomic::AtomicUsize;
  2. use crate::libs::rwlock::RwLock;
  3. use crate::net::socket::EPollEventType;
  4. use crate::net::socket::{self, inet::Types};
  5. use alloc::boxed::Box;
  6. use alloc::vec::Vec;
  7. use smoltcp;
  8. use system_error::SystemError::{self, *};
  9. use super::inet::UNSPECIFIED_LOCAL_ENDPOINT_V4;
  10. // pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024;
  11. pub const DEFAULT_RX_BUF_SIZE: usize = 512 * 1024;
  12. pub const DEFAULT_TX_BUF_SIZE: usize = 512 * 1024;
  13. fn new_smoltcp_socket() -> smoltcp::socket::tcp::Socket<'static> {
  14. let rx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0; DEFAULT_RX_BUF_SIZE]);
  15. let tx_buffer = smoltcp::socket::tcp::SocketBuffer::new(vec![0; DEFAULT_TX_BUF_SIZE]);
  16. smoltcp::socket::tcp::Socket::new(rx_buffer, tx_buffer)
  17. }
  18. fn new_listen_smoltcp_socket<T>(local_endpoint: T) -> smoltcp::socket::tcp::Socket<'static>
  19. where
  20. T: Into<smoltcp::wire::IpListenEndpoint>,
  21. {
  22. let mut socket = new_smoltcp_socket();
  23. socket.listen(local_endpoint).unwrap();
  24. socket
  25. }
  26. #[derive(Debug)]
  27. pub enum Init {
  28. Unbound((Box<smoltcp::socket::tcp::Socket<'static>>, smoltcp::wire::IpVersion)),
  29. Bound((socket::inet::BoundInner, smoltcp::wire::IpEndpoint)),
  30. }
  31. impl Init {
  32. pub(super) fn new(ver: smoltcp::wire::IpVersion) -> Self {
  33. Init::Unbound((Box::new(new_smoltcp_socket()), ver))
  34. }
  35. /// 传入一个已经绑定的socket
  36. pub(super) fn new_bound(inner: socket::inet::BoundInner) -> Self {
  37. let endpoint = inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  38. socket
  39. .local_endpoint()
  40. .expect("A Bound Socket Must Have A Local Endpoint")
  41. });
  42. Init::Bound((inner, endpoint))
  43. }
  44. pub(super) fn bind(
  45. self,
  46. local_endpoint: smoltcp::wire::IpEndpoint,
  47. ) -> Result<Self, SystemError> {
  48. match self {
  49. Init::Unbound((socket, _)) => {
  50. let bound = socket::inet::BoundInner::bind(*socket, &local_endpoint.addr)?;
  51. bound
  52. .port_manager()
  53. .bind_port(Types::Tcp, local_endpoint.port)?;
  54. // bound.iface().common().bind_socket()
  55. Ok(Init::Bound((bound, local_endpoint)))
  56. }
  57. Init::Bound(_) => {
  58. log::debug!("Already Bound");
  59. Err(EINVAL)
  60. }
  61. }
  62. }
  63. pub(super) fn bind_to_ephemeral(
  64. self,
  65. remote_endpoint: smoltcp::wire::IpEndpoint,
  66. ) -> Result<(socket::inet::BoundInner, smoltcp::wire::IpEndpoint), (Self, SystemError)> {
  67. match self {
  68. Init::Unbound((socket, ver)) => {
  69. let (bound, address) =
  70. socket::inet::BoundInner::bind_ephemeral(*socket, remote_endpoint.addr)
  71. .map_err(|err| (Self::new(ver), err))?;
  72. let bound_port = bound
  73. .port_manager()
  74. .bind_ephemeral_port(Types::Tcp)
  75. .map_err(|err| (Self::new(ver), err))?;
  76. let endpoint = smoltcp::wire::IpEndpoint::new(address, bound_port);
  77. Ok((bound, endpoint))
  78. }
  79. Init::Bound(_) => Err((self, EINVAL)),
  80. }
  81. }
  82. pub(super) fn connect(
  83. self,
  84. remote_endpoint: smoltcp::wire::IpEndpoint,
  85. ) -> Result<Connecting, (Self, SystemError)> {
  86. let (inner, local) = match self {
  87. Init::Unbound(_) => self.bind_to_ephemeral(remote_endpoint)?,
  88. Init::Bound(inner) => inner,
  89. };
  90. if local.addr.is_unspecified() {
  91. return Err((Init::Bound((inner, local)), EINVAL));
  92. }
  93. let result = inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  94. socket
  95. .connect(
  96. inner.iface().smol_iface().lock().context(),
  97. remote_endpoint,
  98. local,
  99. )
  100. .map_err(|_| ECONNREFUSED)
  101. });
  102. match result {
  103. Ok(_) => Ok(Connecting::new(inner)),
  104. Err(err) => Err((Init::Bound((inner, local)), err)),
  105. }
  106. }
  107. /// # `listen`
  108. pub(super) fn listen(self, backlog: usize) -> Result<Listening, (Self, SystemError)> {
  109. let (inner, local) = match self {
  110. Init::Unbound(_) => {
  111. return Err((self, EINVAL));
  112. }
  113. Init::Bound(inner) => inner,
  114. };
  115. let listen_addr = if local.addr.is_unspecified() {
  116. smoltcp::wire::IpListenEndpoint::from(local.port)
  117. } else {
  118. smoltcp::wire::IpListenEndpoint::from(local)
  119. };
  120. log::debug!("listen at {:?}", listen_addr);
  121. let mut inners = Vec::new();
  122. if let Err(err) = || -> Result<(), SystemError> {
  123. for _ in 0..(backlog - 1) {
  124. // -1 because the first one is already bound
  125. let new_listen = socket::inet::BoundInner::bind(
  126. new_listen_smoltcp_socket(listen_addr),
  127. listen_addr.addr.as_ref().unwrap_or(
  128. &smoltcp::wire::IpAddress::from(smoltcp::wire::Ipv4Address::UNSPECIFIED)
  129. ),
  130. )?;
  131. inners.push(new_listen);
  132. }
  133. Ok(())
  134. }() {
  135. return Err((Init::Bound((inner, local)), err));
  136. }
  137. if let Err(err) = inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  138. socket.listen(listen_addr).map_err(|_| ECONNREFUSED)
  139. }) {
  140. return Err((Init::Bound((inner, local)), err));
  141. }
  142. inners.push(inner);
  143. return Ok(Listening {
  144. inners,
  145. connect: AtomicUsize::new(0),
  146. listen_addr,
  147. });
  148. }
  149. pub(super) fn close(&self) {
  150. match self {
  151. Init::Unbound(_) => {}
  152. Init::Bound((inner, endpoint)) => {
  153. inner.port_manager().unbind_port(Types::Tcp, endpoint.port);
  154. inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  155. }
  156. }
  157. }
  158. }
  159. #[derive(Debug, Default, Clone, Copy)]
  160. enum ConnectResult {
  161. Connected,
  162. #[default]
  163. Connecting,
  164. Refused,
  165. }
  166. #[derive(Debug)]
  167. pub struct Connecting {
  168. inner: socket::inet::BoundInner,
  169. result: RwLock<ConnectResult>,
  170. }
  171. impl Connecting {
  172. fn new(inner: socket::inet::BoundInner) -> Self {
  173. Connecting {
  174. inner,
  175. result: RwLock::new(ConnectResult::Connecting),
  176. }
  177. }
  178. pub fn with_mut<R, F: FnMut(&mut smoltcp::socket::tcp::Socket<'static>) -> R>(
  179. &self,
  180. f: F,
  181. ) -> R {
  182. self.inner.with_mut(f)
  183. }
  184. pub fn into_result(self) -> (Inner, Option<SystemError>) {
  185. use ConnectResult::*;
  186. let result = *self.result.read_irqsave();
  187. match result {
  188. Connecting => (Inner::Connecting(self), Some(EAGAIN_OR_EWOULDBLOCK)),
  189. Connected => (Inner::Established(Established { inner: self.inner }), None),
  190. Refused => (Inner::Init(Init::new_bound(self.inner)), Some(ECONNREFUSED)),
  191. }
  192. }
  193. /// Returns `true` when `conn_result` becomes ready, which indicates that the caller should
  194. /// invoke the `into_result()` method as soon as possible.
  195. ///
  196. /// Since `into_result()` needs to be called only once, this method will return `true`
  197. /// _exactly_ once. The caller is responsible for not missing this event.
  198. #[must_use]
  199. pub(super) fn update_io_events(&self) -> bool {
  200. if matches!(*self.result.read_irqsave(), ConnectResult::Connecting) {
  201. return false;
  202. }
  203. self.inner
  204. .with_mut(|socket: &mut smoltcp::socket::tcp::Socket| {
  205. let mut result = self.result.write_irqsave();
  206. if matches!(*result, ConnectResult::Refused | ConnectResult::Connected) {
  207. return false; // Already connected or refused
  208. }
  209. // Connected
  210. if socket.can_send() {
  211. *result = ConnectResult::Connected;
  212. return true;
  213. }
  214. // Connecting
  215. if socket.is_open() {
  216. return false;
  217. }
  218. // Refused
  219. *result = ConnectResult::Refused;
  220. return true;
  221. })
  222. }
  223. pub fn get_name(&self) -> smoltcp::wire::IpEndpoint {
  224. self.inner
  225. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  226. socket
  227. .local_endpoint()
  228. .expect("A Connecting Tcp With No Local Endpoint")
  229. })
  230. }
  231. }
  232. #[derive(Debug)]
  233. pub struct Listening {
  234. inners: Vec<socket::inet::BoundInner>,
  235. connect: AtomicUsize,
  236. listen_addr: smoltcp::wire::IpListenEndpoint,
  237. }
  238. impl Listening {
  239. pub fn accept(&mut self) -> Result<(Established, smoltcp::wire::IpEndpoint), SystemError> {
  240. let connected: &mut socket::inet::BoundInner = self
  241. .inners
  242. .get_mut(self.connect.load(core::sync::atomic::Ordering::Relaxed))
  243. .unwrap();
  244. if connected.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| !socket.is_active()) {
  245. return Err(EAGAIN_OR_EWOULDBLOCK);
  246. }
  247. let remote_endpoint = connected
  248. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  249. socket
  250. .remote_endpoint()
  251. .expect("A Connected Tcp With No Remote Endpoint")
  252. });
  253. // log::debug!("local at {:?}", local_endpoint);
  254. let mut new_listen = socket::inet::BoundInner::bind(
  255. new_listen_smoltcp_socket(self.listen_addr),
  256. self.listen_addr.addr.as_ref().unwrap_or(
  257. &smoltcp::wire::IpAddress::from(smoltcp::wire::Ipv4Address::UNSPECIFIED),
  258. ),
  259. )?;
  260. // swap the connected socket with the new_listen socket
  261. // TODO is smoltcp socket swappable?
  262. core::mem::swap(&mut new_listen, connected);
  263. return Ok((Established { inner: new_listen }, remote_endpoint));
  264. }
  265. pub fn update_io_events(&self, pollee: &AtomicUsize) {
  266. let position = self.inners.iter().position(|inner| {
  267. inner.with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.is_active())
  268. });
  269. if let Some(position) = position {
  270. // log::debug!("Can accept!");
  271. self.connect
  272. .store(position, core::sync::atomic::Ordering::Relaxed);
  273. pollee.fetch_or(
  274. EPollEventType::EPOLLIN.bits() as usize,
  275. core::sync::atomic::Ordering::Relaxed,
  276. );
  277. } else {
  278. // log::debug!("Can't accept!");
  279. pollee.fetch_and(
  280. !EPollEventType::EPOLLIN.bits() as usize,
  281. core::sync::atomic::Ordering::Relaxed,
  282. );
  283. }
  284. }
  285. pub fn get_name(&self) -> smoltcp::wire::IpEndpoint {
  286. smoltcp::wire::IpEndpoint::new(self.listen_addr.addr.unwrap_or(
  287. smoltcp::wire::IpAddress::from(smoltcp::wire::Ipv4Address::UNSPECIFIED),
  288. ), self.listen_addr.port)
  289. }
  290. pub fn close(&self) {
  291. log::debug!("Close Listening Socket");
  292. let port = self.get_name().port;
  293. for inner in self.inners.iter() {
  294. inner.with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  295. }
  296. self.inners[0]
  297. .iface()
  298. .port_manager()
  299. .unbind_port(Types::Tcp, port);
  300. }
  301. }
  302. #[derive(Debug)]
  303. pub struct Established {
  304. inner: socket::inet::BoundInner,
  305. }
  306. impl Established {
  307. pub fn with_mut<R, F: FnMut(&mut smoltcp::socket::tcp::Socket<'static>) -> R>(
  308. &self,
  309. f: F,
  310. ) -> R {
  311. self.inner.with_mut(f)
  312. }
  313. pub fn with<R, F: Fn(&smoltcp::socket::tcp::Socket<'static>) -> R>(&self, f: F) -> R {
  314. self.inner.with(f)
  315. }
  316. pub fn close(&self) {
  317. self.inner
  318. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.close());
  319. self.inner.iface().poll();
  320. }
  321. pub fn release(&self) {
  322. self.inner.release();
  323. }
  324. pub fn local_endpoint(&self) -> smoltcp::wire::IpEndpoint {
  325. self.inner
  326. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.local_endpoint())
  327. .unwrap()
  328. }
  329. pub fn remote_endpoint(&self) -> smoltcp::wire::IpEndpoint {
  330. self.inner
  331. .with::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.remote_endpoint().unwrap())
  332. }
  333. pub fn recv_slice(&self, buf: &mut [u8]) -> Result<usize, SystemError> {
  334. self.inner
  335. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  336. use smoltcp::socket::tcp::RecvError::*;
  337. if socket.can_send() {
  338. match socket.recv_slice(buf) {
  339. Ok(size) => Ok(size),
  340. Err(InvalidState) => {
  341. log::error!("TcpSocket::try_recv: InvalidState");
  342. Err(ENOTCONN)
  343. }
  344. Err(Finished) => Ok(0),
  345. }
  346. } else {
  347. Err(ENOBUFS)
  348. }
  349. })
  350. }
  351. pub fn send_slice(&self, buf: &[u8]) -> Result<usize, SystemError> {
  352. self.inner
  353. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  354. if socket.can_send() {
  355. socket.send_slice(buf).map_err(|_| ECONNABORTED)
  356. } else {
  357. Err(ENOBUFS)
  358. }
  359. })
  360. }
  361. pub fn update_io_events(&self, pollee: &AtomicUsize) {
  362. self.inner
  363. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| {
  364. if socket.can_send() {
  365. pollee.fetch_or(
  366. EPollEventType::EPOLLOUT.bits() as usize,
  367. core::sync::atomic::Ordering::Relaxed,
  368. );
  369. } else {
  370. pollee.fetch_and(
  371. !EPollEventType::EPOLLOUT.bits() as usize,
  372. core::sync::atomic::Ordering::Relaxed,
  373. );
  374. }
  375. if socket.can_recv() {
  376. pollee.fetch_or(
  377. EPollEventType::EPOLLIN.bits() as usize,
  378. core::sync::atomic::Ordering::Relaxed,
  379. );
  380. } else {
  381. pollee.fetch_and(
  382. !EPollEventType::EPOLLIN.bits() as usize,
  383. core::sync::atomic::Ordering::Relaxed,
  384. );
  385. }
  386. })
  387. }
  388. }
  389. #[derive(Debug)]
  390. pub enum Inner {
  391. Init(Init),
  392. Connecting(Connecting),
  393. Listening(Listening),
  394. Established(Established),
  395. }
  396. impl Inner {
  397. pub fn send_buffer_size(&self) -> usize {
  398. match self {
  399. Inner::Init(_) => DEFAULT_TX_BUF_SIZE,
  400. Inner::Connecting(conn) => conn.with_mut(|socket| socket.send_capacity()),
  401. // only the first socket in the list is used for sending
  402. Inner::Listening(listen) => listen.inners[0]
  403. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.send_capacity()),
  404. Inner::Established(est) => est.with_mut(|socket| socket.send_capacity()),
  405. }
  406. }
  407. pub fn recv_buffer_size(&self) -> usize {
  408. match self {
  409. Inner::Init(_) => DEFAULT_RX_BUF_SIZE,
  410. Inner::Connecting(conn) => conn.with_mut(|socket| socket.recv_capacity()),
  411. // only the first socket in the list is used for receiving
  412. Inner::Listening(listen) => listen.inners[0]
  413. .with_mut::<smoltcp::socket::tcp::Socket, _, _>(|socket| socket.recv_capacity()),
  414. Inner::Established(est) => est.with_mut(|socket| socket.recv_capacity()),
  415. }
  416. }
  417. pub fn iface(&self) -> Option<&alloc::sync::Arc<dyn crate::driver::net::Iface>> {
  418. match self {
  419. Inner::Init(_) => None,
  420. Inner::Connecting(conn) => Some(conn.inner.iface()),
  421. Inner::Listening(listen) => Some(listen.inners[0].iface()),
  422. Inner::Established(est) => Some(est.inner.iface()),
  423. }
  424. }
  425. }