// packet_buffer.rs
  1. use managed::ManagedSlice;
  2. use crate::storage::{Full, RingBuffer};
  3. use super::Empty;
  4. /// Size and header of a packet.
  5. #[derive(Debug, Clone, Copy)]
  6. #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  7. pub struct PacketMetadata<H> {
  8. size: usize,
  9. header: Option<H>,
  10. }
  11. impl<H> PacketMetadata<H> {
  12. /// Empty packet description.
  13. pub const EMPTY: PacketMetadata<H> = PacketMetadata {
  14. size: 0,
  15. header: None,
  16. };
  17. fn padding(size: usize) -> PacketMetadata<H> {
  18. PacketMetadata {
  19. size: size,
  20. header: None,
  21. }
  22. }
  23. fn packet(size: usize, header: H) -> PacketMetadata<H> {
  24. PacketMetadata {
  25. size: size,
  26. header: Some(header),
  27. }
  28. }
  29. fn is_padding(&self) -> bool {
  30. self.header.is_none()
  31. }
  32. }
  33. /// An UDP packet ring buffer.
  34. #[derive(Debug)]
  35. pub struct PacketBuffer<'a, H: 'a> {
  36. metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
  37. payload_ring: RingBuffer<'a, u8>,
  38. }
  39. impl<'a, H> PacketBuffer<'a, H> {
  40. /// Create a new packet buffer with the provided metadata and payload storage.
  41. ///
  42. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  43. /// storage limits the maximum _total size_ of packets.
  44. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
  45. where
  46. MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  47. PS: Into<ManagedSlice<'a, u8>>,
  48. {
  49. PacketBuffer {
  50. metadata_ring: RingBuffer::new(metadata_storage),
  51. payload_ring: RingBuffer::new(payload_storage),
  52. }
  53. }
  54. /// Query whether the buffer is empty.
  55. pub fn is_empty(&self) -> bool {
  56. self.metadata_ring.is_empty()
  57. }
  58. /// Query whether the buffer is full.
  59. pub fn is_full(&self) -> bool {
  60. self.metadata_ring.is_full()
  61. }
  62. // There is currently no enqueue_with() because of the complexity of managing padding
  63. // in case of failure.
  64. /// Enqueue a single packet with the given header into the buffer, and
  65. /// return a reference to its payload, or return `Err(Full)`
  66. /// if the buffer is full.
  67. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8], Full> {
  68. if self.payload_ring.capacity() < size || self.metadata_ring.is_full() {
  69. return Err(Full);
  70. }
  71. // Ring is currently empty. Clear it (resetting `read_at`) to maximize
  72. // for contiguous space.
  73. if self.payload_ring.is_empty() {
  74. self.payload_ring.clear();
  75. }
  76. let window = self.payload_ring.window();
  77. let contig_window = self.payload_ring.contiguous_window();
  78. if window < size {
  79. return Err(Full);
  80. } else if contig_window < size {
  81. if window - contig_window < size {
  82. // The buffer length is larger than the current contiguous window
  83. // and is larger than the contiguous window will be after adding
  84. // the padding necessary to circle around to the beginning of the
  85. // ring buffer.
  86. return Err(Full);
  87. } else {
  88. // Add padding to the end of the ring buffer so that the
  89. // contiguous window is at the beginning of the ring buffer.
  90. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  91. // note(discard): function does not write to the result
  92. // enqueued padding buffer location
  93. let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
  94. }
  95. }
  96. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  97. let payload_buf = self.payload_ring.enqueue_many(size);
  98. debug_assert!(payload_buf.len() == size);
  99. Ok(payload_buf)
  100. }
  101. /// Call `f` with a packet from the buffer large enough to fit `max_size` bytes. The packet
  102. /// is shrunk to the size returned from `f` and enqueued into the buffer.
  103. pub fn enqueue_with_infallible<'b, F>(
  104. &'b mut self,
  105. max_size: usize,
  106. header: H,
  107. f: F,
  108. ) -> Result<usize, Full>
  109. where
  110. F: FnOnce(&'b mut [u8]) -> usize,
  111. {
  112. if self.payload_ring.capacity() < max_size || self.metadata_ring.is_full() {
  113. return Err(Full);
  114. }
  115. let window = self.payload_ring.window();
  116. let contig_window = self.payload_ring.contiguous_window();
  117. if window < max_size {
  118. return Err(Full);
  119. } else if contig_window < max_size {
  120. if window - contig_window < max_size {
  121. // The buffer length is larger than the current contiguous window
  122. // and is larger than the contiguous window will be after adding
  123. // the padding necessary to circle around to the beginning of the
  124. // ring buffer.
  125. return Err(Full);
  126. } else {
  127. // Add padding to the end of the ring buffer so that the
  128. // contiguous window is at the beginning of the ring buffer.
  129. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  130. // note(discard): function does not write to the result
  131. // enqueued padding buffer location
  132. let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
  133. }
  134. }
  135. let (size, _) = self
  136. .payload_ring
  137. .enqueue_many_with(|data| (f(&mut data[..max_size]), ()));
  138. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  139. Ok(size)
  140. }
  141. fn dequeue_padding(&mut self) {
  142. let _ = self.metadata_ring.dequeue_one_with(|metadata| {
  143. if metadata.is_padding() {
  144. // note(discard): function does not use value of dequeued padding bytes
  145. let _buf_dequeued = self.payload_ring.dequeue_many(metadata.size);
  146. Ok(()) // dequeue metadata
  147. } else {
  148. Err(()) // don't dequeue metadata
  149. }
  150. });
  151. }
  152. /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  153. /// returns successfully, or return `Err(EmptyError)` if the buffer is empty.
  154. pub fn dequeue_with<'c, R, E, F>(&'c mut self, f: F) -> Result<Result<R, E>, Empty>
  155. where
  156. F: FnOnce(&mut H, &'c mut [u8]) -> Result<R, E>,
  157. {
  158. self.dequeue_padding();
  159. self.metadata_ring.dequeue_one_with(|metadata| {
  160. self.payload_ring
  161. .dequeue_many_with(|payload_buf| {
  162. debug_assert!(payload_buf.len() >= metadata.size);
  163. match f(
  164. metadata.header.as_mut().unwrap(),
  165. &mut payload_buf[..metadata.size],
  166. ) {
  167. Ok(val) => (metadata.size, Ok(val)),
  168. Err(err) => (0, Err(err)),
  169. }
  170. })
  171. .1
  172. })
  173. }
  174. /// Dequeue a single packet from the buffer, and return a reference to its payload
  175. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  176. pub fn dequeue(&mut self) -> Result<(H, &mut [u8]), Empty> {
  177. self.dequeue_padding();
  178. let meta = self.metadata_ring.dequeue_one()?;
  179. let payload_buf = self.payload_ring.dequeue_many(meta.size);
  180. debug_assert!(payload_buf.len() == meta.size);
  181. Ok((meta.header.take().unwrap(), payload_buf))
  182. }
  183. /// Peek at a single packet from the buffer without removing it, and return a reference to
  184. /// its payload as well as its header, or return `Err(Error:Exhausted)` if the buffer is empty.
  185. ///
  186. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  187. pub fn peek(&mut self) -> Result<(&H, &[u8]), Empty> {
  188. self.dequeue_padding();
  189. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  190. Ok((
  191. metadata.header.as_ref().unwrap(),
  192. self.payload_ring.get_allocated(0, metadata.size),
  193. ))
  194. } else {
  195. Err(Empty)
  196. }
  197. }
  198. /// Return the maximum number packets that can be stored.
  199. pub fn packet_capacity(&self) -> usize {
  200. self.metadata_ring.capacity()
  201. }
  202. /// Return the maximum number of bytes in the payload ring buffer.
  203. pub fn payload_capacity(&self) -> usize {
  204. self.payload_ring.capacity()
  205. }
  206. /// Reset the packet buffer and clear any staged.
  207. #[allow(unused)]
  208. pub(crate) fn reset(&mut self) {
  209. self.payload_ring.clear();
  210. self.metadata_ring.clear();
  211. }
  212. }
  213. #[cfg(test)]
  214. mod test {
  215. use super::*;
  216. fn buffer() -> PacketBuffer<'static, ()> {
  217. PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
  218. }
  219. #[test]
  220. fn test_simple() {
  221. let mut buffer = buffer();
  222. buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
  223. assert_eq!(buffer.enqueue(16, ()), Err(Full));
  224. assert_eq!(buffer.metadata_ring.len(), 1);
  225. assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
  226. assert_eq!(buffer.dequeue(), Err(Empty));
  227. }
  228. #[test]
  229. fn test_peek() {
  230. let mut buffer = buffer();
  231. assert_eq!(buffer.peek(), Err(Empty));
  232. buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
  233. assert_eq!(buffer.metadata_ring.len(), 1);
  234. assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
  235. assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
  236. assert_eq!(buffer.peek(), Err(Empty));
  237. }
  238. #[test]
  239. fn test_padding() {
  240. let mut buffer = buffer();
  241. assert!(buffer.enqueue(6, ()).is_ok());
  242. assert!(buffer.enqueue(8, ()).is_ok());
  243. assert!(buffer.dequeue().is_ok());
  244. buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
  245. assert_eq!(buffer.metadata_ring.len(), 3);
  246. assert!(buffer.dequeue().is_ok());
  247. assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
  248. assert_eq!(buffer.metadata_ring.len(), 0);
  249. }
  250. #[test]
  251. fn test_padding_with_large_payload() {
  252. let mut buffer = buffer();
  253. assert!(buffer.enqueue(12, ()).is_ok());
  254. assert!(buffer.dequeue().is_ok());
  255. buffer
  256. .enqueue(12, ())
  257. .unwrap()
  258. .copy_from_slice(b"abcdefghijkl");
  259. }
  260. #[test]
  261. fn test_dequeue_with() {
  262. let mut buffer = buffer();
  263. assert!(buffer.enqueue(6, ()).is_ok());
  264. assert!(buffer.enqueue(8, ()).is_ok());
  265. assert!(buffer.dequeue().is_ok());
  266. buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
  267. assert_eq!(buffer.metadata_ring.len(), 3);
  268. assert!(buffer.dequeue().is_ok());
  269. assert!(matches!(
  270. buffer.dequeue_with(|_, _| Result::<(), u32>::Err(123)),
  271. Ok(Err(_))
  272. ));
  273. assert_eq!(buffer.metadata_ring.len(), 1);
  274. assert!(buffer
  275. .dequeue_with(|&mut (), payload| {
  276. assert_eq!(payload, &b"abcd"[..]);
  277. Result::<(), ()>::Ok(())
  278. })
  279. .is_ok());
  280. assert_eq!(buffer.metadata_ring.len(), 0);
  281. }
  282. #[test]
  283. fn test_metadata_full_empty() {
  284. let mut buffer = buffer();
  285. assert!(buffer.is_empty());
  286. assert!(!buffer.is_full());
  287. assert!(buffer.enqueue(1, ()).is_ok());
  288. assert!(!buffer.is_empty());
  289. assert!(buffer.enqueue(1, ()).is_ok());
  290. assert!(buffer.enqueue(1, ()).is_ok());
  291. assert!(!buffer.is_full());
  292. assert!(!buffer.is_empty());
  293. assert!(buffer.enqueue(1, ()).is_ok());
  294. assert!(buffer.is_full());
  295. assert!(!buffer.is_empty());
  296. assert_eq!(buffer.metadata_ring.len(), 4);
  297. assert_eq!(buffer.enqueue(1, ()), Err(Full));
  298. }
  299. #[test]
  300. fn test_window_too_small() {
  301. let mut buffer = buffer();
  302. assert!(buffer.enqueue(4, ()).is_ok());
  303. assert!(buffer.enqueue(8, ()).is_ok());
  304. assert!(buffer.dequeue().is_ok());
  305. assert_eq!(buffer.enqueue(16, ()), Err(Full));
  306. assert_eq!(buffer.metadata_ring.len(), 1);
  307. }
  308. #[test]
  309. fn test_contiguous_window_too_small() {
  310. let mut buffer = buffer();
  311. assert!(buffer.enqueue(4, ()).is_ok());
  312. assert!(buffer.enqueue(8, ()).is_ok());
  313. assert!(buffer.dequeue().is_ok());
  314. assert_eq!(buffer.enqueue(8, ()), Err(Full));
  315. assert_eq!(buffer.metadata_ring.len(), 1);
  316. }
  317. #[test]
  318. fn test_contiguous_window_wrap() {
  319. let mut buffer = buffer();
  320. assert!(buffer.enqueue(15, ()).is_ok());
  321. assert!(buffer.dequeue().is_ok());
  322. assert!(buffer.enqueue(16, ()).is_ok());
  323. }
  324. #[test]
  325. fn test_capacity_too_small() {
  326. let mut buffer = buffer();
  327. assert_eq!(buffer.enqueue(32, ()), Err(Full));
  328. }
  329. #[test]
  330. fn test_contig_window_prioritized() {
  331. let mut buffer = buffer();
  332. assert!(buffer.enqueue(4, ()).is_ok());
  333. assert!(buffer.dequeue().is_ok());
  334. assert!(buffer.enqueue(5, ()).is_ok());
  335. }
  336. #[test]
  337. fn clear() {
  338. let mut buffer = buffer();
  339. // Ensure enqueuing data in teh buffer fills it somewhat.
  340. assert!(buffer.is_empty());
  341. assert!(buffer.enqueue(6, ()).is_ok());
  342. // Ensure that resetting the buffer causes it to be empty.
  343. assert!(!buffer.is_empty());
  344. buffer.reset();
  345. assert!(buffer.is_empty());
  346. }
  347. }