packet_buffer.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. use managed::ManagedSlice;
  2. use crate::storage::{Full, RingBuffer};
  3. use super::Empty;
  /// Bookkeeping record for one entry of a packet buffer: how many payload bytes
  /// the entry covers and, for a real packet, the header stored alongside them.
  ///
  /// An entry without a header describes padding rather than a packet.
  #[derive(Debug, Clone, Copy)]
  #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  pub struct PacketMetadata<H> {
      size: usize,
      header: Option<H>,
  }

  impl<H> PacketMetadata<H> {
      /// Empty packet description.
      pub const EMPTY: Self = Self {
          header: None,
          size: 0,
      };

      /// Describe `size` bytes of padding (an entry that carries no header).
      fn padding(size: usize) -> Self {
          Self { size, header: None }
      }

      /// Describe a packet of `size` payload bytes tagged with `header`.
      fn packet(size: usize, header: H) -> Self {
          let header = Some(header);
          Self { size, header }
      }

      /// Return `true` when this entry has no header, i.e. it is padding.
      fn is_padding(&self) -> bool {
          matches!(self.header, None)
      }
  }
  /// A UDP packet ring buffer.
  ///
  /// Packets are tracked in two rings that are advanced together: one holding a
  /// [`PacketMetadata`] entry per packet (or per run of padding), and one holding
  /// the raw payload bytes those entries describe.
  #[derive(Debug)]
  pub struct PacketBuffer<'a, H: 'a> {
      /// One entry per enqueued packet or padding run (size plus optional header).
      metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
      /// Payload bytes of the enqueued packets, including any padding bytes.
      payload_ring: RingBuffer<'a, u8>,
  }
  39. impl<'a, H> PacketBuffer<'a, H> {
  40. /// Create a new packet buffer with the provided metadata and payload storage.
  41. ///
  42. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  43. /// storage limits the maximum _total size_ of packets.
  44. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
  45. where
  46. MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  47. PS: Into<ManagedSlice<'a, u8>>,
  48. {
  49. PacketBuffer {
  50. metadata_ring: RingBuffer::new(metadata_storage),
  51. payload_ring: RingBuffer::new(payload_storage),
  52. }
  53. }
  /// Query whether the buffer is empty.
  ///
  /// Only the metadata ring is consulted: the buffer counts as empty when it
  /// holds no metadata entries at all.
  pub fn is_empty(&self) -> bool {
      self.metadata_ring.is_empty()
  }
  /// Query whether the buffer is full.
  ///
  /// Only the metadata ring is consulted. An enqueue can still fail with
  /// `Err(Full)` while this returns `false`, because the payload ring is
  /// checked separately when enqueuing.
  pub fn is_full(&self) -> bool {
      self.metadata_ring.is_full()
  }
  62. // There is currently no enqueue_with() because of the complexity of managing padding
  63. // in case of failure.
  64. /// Enqueue a single packet with the given header into the buffer, and
  65. /// return a reference to its payload, or return `Err(Full)`
  66. /// if the buffer is full.
  67. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
  68. if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
  69. return Err(Full);
  70. }
  71. // Ring is currently empty. Clear it (resetting `read_at`) to maximize
  72. // for contiguous space.
  73. if self.payload_ring.is_empty() {
  74. self.payload_ring.clear();
  75. }
  76. let window = self.payload_ring.window();
  77. let contig_window = self.payload_ring.contiguous_window();
  78. if window < size {
  79. return Err(Full);
  80. } else if contig_window < size {
  81. if window - contig_window < size {
  82. // The buffer length is larger than the current contiguous window
  83. // and is larger than the contiguous window will be after adding
  84. // the padding necessary to circle around to the beginning of the
  85. // ring buffer.
  86. return Err(Full);
  87. } else {
  88. // Add padding to the end of the ring buffer so that the
  89. // contiguous window is at the beginning of the ring buffer.
  90. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  91. // note(discard): function does not write to the result
  92. // enqueued padding buffer location
  93. let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
  94. }
  95. }
  96. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  97. let payload_buf = self.payload_ring.enqueue_many(size);
  98. debug_assert!(payload_buf.len() == size);
  99. Ok(payload_buf)
  100. }
  101. /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
  102. /// is shrunk to the size returned from `f` and enqueued into the buffer.
  103. pub fn enqueue_with_infallible<'b, F>(
  104. &'b mut self,
  105. max_size: usize,
  106. header: H,
  107. f: F,
  108. ) -> Result<usize, Full>
  109. where
  110. F: FnOnce(&'b mut [u8]) -> usize,
  111. {
  112. if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
  113. return Err(Full);
  114. }
  115. let window = self.payload_ring.window();
  116. let contig_window = self.payload_ring.contiguous_window();
  117. if window < max_size {
  118. return Err(Full);
  119. } else if contig_window < max_size {
  120. if window - contig_window < max_size {
  121. // The buffer length is larger than the current contiguous window
  122. // and is larger than the contiguous window will be after adding
  123. // the padding necessary to circle around to the beginning of the
  124. // ring buffer.
  125. return Err(Full);
  126. } else {
  127. // Add padding to the end of the ring buffer so that the
  128. // contiguous window is at the beginning of the ring buffer.
  129. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  130. // note(discard): function does not write to the result
  131. // enqueued padding buffer location
  132. let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
  133. }
  134. }
  135. let (size, _) = self
  136. .payload_ring
  137. .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
  138. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  139. Ok(size)
  140. }
  141. fn dequeue_padding(&mut self) {
  142. let Self {
  143. ref mut metadata_ring,
  144. ref mut payload_ring,
  145. } = *self;
  146. let _ = metadata_ring.dequeue_one_with(|metadata| {
  147. if metadata.is_padding() {
  148. // note(discard): function does not use value of dequeued padding bytes
  149. let _buf_dequeued = payload_ring.dequeue_many(metadata.size);
  150. Ok(()) // dequeue metadata
  151. } else {
  152. Err(()) // don't dequeue metadata
  153. }
  154. });
  155. }
  /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  /// returns successfully, or return `Err(Empty)` if the buffer is empty.
  ///
  /// `f` receives the packet's header and exactly its payload bytes. The outer
  /// `Result` reports whether a packet was available; the inner `Result` is
  /// whatever `f` returned. When `f` returns `Err`, the packet stays queued and
  /// is offered again by the next dequeue or peek.
  ///
  /// # Panics
  ///
  /// Panics if the front entry has no header after leading padding was skipped.
  pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
  where
      F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
  {
      // Skip a padding entry at the front, if there is one.
      self.dequeue_padding();

      // Borrow both rings separately so the closure can use the payload ring
      // while the metadata ring is mutably borrowed by `dequeue_one_with`.
      let Self {
          ref mut metadata_ring,
          ref mut payload_ring,
      } = *self;

      // `move` hands the payload-ring borrow to the closure so that the slice
      // given to `f` can carry the full `'c` lifetime.
      metadata_ring.dequeue_one_with(move |metadata| {
          let PacketMetadata {
              ref mut header,
              size,
          } = *metadata;

          payload_ring
              .dequeue_many_with(|payload_buf| {
                  debug_assert!(payload_buf.len() >= size);

                  match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
                      // Success: consume the packet's `size` payload bytes.
                      Ok(val) => (size, Ok(val)),
                      // Failure: consume nothing so the payload stays queued.
                      Err(err) => (0, Err(err)),
                  }
              })
              .1
      })
  }
  183. /// Dequeue a single packet from the buffer, and return a reference to its payload
  184. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  185. pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
  186. self.dequeue_padding();
  187. let PacketMetadata {
  188. ref mut header,
  189. size,
  190. } = *self.metadata_ring.dequeue_one()?;
  191. let payload_buf = self.payload_ring.dequeue_many(size);
  192. debug_assert!(payload_buf.len() == size);
  193. Ok((header.take().unwrap(), payload_buf))
  194. }
  195. /// Peek at a single packet from the buffer without removing it, and return a reference to
  196. /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty.
  197. ///
  198. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  199. pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
  200. self.dequeue_padding();
  201. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  202. Ok((
  203. metadata.header.as_ref().unwrap(),
  204. self.payload_ring.get_allocated(0, metadata.size),
  205. ))
  206. } else {
  207. Err(Empty)
  208. }
  209. }
  /// Return the maximum number of packets that can be stored.
  ///
  /// This is the capacity of the metadata ring. Padding entries created while
  /// enqueuing occupy slots of the same ring.
  pub fn packet_capacity(&self) -> usize {
      self.metadata_ring.capacity()
  }
  /// Return the maximum number of bytes in the payload ring buffer.
  ///
  /// A single packet can never be larger than this; `enqueue` rejects such
  /// sizes with `Err(Full)`.
  pub fn payload_capacity(&self) -> usize {
      self.payload_ring.capacity()
  }
  /// Reset the packet buffer, discarding every queued packet.
  ///
  /// Both the payload ring and the metadata ring are cleared.
  // NOTE(review): presumably `allow(unused)` is here because not every build
  // configuration calls this method. TODO confirm.
  #[allow(unused)]
  pub(crate) fn reset(&mut self) {
      self.payload_ring.clear();
      self.metadata_ring.clear();
  }
  224. }
  #[cfg(test)]
  mod test {
      use super::*;

      // Test fixture: a buffer with 4 metadata slots and 16 payload bytes.
      fn buffer() -> PacketBuffer<'static, ()> {
          PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
      }

      // Round trip of one packet; a second packet that does not fit is rejected
      // without adding a metadata entry.
      #[test]
      fn test_simple() {
          let mut buffer = buffer();
          buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          assert_eq!(buffer.enqueue(16, ()), Err(Full));
          assert_eq!(buffer.metadata_ring.len(), 1);
          assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(buffer.dequeue(), Err(Empty));
      }

      // `peek` shows the front packet without removing it.
      #[test]
      fn test_peek() {
          let mut buffer = buffer();
          assert_eq!(buffer.peek(), Err(Empty));
          buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          assert_eq!(buffer.metadata_ring.len(), 1);
          assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
          assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(buffer.peek(), Err(Empty));
      }

      // After 6 + 8 bytes only 2 bytes remain at the end of the payload ring, so
      // the 4-byte packet has to wrap around: the metadata ring then holds three
      // entries (the 8-byte packet, the padding and the 4-byte packet).
      #[test]
      fn test_padding() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(6, ()).is_ok());
          assert!(buffer.enqueue(8, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          assert_eq!(buffer.metadata_ring.len(), 3);
          assert!(buffer.dequeue().is_ok());

          assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
          assert_eq!(buffer.metadata_ring.len(), 0);
      }

      // A 12-byte packet can be enqueued again once the buffer has been drained.
      #[test]
      fn test_padding_with_large_payload() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(12, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          buffer
              .enqueue(12, ())
              .unwrap()
              .copy_from_slice(b"abcdefghijkl");
      }

      // `dequeue_with` keeps the packet queued when the callback fails and removes
      // it when the callback succeeds.
      #[test]
      fn test_dequeue_with() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(6, ()).is_ok());
          assert!(buffer.enqueue(8, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          assert_eq!(buffer.metadata_ring.len(), 3);
          assert!(buffer.dequeue().is_ok());

          // The failing callback still causes the leading padding entry to be
          // dropped (3 entries -> 2 after the dequeue above -> 1 here), but the
          // packet itself stays queued.
          assert!(matches!(
              buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)),
              Ok(Err(_))
          ));
          assert_eq!(buffer.metadata_ring.len(), 1);

          assert!(buffer
              .dequeue_with(|&mut (), payload| {
                  assert_eq!(payload, &b"abcd"[..]);
                  Result::<(), ()>::Ok(())
              })
              .is_ok());
          assert_eq!(buffer.metadata_ring.len(), 0);
      }

      // `is_empty` / `is_full` follow the metadata ring: four 1-byte packets fill
      // all four metadata slots although most payload bytes are still free.
      #[test]
      fn test_metadata_full_empty() {
          let mut buffer = buffer();
          assert!(buffer.is_empty());
          assert!(!buffer.is_full());
          assert!(buffer.enqueue(1, ()).is_ok());
          assert!(!buffer.is_empty());
          assert!(buffer.enqueue(1, ()).is_ok());
          assert!(buffer.enqueue(1, ()).is_ok());
          assert!(!buffer.is_full());
          assert!(!buffer.is_empty());
          assert!(buffer.enqueue(1, ()).is_ok());
          assert!(buffer.is_full());
          assert!(!buffer.is_empty());
          assert_eq!(buffer.metadata_ring.len(), 4);
          assert_eq!(buffer.enqueue(1, ()), Err(Full));
      }

      // A packet larger than the total free payload space is rejected and leaves
      // the metadata ring untouched.
      #[test]
      fn test_window_too_small() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(4, ()).is_ok());
          assert!(buffer.enqueue(8, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          assert_eq!(buffer.enqueue(16, ()), Err(Full));
          assert_eq!(buffer.metadata_ring.len(), 1);
      }

      // 8 bytes are free in total, but not as one contiguous run, so the packet is
      // rejected and no padding entry is left behind.
      #[test]
      fn test_contiguous_window_too_small() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(4, ()).is_ok());
          assert!(buffer.enqueue(8, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          assert_eq!(buffer.enqueue(8, ()), Err(Full));
          assert_eq!(buffer.metadata_ring.len(), 1);
      }

      // Once drained, the buffer accepts a packet that uses the whole payload
      // capacity.
      #[test]
      fn test_contiguous_window_wrap() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(15, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          assert!(buffer.enqueue(16, ()).is_ok());
      }

      // A packet larger than the payload storage can never be enqueued.
      #[test]
      fn test_capacity_too_small() {
          let mut buffer = buffer();
          assert_eq!(buffer.enqueue(32, ()), Err(Full));
      }

      // Enqueuing into a drained buffer succeeds for a packet larger than the one
      // that was just removed.
      #[test]
      fn test_contig_window_prioritized() {
          let mut buffer = buffer();
          assert!(buffer.enqueue(4, ()).is_ok());
          assert!(buffer.dequeue().is_ok());
          assert!(buffer.enqueue(5, ()).is_ok());
      }

      // `reset` empties a buffer that holds data.
      #[test]
      fn clear() {
          let mut buffer = buffer();

          // Ensure enqueuing data in the buffer fills it somewhat.
          assert!(buffer.is_empty());
          assert!(buffer.enqueue(6, ()).is_ok());

          // Ensure that resetting the buffer causes it to be empty.
          assert!(!buffer.is_empty());
          buffer.reset();
          assert!(buffer.is_empty());
      }
  }