packet_buffer.rs 11 KB


  1. use managed::ManagedSlice;
  2. use crate::storage::RingBuffer;
  3. use crate::{Error, Result};
  /// Size and optional header of one entry tracked by a packet buffer.
  ///
  /// An entry without a header is treated as padding rather than as a packet.
  #[derive(Debug, Clone, Copy)]
  #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  pub struct PacketMetadata<H> {
      /// Number of payload bytes covered by this entry.
      size: usize,
      /// Header of the packet, or `None` when the entry is padding.
      header: Option<H>,
  }

  impl<H> PacketMetadata<H> {
      /// Empty packet description.
      pub const EMPTY: Self = Self {
          size: 0,
          header: None,
      };

      /// Describe `size` bytes of padding; padding never carries a header.
      fn padding(size: usize) -> Self {
          Self { size, header: None }
      }

      /// Describe a packet of `size` payload bytes that carries `header`.
      fn packet(size: usize, header: H) -> Self {
          let header = Some(header);
          Self { size, header }
      }

      /// Report whether this entry is padding, i.e. has no header.
      fn is_padding(&self) -> bool {
          self.header.is_none()
      }
  }
  33. /// An UDP packet ring buffer.
  34. #[derive(Debug)]
  35. pub struct PacketBuffer<'a, H: 'a> {
  36. metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
  37. payload_ring: RingBuffer<'a, u8>,
  38. }
  39. impl<'a, H> PacketBuffer<'a, H> {
  40. /// Create a new packet buffer with the provided metadata and payload storage.
  41. ///
  42. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  43. /// storage limits the maximum _total size_ of packets.
  44. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
  45. where
  46. MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  47. PS: Into<ManagedSlice<'a, u8>>,
  48. {
  49. PacketBuffer {
  50. metadata_ring: RingBuffer::new(metadata_storage),
  51. payload_ring: RingBuffer::new(payload_storage),
  52. }
  53. }
  54. /// Query whether the buffer is empty.
  55. pub fn is_empty(&self) -> bool {
  56. self.metadata_ring.is_empty()
  57. }
  58. /// Query whether the buffer is full.
  59. pub fn is_full(&self) -> bool {
  60. self.metadata_ring.is_full()
  61. }
  62. // There is currently no enqueue_with() because of the complexity of managing padding
  63. // in case of failure.
  64. /// Enqueue a single packet with the given header into the buffer, and
  65. /// return a reference to its payload, or return `Err(Error::Exhausted)`
  66. /// if the buffer is full, or return `Err(Error::Truncated)` if the buffer
  67. /// does not have enough spare payload space.
  68. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
  69. if self.payload_ring.capacity() < size {
  70. return Err(Error::Truncated);
  71. }
  72. if self.metadata_ring.is_full() {
  73. return Err(Error::Exhausted);
  74. }
  75. let window = self.payload_ring.window();
  76. let contig_window = self.payload_ring.contiguous_window();
  77. if window < size {
  78. return Err(Error::Exhausted);
  79. } else if contig_window < size {
  80. if window - contig_window < size {
  81. // The buffer length is larger than the current contiguous window
  82. // and is larger than the contiguous window will be after adding
  83. // the padding necessary to circle around to the beginning of the
  84. // ring buffer.
  85. return Err(Error::Exhausted);
  86. } else {
  87. // Add padding to the end of the ring buffer so that the
  88. // contiguous window is at the beginning of the ring buffer.
  89. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  90. self.payload_ring.enqueue_many(contig_window);
  91. }
  92. }
  93. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  94. let payload_buf = self.payload_ring.enqueue_many(size);
  95. debug_assert!(payload_buf.len() == size);
  96. Ok(payload_buf)
  97. }
  98. fn dequeue_padding(&mut self) {
  99. let Self {
  100. ref mut metadata_ring,
  101. ref mut payload_ring,
  102. } = *self;
  103. let _ = metadata_ring.dequeue_one_with(|metadata| {
  104. if metadata.is_padding() {
  105. payload_ring.dequeue_many(metadata.size);
  106. Ok(()) // dequeue metadata
  107. } else {
  108. Err(Error::Exhausted) // don't dequeue metadata
  109. }
  110. });
  111. }
  112. /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  113. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
  114. pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
  115. where
  116. F: FnOnce(&mut H, &'c mut [u8]) -> Result<R>,
  117. {
  118. self.dequeue_padding();
  119. let Self {
  120. ref mut metadata_ring,
  121. ref mut payload_ring,
  122. } = *self;
  123. metadata_ring.dequeue_one_with(move |metadata| {
  124. let PacketMetadata {
  125. ref mut header,
  126. size,
  127. } = *metadata;
  128. payload_ring
  129. .dequeue_many_with(|payload_buf| {
  130. debug_assert!(payload_buf.len() >= size);
  131. match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
  132. Ok(val) => (size, Ok(val)),
  133. Err(err) => (0, Err(err)),
  134. }
  135. })
  136. .1
  137. })
  138. }
  139. /// Dequeue a single packet from the buffer, and return a reference to its payload
  140. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  141. pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
  142. self.dequeue_padding();
  143. let PacketMetadata {
  144. ref mut header,
  145. size,
  146. } = *self.metadata_ring.dequeue_one()?;
  147. let payload_buf = self.payload_ring.dequeue_many(size);
  148. debug_assert!(payload_buf.len() == size);
  149. Ok((header.take().unwrap(), payload_buf))
  150. }
  151. /// Peek at a single packet from the buffer without removing it, and return a reference to
  152. /// its payload as well as its header, or return `Err(Error:Exhaused)` if the buffer is empty.
  153. ///
  154. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  155. pub fn peek(&mut self) -> Result<(&H, &[u8])> {
  156. self.dequeue_padding();
  157. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  158. Ok((
  159. metadata.header.as_ref().unwrap(),
  160. self.payload_ring.get_allocated(0, metadata.size),
  161. ))
  162. } else {
  163. Err(Error::Exhausted)
  164. }
  165. }
  166. /// Return the maximum number packets that can be stored.
  167. pub fn packet_capacity(&self) -> usize {
  168. self.metadata_ring.capacity()
  169. }
  170. /// Return the maximum number of bytes in the payload ring buffer.
  171. pub fn payload_capacity(&self) -> usize {
  172. self.payload_ring.capacity()
  173. }
  174. /// Reset the packet buffer and clear any staged.
  175. #[allow(unused)]
  176. pub(crate) fn reset(&mut self) {
  177. self.payload_ring.clear();
  178. self.metadata_ring.clear();
  179. }
  180. }
  #[cfg(test)]
  mod test {
      use super::*;

      /// Build a buffer with four metadata entries and sixteen payload bytes.
      fn buffer() -> PacketBuffer<'static, ()> {
          let metadata: Vec<PacketMetadata<()>> = vec![PacketMetadata::EMPTY; 4];
          let payload: Vec<u8> = vec![0u8; 16];
          PacketBuffer::new(metadata, payload)
      }

      #[test]
      fn test_simple() {
          let mut ring = buffer();

          let slot = ring.enqueue(6, ()).unwrap();
          slot.copy_from_slice(b"abcdef");

          // Only ten payload bytes are left, so a full-capacity packet is refused.
          assert_eq!(ring.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(ring.metadata_ring.len(), 1);

          assert_eq!(ring.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(ring.dequeue(), Err(Error::Exhausted));
      }

      #[test]
      fn test_peek() {
          let mut ring = buffer();
          assert_eq!(ring.peek(), Err(Error::Exhausted));

          let slot = ring.enqueue(6, ()).unwrap();
          slot.copy_from_slice(b"abcdef");
          assert_eq!(ring.metadata_ring.len(), 1);

          // Peeking must not consume the packet; dequeuing afterwards does.
          assert_eq!(ring.peek().unwrap().1, &b"abcdef"[..]);
          assert_eq!(ring.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(ring.peek(), Err(Error::Exhausted));
      }

      #[test]
      fn test_padding() {
          let mut ring = buffer();
          assert!(ring.enqueue(6, ()).is_ok());
          assert!(ring.enqueue(8, ()).is_ok());
          assert!(ring.dequeue().is_ok());

          // Two bytes remain at the tail, so this packet needs a padding entry.
          let slot = ring.enqueue(4, ()).unwrap();
          slot.copy_from_slice(b"abcd");
          assert_eq!(ring.metadata_ring.len(), 3);

          assert!(ring.dequeue().is_ok());
          assert_eq!(ring.dequeue().unwrap().1, &b"abcd"[..]);
          assert_eq!(ring.metadata_ring.len(), 0);
      }

      #[test]
      fn test_padding_with_large_payload() {
          let mut ring = buffer();
          assert!(ring.enqueue(12, ()).is_ok());
          assert!(ring.dequeue().is_ok());

          let slot = ring.enqueue(12, ()).unwrap();
          slot.copy_from_slice(b"abcdefghijkl");
      }

      #[test]
      fn test_dequeue_with() {
          let mut ring = buffer();
          assert!(ring.enqueue(6, ()).is_ok());
          assert!(ring.enqueue(8, ()).is_ok());
          assert!(ring.dequeue().is_ok());

          let slot = ring.enqueue(4, ()).unwrap();
          slot.copy_from_slice(b"abcd");
          assert_eq!(ring.metadata_ring.len(), 3);
          assert!(ring.dequeue().is_ok());

          // A failing callback leaves the packet queued; only the padding goes away.
          let rejected = ring.dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>);
          assert!(rejected.is_err());
          assert_eq!(ring.metadata_ring.len(), 1);

          let accepted = ring.dequeue_with(|&mut (), payload| {
              assert_eq!(payload, &b"abcd"[..]);
              Ok(())
          });
          assert!(accepted.is_ok());
          assert_eq!(ring.metadata_ring.len(), 0);
      }

      #[test]
      fn test_metadata_full_empty() {
          let mut ring = buffer();
          assert!(ring.is_empty());
          assert!(!ring.is_full());

          assert!(ring.enqueue(1, ()).is_ok());
          assert!(!ring.is_empty());

          assert!(ring.enqueue(1, ()).is_ok());
          assert!(ring.enqueue(1, ()).is_ok());
          assert!(!ring.is_full());
          assert!(!ring.is_empty());

          // The fourth packet takes the last metadata entry.
          assert!(ring.enqueue(1, ()).is_ok());
          assert!(ring.is_full());
          assert!(!ring.is_empty());
          assert_eq!(ring.metadata_ring.len(), 4);

          assert_eq!(ring.enqueue(1, ()), Err(Error::Exhausted));
      }

      #[test]
      fn test_window_too_small() {
          let mut ring = buffer();
          assert!(ring.enqueue(4, ()).is_ok());
          assert!(ring.enqueue(8, ()).is_ok());
          assert!(ring.dequeue().is_ok());

          assert_eq!(ring.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(ring.metadata_ring.len(), 1);
      }

      #[test]
      fn test_contiguous_window_too_small() {
          let mut ring = buffer();
          assert!(ring.enqueue(4, ()).is_ok());
          assert!(ring.enqueue(8, ()).is_ok());
          assert!(ring.dequeue().is_ok());

          // Eight bytes are free in total, but split four at the tail and four at the head.
          assert_eq!(ring.enqueue(8, ()), Err(Error::Exhausted));
          assert_eq!(ring.metadata_ring.len(), 1);
      }

      #[test]
      fn test_capacity_too_small() {
          let mut ring = buffer();
          assert_eq!(ring.enqueue(32, ()), Err(Error::Truncated));
      }

      #[test]
      fn test_contig_window_prioritized() {
          let mut ring = buffer();
          assert!(ring.enqueue(4, ()).is_ok());
          assert!(ring.dequeue().is_ok());
          assert!(ring.enqueue(5, ()).is_ok());
      }

      #[test]
      fn clear() {
          let mut ring = buffer();

          // Ensure enqueuing data in the buffer fills it somewhat.
          assert!(ring.is_empty());
          assert!(ring.enqueue(6, ()).is_ok());

          // Ensure that resetting the buffer causes it to be empty.
          assert!(!ring.is_empty());
          ring.reset();
          assert!(ring.is_empty());
      }
  }