packet_buffer.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. use managed::ManagedSlice;
  2. use crate::storage::RingBuffer;
  3. use crate::{Error, Result};
  /// Bookkeeping record for one entry of a packet buffer.
  ///
  /// Each entry stores how many payload bytes it covers and, for a real packet,
  /// the packet header. An entry without a header is treated as padding.
  #[derive(Debug, Clone, Copy)]
  #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  pub struct PacketMetadata<H> {
      /// Number of payload bytes covered by this entry.
      size: usize,
      /// Header of the packet; `None` marks a padding entry.
      header: Option<H>,
  }
  11. impl<H> PacketMetadata<H> {
  12. /// Empty packet description.
  13. pub const EMPTY: PacketMetadata<H> = PacketMetadata {
  14. size: 0,
  15. header: None,
  16. };
  17. fn padding(size: usize) -> PacketMetadata<H> {
  18. PacketMetadata {
  19. size: size,
  20. header: None,
  21. }
  22. }
  23. fn packet(size: usize, header: H) -> PacketMetadata<H> {
  24. PacketMetadata {
  25. size: size,
  26. header: Some(header),
  27. }
  28. }
  29. fn is_padding(&self) -> bool {
  30. self.header.is_none()
  31. }
  32. }
  33. /// An UDP packet ring buffer.
  34. #[derive(Debug)]
  35. pub struct PacketBuffer<'a, H: 'a> {
  36. metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
  37. payload_ring: RingBuffer<'a, u8>,
  38. }
  39. impl<'a, H> PacketBuffer<'a, H> {
  40. /// Create a new packet buffer with the provided metadata and payload storage.
  41. ///
  42. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  43. /// storage limits the maximum _total size_ of packets.
  44. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
  45. where
  46. MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  47. PS: Into<ManagedSlice<'a, u8>>,
  48. {
  49. PacketBuffer {
  50. metadata_ring: RingBuffer::new(metadata_storage),
  51. payload_ring: RingBuffer::new(payload_storage),
  52. }
  53. }
  54. /// Query whether the buffer is empty.
  55. pub fn is_empty(&self) -> bool {
  56. self.metadata_ring.is_empty()
  57. }
  58. /// Query whether the buffer is full.
  59. pub fn is_full(&self) -> bool {
  60. self.metadata_ring.is_full()
  61. }
  62. // There is currently no enqueue_with() because of the complexity of managing padding
  63. // in case of failure.
  64. /// Enqueue a single packet with the given header into the buffer, and
  65. /// return a reference to its payload, or return `Err(Error::Exhausted)`
  66. /// if the buffer is full, or return `Err(Error::Truncated)` if the buffer
  67. /// does not have enough spare payload space.
  68. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
  69. if self.payload_ring.capacity() < size {
  70. return Err(Error::Truncated);
  71. }
  72. if self.metadata_ring.is_full() {
  73. return Err(Error::Exhausted);
  74. }
  75. let window = self.payload_ring.window();
  76. let contig_window = self.payload_ring.contiguous_window();
  77. if window < size {
  78. return Err(Error::Exhausted);
  79. } else if contig_window < size {
  80. if window - contig_window < size {
  81. // The buffer length is larger than the current contiguous window
  82. // and is larger than the contiguous window will be after adding
  83. // the padding necessary to circle around to the beginning of the
  84. // ring buffer.
  85. return Err(Error::Exhausted);
  86. } else {
  87. // Add padding to the end of the ring buffer so that the
  88. // contiguous window is at the beginning of the ring buffer.
  89. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  90. // note(discard): function does not write to the result
  91. // enqueued padding buffer location
  92. let _buf_enqueued = self.payload_ring.enqueue_many(contig_window);
  93. }
  94. }
  95. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  96. let payload_buf = self.payload_ring.enqueue_many(size);
  97. debug_assert!(payload_buf.len() == size);
  98. Ok(payload_buf)
  99. }
  100. fn dequeue_padding(&mut self) {
  101. let Self {
  102. ref mut metadata_ring,
  103. ref mut payload_ring,
  104. } = *self;
  105. let _ = metadata_ring.dequeue_one_with(|metadata| {
  106. if metadata.is_padding() {
  107. // note(discard): function does not use value of dequeued padding bytes
  108. let _buf_dequeued = payload_ring.dequeue_many(metadata.size);
  109. Ok(()) // dequeue metadata
  110. } else {
  111. Err(Error::Exhausted) // don't dequeue metadata
  112. }
  113. });
  114. }
  115. /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  116. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
  117. pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
  118. where
  119. F: FnOnce(&mut H, &'c mut [u8]) -> Result<R>,
  120. {
  121. self.dequeue_padding();
  122. let Self {
  123. ref mut metadata_ring,
  124. ref mut payload_ring,
  125. } = *self;
  126. metadata_ring.dequeue_one_with(move |metadata| {
  127. let PacketMetadata {
  128. ref mut header,
  129. size,
  130. } = *metadata;
  131. payload_ring
  132. .dequeue_many_with(|payload_buf| {
  133. debug_assert!(payload_buf.len() >= size);
  134. match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
  135. Ok(val) => (size, Ok(val)),
  136. Err(err) => (0, Err(err)),
  137. }
  138. })
  139. .1
  140. })
  141. }
  142. /// Dequeue a single packet from the buffer, and return a reference to its payload
  143. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  144. pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
  145. self.dequeue_padding();
  146. let PacketMetadata {
  147. ref mut header,
  148. size,
  149. } = *self.metadata_ring.dequeue_one()?;
  150. let payload_buf = self.payload_ring.dequeue_many(size);
  151. debug_assert!(payload_buf.len() == size);
  152. Ok((header.take().unwrap(), payload_buf))
  153. }
  154. /// Peek at a single packet from the buffer without removing it, and return a reference to
  155. /// its payload as well as its header, or return `Err(Error:Exhaused)` if the buffer is empty.
  156. ///
  157. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  158. pub fn peek(&mut self) -> Result<(&H, &[u8])> {
  159. self.dequeue_padding();
  160. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  161. Ok((
  162. metadata.header.as_ref().unwrap(),
  163. self.payload_ring.get_allocated(0, metadata.size),
  164. ))
  165. } else {
  166. Err(Error::Exhausted)
  167. }
  168. }
  169. /// Return the maximum number packets that can be stored.
  170. pub fn packet_capacity(&self) -> usize {
  171. self.metadata_ring.capacity()
  172. }
  173. /// Return the maximum number of bytes in the payload ring buffer.
  174. pub fn payload_capacity(&self) -> usize {
  175. self.payload_ring.capacity()
  176. }
  177. /// Reset the packet buffer and clear any staged.
  178. #[allow(unused)]
  179. pub(crate) fn reset(&mut self) {
  180. self.payload_ring.clear();
  181. self.metadata_ring.clear();
  182. }
  183. }
  #[cfg(test)]
  mod test {
      use super::*;

      /// Build a buffer with room for 4 metadata entries and 16 payload bytes.
      fn buffer() -> PacketBuffer<'static, ()> {
          PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
      }

      #[test]
      fn test_simple() {
          let mut pbuf = buffer();
          pbuf.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          // 16 bytes fit the capacity but not the remaining space.
          assert_eq!(pbuf.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(pbuf.metadata_ring.len(), 1);
          assert_eq!(pbuf.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pbuf.dequeue(), Err(Error::Exhausted));
      }

      #[test]
      fn test_peek() {
          let mut pbuf = buffer();
          assert_eq!(pbuf.peek(), Err(Error::Exhausted));
          pbuf.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          assert_eq!(pbuf.metadata_ring.len(), 1);
          // Peeking must not consume the packet.
          assert_eq!(pbuf.peek().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pbuf.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pbuf.peek(), Err(Error::Exhausted));
      }

      #[test]
      fn test_padding() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(6, ()).is_ok());
          assert!(pbuf.enqueue(8, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          pbuf.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          // Three entries: the 8-byte packet, the padding and the 4-byte packet.
          assert_eq!(pbuf.metadata_ring.len(), 3);
          assert!(pbuf.dequeue().is_ok());
          assert_eq!(pbuf.dequeue().unwrap().1, &b"abcd"[..]);
          assert_eq!(pbuf.metadata_ring.len(), 0);
      }

      #[test]
      fn test_padding_with_large_payload() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(12, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          pbuf.enqueue(12, ())
              .unwrap()
              .copy_from_slice(b"abcdefghijkl");
      }

      #[test]
      fn test_dequeue_with() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(6, ()).is_ok());
          assert!(pbuf.enqueue(8, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          pbuf.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          assert_eq!(pbuf.metadata_ring.len(), 3);
          assert!(pbuf.dequeue().is_ok());

          // A failing callback must leave the packet in place.
          assert!(pbuf
              .dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>)
              .is_err());
          assert_eq!(pbuf.metadata_ring.len(), 1);

          // A successful callback consumes it.
          assert!(pbuf
              .dequeue_with(|&mut (), payload| {
                  assert_eq!(payload, &b"abcd"[..]);
                  Ok(())
              })
              .is_ok());
          assert_eq!(pbuf.metadata_ring.len(), 0);
      }

      #[test]
      fn test_metadata_full_empty() {
          let mut pbuf = buffer();
          assert!(pbuf.is_empty());
          assert!(!pbuf.is_full());
          assert!(pbuf.enqueue(1, ()).is_ok());
          assert!(!pbuf.is_empty());
          assert!(pbuf.enqueue(1, ()).is_ok());
          assert!(pbuf.enqueue(1, ()).is_ok());
          assert!(!pbuf.is_full());
          assert!(!pbuf.is_empty());
          assert!(pbuf.enqueue(1, ()).is_ok());
          assert!(pbuf.is_full());
          assert!(!pbuf.is_empty());
          assert_eq!(pbuf.metadata_ring.len(), 4);
          // All four metadata slots are taken.
          assert_eq!(pbuf.enqueue(1, ()), Err(Error::Exhausted));
      }

      #[test]
      fn test_window_too_small() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(4, ()).is_ok());
          assert!(pbuf.enqueue(8, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          assert_eq!(pbuf.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(pbuf.metadata_ring.len(), 1);
      }

      #[test]
      fn test_contiguous_window_too_small() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(4, ()).is_ok());
          assert!(pbuf.enqueue(8, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          assert_eq!(pbuf.enqueue(8, ()), Err(Error::Exhausted));
          assert_eq!(pbuf.metadata_ring.len(), 1);
      }

      #[test]
      fn test_capacity_too_small() {
          let mut pbuf = buffer();
          // Larger than the whole payload storage.
          assert_eq!(pbuf.enqueue(32, ()), Err(Error::Truncated));
      }

      #[test]
      fn test_contig_window_prioritized() {
          let mut pbuf = buffer();
          assert!(pbuf.enqueue(4, ()).is_ok());
          assert!(pbuf.dequeue().is_ok());
          assert!(pbuf.enqueue(5, ()).is_ok());
      }

      #[test]
      fn clear() {
          let mut pbuf = buffer();

          // Ensure enqueuing data in the buffer fills it somewhat.
          assert!(pbuf.is_empty());
          assert!(pbuf.enqueue(6, ()).is_ok());

          // Ensure that resetting the buffer causes it to be empty.
          assert!(!pbuf.is_empty());
          pbuf.reset();
          assert!(pbuf.is_empty());
      }
  }