fragmentation.rs 20 KB


  1. #![allow(unused)]
  2. use core::fmt;
  3. use managed::{ManagedMap, ManagedSlice};
  4. use crate::storage::Assembler;
  5. use crate::time::{Duration, Instant};
  6. use crate::Error;
  7. use crate::Result;
  8. /// Holds different fragments of one packet, used for assembling fragmented packets.
  9. #[derive(Debug)]
  10. pub struct PacketAssembler<'a> {
  11. buffer: ManagedSlice<'a, u8>,
  12. assembler: AssemblerState,
  13. }
  14. /// Holds the state of the assembling of one packet.
  15. #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  16. #[derive(Debug, PartialEq)]
  17. enum AssemblerState {
  18. NotInit,
  19. Assembling {
  20. assembler: Assembler,
  21. total_size: Option<usize>,
  22. expires_at: Instant,
  23. offset_correction: isize,
  24. },
  25. }
  26. impl fmt::Display for AssemblerState {
  27. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  28. match self {
  29. AssemblerState::NotInit => write!(f, "Not init")?,
  30. AssemblerState::Assembling {
  31. assembler,
  32. total_size,
  33. expires_at,
  34. offset_correction,
  35. } => {
  36. write!(f, "{} expires at {}", assembler, expires_at)?;
  37. }
  38. }
  39. Ok(())
  40. }
  41. }
  42. impl<'a> PacketAssembler<'a> {
  43. /// Create a new empty buffer for fragments.
  44. pub fn new<S>(storage: S) -> Self
  45. where
  46. S: Into<ManagedSlice<'a, u8>>,
  47. {
  48. let s = storage.into();
  49. PacketAssembler {
  50. buffer: s,
  51. assembler: AssemblerState::NotInit,
  52. }
  53. }
  54. /// Start with saving fragments.
  55. /// We initialize the assembler with the total size of the final packet.
  56. ///
  57. /// # Errors
  58. ///
  59. /// - Returns [`Error::PacketAssemblerBufferTooSmall`] when the buffer is too small for holding all the
  60. /// fragments of a packet.
  61. pub(crate) fn start(
  62. &mut self,
  63. total_size: Option<usize>,
  64. expires_at: Instant,
  65. offset_correction: isize,
  66. ) -> Result<()> {
  67. match &mut self.buffer {
  68. ManagedSlice::Borrowed(b) => {
  69. if let Some(total_size) = total_size {
  70. if b.len() < total_size {
  71. return Err(Error::PacketAssemblerBufferTooSmall);
  72. }
  73. }
  74. }
  75. #[cfg(any(feature = "std", feature = "alloc"))]
  76. ManagedSlice::Owned(b) => {
  77. if let Some(total_size) = total_size {
  78. b.resize(total_size, 0);
  79. }
  80. }
  81. }
  82. self.assembler = AssemblerState::Assembling {
  83. assembler: Assembler::new(if let Some(total_size) = total_size {
  84. total_size
  85. } else {
  86. usize::MAX
  87. }),
  88. total_size,
  89. expires_at,
  90. offset_correction,
  91. };
  92. Ok(())
  93. }
  94. /// Set the total size of the packet assembler.
  95. ///
  96. /// # Errors
  97. ///
  98. /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
  99. /// assembler with [Self::start]).
  100. pub(crate) fn set_total_size(&mut self, size: usize) -> Result<()> {
  101. match self.assembler {
  102. AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
  103. AssemblerState::Assembling {
  104. ref mut total_size, ..
  105. } => {
  106. *total_size = Some(size);
  107. Ok(())
  108. }
  109. }
  110. }
  111. /// Return the instant when the assembler expires.
  112. ///
  113. /// # Errors
  114. ///
  115. /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
  116. /// assembler with [Self::start]).
  117. pub(crate) fn expires_at(&self) -> Result<Instant> {
  118. match self.assembler {
  119. AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
  120. AssemblerState::Assembling { expires_at, .. } => Ok(expires_at),
  121. }
  122. }
  123. /// Add a fragment into the packet that is being reassembled.
  124. ///
  125. /// # Errors
  126. ///
  127. /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
  128. /// assembler with [Self::start]).
  129. /// - Returns [`Error::PacketAssemblerBufferTooSmall`] when trying to add data into the buffer at a non-existing
  130. /// place.
  131. /// - Returns [`Error::PacketAssemblerOverlap`] when there was an overlap when adding data.
  132. pub(crate) fn add(&mut self, data: &[u8], offset: usize) -> Result<bool> {
  133. match self.assembler {
  134. AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
  135. AssemblerState::Assembling {
  136. ref mut assembler,
  137. total_size,
  138. offset_correction,
  139. ..
  140. } => {
  141. let offset = offset as isize + offset_correction;
  142. let offset = if offset <= 0 { 0 } else { offset as usize };
  143. match &mut self.buffer {
  144. ManagedSlice::Borrowed(b) => {
  145. if offset + data.len() > b.len() {
  146. return Err(Error::PacketAssemblerBufferTooSmall);
  147. }
  148. }
  149. #[cfg(any(feature = "std", feature = "alloc"))]
  150. ManagedSlice::Owned(b) => {
  151. if offset + data.len() > b.len() {
  152. b.resize(offset + data.len(), 0);
  153. }
  154. }
  155. }
  156. let len = data.len();
  157. self.buffer[offset..][..len].copy_from_slice(data);
  158. net_debug!(
  159. "frag assembler: receiving {} octests at offset {}",
  160. len,
  161. offset
  162. );
  163. match assembler.add(offset, data.len()) {
  164. Ok(overlap) => {
  165. net_debug!("assembler: {}", self.assembler);
  166. if overlap {
  167. net_debug!("packet was added, but there was an overlap.");
  168. }
  169. self.is_complete()
  170. }
  171. // NOTE(thvdveld): hopefully we wont get too many holes errors I guess?
  172. Err(_) => Err(Error::PacketAssemblerTooManyHoles),
  173. }
  174. }
  175. }
  176. }
  177. /// Get an immutable slice of the underlying packet data.
  178. /// This will mark the assembler state as [`AssemblerState::NotInit`] such that it can be reused.
  179. ///
  180. /// # Errors
  181. ///
  182. /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
  183. /// assembler with [`Self::start`]).
  184. /// - Returns [`Error::PacketAssemblerIncomplete`] when not all the fragments have been collected.
  185. pub(crate) fn assemble(&mut self) -> Result<&'_ [u8]> {
  186. let b = match self.assembler {
  187. AssemblerState::NotInit => return Err(Error::PacketAssemblerNotInit),
  188. AssemblerState::Assembling { total_size, .. } => {
  189. if self.is_complete()? {
  190. // NOTE: we can unwrap because `is_complete` already checks this.
  191. let total_size = total_size.unwrap();
  192. let a = &self.buffer[..total_size];
  193. self.assembler = AssemblerState::NotInit;
  194. a
  195. } else {
  196. return Err(Error::PacketAssemblerIncomplete);
  197. }
  198. }
  199. };
  200. Ok(b)
  201. }
  202. /// Returns `true` when all fragments have been received, otherwise `false`.
  203. ///
  204. /// # Errors
  205. ///
  206. /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
  207. /// assembler with [`Self::start`]).
  208. pub(crate) fn is_complete(&self) -> Result<bool> {
  209. match &self.assembler {
  210. AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
  211. AssemblerState::Assembling {
  212. assembler,
  213. total_size,
  214. ..
  215. } => match (total_size, assembler.peek_front()) {
  216. (Some(total_size), Some(front)) => Ok(front == *total_size),
  217. _ => Ok(false),
  218. },
  219. }
  220. }
  221. /// Returns `true` when the packet assembler is free to use.
  222. fn is_free(&self) -> bool {
  223. self.assembler == AssemblerState::NotInit
  224. }
  225. /// Mark this assembler as [`AssemblerState::NotInit`].
  226. /// This is then cleaned up by the [`PacketAssemblerSet`].
  227. pub fn mark_discarded(&mut self) {
  228. self.assembler = AssemblerState::NotInit;
  229. }
  230. /// Returns `true` when the [`AssemblerState`] is discarded.
  231. pub fn is_discarded(&self) -> bool {
  232. matches!(self.assembler, AssemblerState::NotInit)
  233. }
  234. }
  235. /// Set holding multiple [`PacketAssembler`].
  236. #[derive(Debug)]
  237. pub struct PacketAssemblerSet<'a, Key: Eq + Ord + Clone + Copy> {
  238. packet_buffer: ManagedSlice<'a, PacketAssembler<'a>>,
  239. index_buffer: ManagedMap<'a, Key, usize>,
  240. }
  241. impl<'a, K: Eq + Ord + Clone + Copy> PacketAssemblerSet<'a, K> {
  242. /// Create a new set of packet assemblers.
  243. ///
  244. /// # Panics
  245. ///
  246. /// This will panic when:
  247. /// - The packet buffer and index buffer don't have the same size or are empty (when they are
  248. /// both borrowed).
  249. /// - The packet buffer is empty (when only the packet buffer is borrowed).
  250. /// - The index buffer is empty (when only the index buffer is borrowed).
  251. pub fn new<FB, IB>(packet_buffer: FB, index_buffer: IB) -> Self
  252. where
  253. FB: Into<ManagedSlice<'a, PacketAssembler<'a>>>,
  254. IB: Into<ManagedMap<'a, K, usize>>,
  255. {
  256. let packet_buffer = packet_buffer.into();
  257. let index_buffer = index_buffer.into();
  258. match (&packet_buffer, &index_buffer) {
  259. (ManagedSlice::Borrowed(f), ManagedMap::Borrowed(i)) => {
  260. if f.len() != i.len() {
  261. panic!("The amount of places in the index buffer must be the same as the amount of possible fragments assemblers.");
  262. }
  263. }
  264. #[cfg(any(feature = "std", feature = "alloc"))]
  265. (ManagedSlice::Borrowed(f), ManagedMap::Owned(_)) => {
  266. if f.is_empty() {
  267. panic!("The packet buffer cannot be empty.");
  268. }
  269. }
  270. #[cfg(any(feature = "std", feature = "alloc"))]
  271. (ManagedSlice::Owned(_), ManagedMap::Borrowed(i)) => {
  272. if i.is_empty() {
  273. panic!("The index buffer cannot be empty.");
  274. }
  275. }
  276. #[cfg(any(feature = "std", feature = "alloc"))]
  277. (ManagedSlice::Owned(_), ManagedMap::Owned(_)) => (),
  278. }
  279. Self {
  280. packet_buffer,
  281. index_buffer,
  282. }
  283. }
  284. /// Reserve a [`PacketAssembler`], which is linked to a specific key.
  285. /// Returns the reserved fragments assembler.
  286. ///
  287. /// # Errors
  288. ///
  289. /// - Returns [`Error::PacketAssemblerSetFull`] when every [`PacketAssembler`] in the buffer is used (only
  290. /// when the non allocating version of is used).
  291. pub(crate) fn reserve_with_key(&mut self, key: &K) -> Result<&mut PacketAssembler<'a>> {
  292. // Check how many WIP reassemblies we have.
  293. // The limit is currently set to 255.
  294. if self.index_buffer.len() == u8::MAX as usize {
  295. return Err(Error::PacketAssemblerSetFull);
  296. }
  297. if self.packet_buffer.len() == self.index_buffer.len() {
  298. match &mut self.packet_buffer {
  299. ManagedSlice::Borrowed(_) => return Err(Error::PacketAssemblerSetFull),
  300. #[cfg(any(feature = "std", feature = "alloc"))]
  301. ManagedSlice::Owned(b) => (),
  302. }
  303. }
  304. let i = self
  305. .get_free_packet_assembler()
  306. .ok_or(Error::PacketAssemblerSetFull)?;
  307. // NOTE(thvdveld): this should not fail because we already checked the available space.
  308. match self.index_buffer.insert(*key, i) {
  309. Ok(_) => Ok(&mut self.packet_buffer[i]),
  310. Err(_) => unreachable!(),
  311. }
  312. }
  313. /// Return the first free packet assembler available from the cache.
  314. fn get_free_packet_assembler(&mut self) -> Option<usize> {
  315. match &mut self.packet_buffer {
  316. ManagedSlice::Borrowed(_) => (),
  317. #[cfg(any(feature = "std", feature = "alloc"))]
  318. ManagedSlice::Owned(b) => b.push(PacketAssembler::new(alloc::vec![])),
  319. }
  320. self.packet_buffer
  321. .iter()
  322. .enumerate()
  323. .find(|(_, b)| b.is_free())
  324. .map(|(i, _)| i)
  325. }
  326. /// Return a mutable slice to a packet assembler.
  327. ///
  328. /// # Errors
  329. ///
  330. /// - Returns [`Error::PacketAssemblerSetKeyNotFound`] when the key was not found in the set.
  331. pub(crate) fn get_packet_assembler_mut(&mut self, key: &K) -> Result<&mut PacketAssembler<'a>> {
  332. if let Some(i) = self.index_buffer.get(key) {
  333. Ok(&mut self.packet_buffer[*i as usize])
  334. } else {
  335. Err(Error::PacketAssemblerSetKeyNotFound)
  336. }
  337. }
  338. /// Return the assembled packet from a packet assembler.
  339. /// This also removes it from the set.
  340. ///
  341. /// # Errors
  342. ///
  343. /// - Returns [`Error::PacketAssemblerSetKeyNotFound`] when the `key` was not found.
  344. /// - Returns [`Error::PacketAssemblerIncomplete`] when the fragments assembler was empty or not fully assembled.
  345. pub(crate) fn get_assembled_packet(&mut self, key: &K) -> Result<&[u8]> {
  346. if let Some(i) = self.index_buffer.get(key) {
  347. let p = self.packet_buffer[*i as usize].assemble()?;
  348. self.index_buffer.remove(key);
  349. Ok(p)
  350. } else {
  351. Err(Error::PacketAssemblerSetKeyNotFound)
  352. }
  353. }
  354. /// Remove all [`PacketAssembler`]s that are marked as discarded.
  355. pub fn remove_discarded(&mut self) {
  356. loop {
  357. let mut key = None;
  358. for (k, i) in self.index_buffer.iter() {
  359. if matches!(
  360. self.packet_buffer[*i as usize].assembler,
  361. AssemblerState::NotInit
  362. ) {
  363. key = Some(*k);
  364. break;
  365. }
  366. }
  367. if let Some(k) = key {
  368. self.index_buffer.remove(&k);
  369. } else {
  370. break;
  371. }
  372. }
  373. }
  374. /// Mark all [`PacketAssembler`]s as discarded for which `f` returns `Ok(true)`.
  375. /// This does not remove them from the buffer.
  376. pub fn mark_discarded_when<F>(&mut self, f: F) -> Result<()>
  377. where
  378. F: Fn(&mut PacketAssembler<'_>) -> Result<bool>,
  379. {
  380. for (_, i) in &mut self.index_buffer.iter() {
  381. let frag = &mut self.packet_buffer[*i as usize];
  382. if f(frag)? {
  383. frag.mark_discarded();
  384. }
  385. }
  386. Ok(())
  387. }
  388. /// Remove all [`PacketAssembler`]s for which `f` returns `Ok(true)`.
  389. pub fn remove_when<F>(&mut self, f: F) -> Result<()>
  390. where
  391. F: Fn(&mut PacketAssembler<'_>) -> Result<bool>,
  392. {
  393. self.mark_discarded_when(f)?;
  394. self.remove_discarded();
  395. Ok(())
  396. }
  397. }
  398. #[cfg(test)]
  399. mod tests {
  400. use super::*;
  401. #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
  402. struct Key {
  403. id: usize,
  404. }
  405. #[test]
  406. fn packet_assembler_not_init() {
  407. let mut p_assembler = PacketAssembler::new(vec![]);
  408. let data = b"Hello World!";
  409. assert_eq!(
  410. p_assembler.add(&data[..], data.len()),
  411. Err(Error::PacketAssemblerNotInit)
  412. );
  413. assert_eq!(
  414. p_assembler.is_complete(),
  415. Err(Error::PacketAssemblerNotInit)
  416. );
  417. assert_eq!(p_assembler.assemble(), Err(Error::PacketAssemblerNotInit));
  418. }
  419. #[test]
  420. fn packet_assembler_buffer_too_small() {
  421. let mut storage = [0u8; 1];
  422. let mut p_assembler = PacketAssembler::new(&mut storage[..]);
  423. assert_eq!(
  424. p_assembler.start(Some(2), Instant::from_secs(0), 0),
  425. Err(Error::PacketAssemblerBufferTooSmall)
  426. );
  427. assert_eq!(p_assembler.start(Some(1), Instant::from_secs(0), 0), Ok(()));
  428. let data = b"Hello World!";
  429. assert_eq!(
  430. p_assembler.add(&data[..], data.len()),
  431. Err(Error::PacketAssemblerBufferTooSmall)
  432. );
  433. }
  434. #[test]
  435. fn packet_assembler_overlap() {
  436. let mut storage = [0u8; 5];
  437. let mut p_assembler = PacketAssembler::new(&mut storage[..]);
  438. p_assembler
  439. .start(Some(5), Instant::from_secs(0), 0)
  440. .unwrap();
  441. let data = b"Rust";
  442. p_assembler.add(&data[..], 0).unwrap();
  443. assert_eq!(p_assembler.add(&data[..], 1), Ok(true));
  444. }
  445. #[test]
  446. fn packet_assembler_assemble() {
  447. let mut storage = [0u8; 12];
  448. let mut p_assembler = PacketAssembler::new(&mut storage[..]);
  449. let data = b"Hello World!";
  450. p_assembler
  451. .start(Some(data.len()), Instant::from_secs(0), 0)
  452. .unwrap();
  453. p_assembler.add(b"Hello ", 0).unwrap();
  454. assert_eq!(
  455. p_assembler.assemble(),
  456. Err(Error::PacketAssemblerIncomplete)
  457. );
  458. p_assembler.add(b"World!", b"Hello ".len()).unwrap();
  459. assert_eq!(p_assembler.assemble(), Ok(&b"Hello World!"[..]));
  460. }
  461. #[test]
  462. fn packet_assembler_out_of_order_assemble() {
  463. let mut storage = [0u8; 12];
  464. let mut p_assembler = PacketAssembler::new(&mut storage[..]);
  465. let data = b"Hello World!";
  466. p_assembler
  467. .start(Some(data.len()), Instant::from_secs(0), 0)
  468. .unwrap();
  469. p_assembler.add(b"World!", b"Hello ".len()).unwrap();
  470. assert_eq!(
  471. p_assembler.assemble(),
  472. Err(Error::PacketAssemblerIncomplete)
  473. );
  474. p_assembler.add(b"Hello ", 0).unwrap();
  475. assert_eq!(p_assembler.assemble(), Ok(&b"Hello World!"[..]));
  476. }
  477. #[test]
  478. fn packet_assembler_set() {
  479. let key = Key { id: 1 };
  480. let mut set = PacketAssemblerSet::<'_, _>::new(vec![], std::collections::BTreeMap::new());
  481. if let Err(e) = set.get_packet_assembler_mut(&key) {
  482. assert_eq!(e, Error::PacketAssemblerSetKeyNotFound);
  483. }
  484. assert!(set.reserve_with_key(&key).is_ok());
  485. }
  486. #[test]
  487. fn packet_assembler_set_borrowed() {
  488. let mut buf = [0u8, 127];
  489. let mut packet_assembler_cache = [PacketAssembler::<'_>::new(&mut buf[..])];
  490. let mut packet_index_cache = [None];
  491. let key = Key { id: 1 };
  492. let mut set =
  493. PacketAssemblerSet::new(&mut packet_assembler_cache[..], &mut packet_index_cache[..]);
  494. if let Err(e) = set.get_packet_assembler_mut(&key) {
  495. assert_eq!(e, Error::PacketAssemblerSetKeyNotFound);
  496. }
  497. assert!(set.reserve_with_key(&key).is_ok());
  498. }
  499. #[test]
  500. fn packet_assembler_set_assembling_many() {
  501. let mut buf = [0u8, 127];
  502. let mut packet_assembler_cache = [PacketAssembler::new(&mut buf[..])];
  503. let mut packet_index_cache = [None];
  504. let mut set =
  505. PacketAssemblerSet::new(&mut packet_assembler_cache[..], &mut packet_index_cache[..]);
  506. let key = Key { id: 0 };
  507. set.reserve_with_key(&key).unwrap();
  508. set.get_packet_assembler_mut(&key)
  509. .unwrap()
  510. .start(Some(0), Instant::from_secs(0), 0)
  511. .unwrap();
  512. set.get_assembled_packet(&key).unwrap();
  513. let key = Key { id: 1 };
  514. set.reserve_with_key(&key).unwrap();
  515. set.get_packet_assembler_mut(&key)
  516. .unwrap()
  517. .start(Some(0), Instant::from_secs(0), 0)
  518. .unwrap();
  519. set.get_assembled_packet(&key).unwrap();
  520. let key = Key { id: 2 };
  521. set.reserve_with_key(&key).unwrap();
  522. set.get_packet_assembler_mut(&key)
  523. .unwrap()
  524. .start(Some(0), Instant::from_secs(0), 0)
  525. .unwrap();
  526. set.get_assembled_packet(&key).unwrap();
  527. }
  528. }