// packet_buffer.rs (11 KB)
  1. use managed::ManagedSlice;
  2. use crate::{Error, Result};
  3. use crate::storage::RingBuffer;
  /// Size and header of a packet.
  ///
  /// An entry whose header is absent is treated as padding rather than as a
  /// real packet.
  #[derive(Clone, Copy, Debug)]
  #[cfg_attr(feature = "defmt", derive(defmt::Format))]
  pub struct PacketMetadata<H> {
      /// Number of payload bytes described by this entry.
      size: usize,
      /// Header of the packet; `None` marks a padding entry.
      header: Option<H>,
  }

  impl<H> PacketMetadata<H> {
      /// Empty packet description.
      pub const EMPTY: Self = Self { size: 0, header: None };

      /// Describe `size` bytes of padding (an entry without a header).
      fn padding(size: usize) -> Self {
          Self { header: None, size }
      }

      /// Describe a packet carrying `header` and `size` bytes of payload.
      fn packet(size: usize, header: H) -> Self {
          let header = Some(header);
          Self { size, header }
      }

      /// Return `true` when this entry carries no header, i.e. it is padding.
      fn is_padding(&self) -> bool {
          match self.header {
              Some(_) => false,
              None => true,
          }
      }
  }
  30. /// An UDP packet ring buffer.
  31. #[derive(Debug)]
  32. pub struct PacketBuffer<'a, H: 'a> {
  33. metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
  34. payload_ring: RingBuffer<'a, u8>,
  35. }
  36. impl<'a, H> PacketBuffer<'a, H> {
  37. /// Create a new packet buffer with the provided metadata and payload storage.
  38. ///
  39. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  40. /// storage limits the maximum _total size_ of packets.
  41. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, H>
  42. where MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  43. PS: Into<ManagedSlice<'a, u8>>,
  44. {
  45. PacketBuffer {
  46. metadata_ring: RingBuffer::new(metadata_storage),
  47. payload_ring: RingBuffer::new(payload_storage),
  48. }
  49. }
  50. /// Query whether the buffer is empty.
  51. pub fn is_empty(&self) -> bool {
  52. self.metadata_ring.is_empty()
  53. }
  54. /// Query whether the buffer is full.
  55. pub fn is_full(&self) -> bool {
  56. self.metadata_ring.is_full()
  57. }
  58. // There is currently no enqueue_with() because of the complexity of managing padding
  59. // in case of failure.
  60. /// Enqueue a single packet with the given header into the buffer, and
  61. /// return a reference to its payload, or return `Err(Error::Exhausted)`
  62. /// if the buffer is full, or return `Err(Error::Truncated)` if the buffer
  63. /// does not have enough spare payload space.
  64. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
  65. if self.payload_ring.capacity() < size {
  66. return Err(Error::Truncated)
  67. }
  68. if self.metadata_ring.is_full() {
  69. return Err(Error::Exhausted)
  70. }
  71. let window = self.payload_ring.window();
  72. let contig_window = self.payload_ring.contiguous_window();
  73. if window < size {
  74. return Err(Error::Exhausted)
  75. } else if contig_window < size {
  76. if window - contig_window < size {
  77. // The buffer length is larger than the current contiguous window
  78. // and is larger than the contiguous window will be after adding
  79. // the padding necessary to circle around to the beginning of the
  80. // ring buffer.
  81. return Err(Error::Exhausted)
  82. } else {
  83. // Add padding to the end of the ring buffer so that the
  84. // contiguous window is at the beginning of the ring buffer.
  85. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(contig_window);
  86. self.payload_ring.enqueue_many(contig_window);
  87. }
  88. }
  89. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  90. let payload_buf = self.payload_ring.enqueue_many(size);
  91. debug_assert!(payload_buf.len() == size);
  92. Ok(payload_buf)
  93. }
  94. fn dequeue_padding(&mut self) {
  95. let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
  96. let _ = metadata_ring.dequeue_one_with(|metadata| {
  97. if metadata.is_padding() {
  98. payload_ring.dequeue_many(metadata.size);
  99. Ok(()) // dequeue metadata
  100. } else {
  101. Err(Error::Exhausted) // don't dequeue metadata
  102. }
  103. });
  104. }
  105. /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  106. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
  107. pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
  108. where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R> {
  109. self.dequeue_padding();
  110. let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
  111. metadata_ring.dequeue_one_with(move |metadata| {
  112. let PacketMetadata { ref mut header, size } = *metadata;
  113. payload_ring.dequeue_many_with(|payload_buf| {
  114. debug_assert!(payload_buf.len() >= size);
  115. match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
  116. Ok(val) => (size, Ok(val)),
  117. Err(err) => (0, Err(err)),
  118. }
  119. }).1
  120. })
  121. }
  122. /// Dequeue a single packet from the buffer, and return a reference to its payload
  123. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  124. pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
  125. self.dequeue_padding();
  126. let PacketMetadata { ref mut header, size } = *self.metadata_ring.dequeue_one()?;
  127. let payload_buf = self.payload_ring.dequeue_many(size);
  128. debug_assert!(payload_buf.len() == size);
  129. Ok((header.take().unwrap(), payload_buf))
  130. }
  131. /// Peek at a single packet from the buffer without removing it, and return a reference to
  132. /// its payload as well as its header, or return `Err(Error:Exhaused)` if the buffer is empty.
  133. ///
  134. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  135. pub fn peek(&mut self) -> Result<(&H, &[u8])> {
  136. self.dequeue_padding();
  137. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  138. Ok((metadata.header.as_ref().unwrap(), self.payload_ring.get_allocated(0, metadata.size)))
  139. } else {
  140. Err(Error::Exhausted)
  141. }
  142. }
  143. /// Return the maximum number packets that can be stored.
  144. pub fn packet_capacity(&self) -> usize {
  145. self.metadata_ring.capacity()
  146. }
  147. /// Return the maximum number of bytes in the payload ring buffer.
  148. pub fn payload_capacity(&self) -> usize {
  149. self.payload_ring.capacity()
  150. }
  151. /// Reset the packet buffer and clear any staged.
  152. #[cfg(feature = "socket-udp")]
  153. pub(crate) fn reset(&mut self) {
  154. self.payload_ring.clear();
  155. self.metadata_ring.clear();
  156. }
  157. }
  #[cfg(test)]
  mod test {
      use super::*;

      /// Build a buffer with room for 4 metadata entries and 16 payload bytes.
      fn buffer() -> PacketBuffer<'static, ()> {
          PacketBuffer::new(vec![PacketMetadata::EMPTY; 4], vec![0u8; 16])
      }

      /// A packet can be enqueued and read back; an oversized request fails
      /// without disturbing what is already queued.
      #[test]
      fn test_simple() {
          let mut pb = buffer();

          pb.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          assert_eq!(pb.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(pb.metadata_ring.len(), 1);

          assert_eq!(pb.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pb.dequeue(), Err(Error::Exhausted));
      }

      /// Peeking returns the front packet without removing it.
      #[test]
      fn test_peek() {
          let mut pb = buffer();
          assert_eq!(pb.peek(), Err(Error::Exhausted));

          pb.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
          assert_eq!(pb.metadata_ring.len(), 1);

          assert_eq!(pb.peek().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pb.dequeue().unwrap().1, &b"abcdef"[..]);
          assert_eq!(pb.peek(), Err(Error::Exhausted));
      }

      /// A packet that does not fit at the end of the payload ring is placed
      /// at the beginning, with a padding entry recorded in between.
      #[test]
      fn test_padding() {
          let mut pb = buffer();
          assert!(pb.enqueue(6, ()).is_ok());
          assert!(pb.enqueue(8, ()).is_ok());
          assert!(pb.dequeue().is_ok());

          pb.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          // 8-byte packet + padding + 4-byte packet.
          assert_eq!(pb.metadata_ring.len(), 3);

          assert!(pb.dequeue().is_ok());
          assert_eq!(pb.dequeue().unwrap().1, &b"abcd"[..]);
          assert_eq!(pb.metadata_ring.len(), 0);
      }

      /// Padding also works when the wrapped packet is large.
      #[test]
      fn test_padding_with_large_payload() {
          let mut pb = buffer();
          assert!(pb.enqueue(12, ()).is_ok());
          assert!(pb.dequeue().is_ok());
          pb.enqueue(12, ()).unwrap().copy_from_slice(b"abcdefghijkl");
      }

      /// `dequeue_with` keeps the packet when the callback fails and removes
      /// it when the callback succeeds.
      #[test]
      fn test_dequeue_with() {
          let mut pb = buffer();
          assert!(pb.enqueue(6, ()).is_ok());
          assert!(pb.enqueue(8, ()).is_ok());
          assert!(pb.dequeue().is_ok());

          pb.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
          assert_eq!(pb.metadata_ring.len(), 3);
          assert!(pb.dequeue().is_ok());

          // A failing callback leaves the packet queued.
          assert!(pb.dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>).is_err());
          assert_eq!(pb.metadata_ring.len(), 1);

          // A succeeding callback sees the payload and consumes the packet.
          let outcome = pb.dequeue_with(|&mut (), payload| {
              assert_eq!(payload, &b"abcd"[..]);
              Ok(())
          });
          assert!(outcome.is_ok());
          assert_eq!(pb.metadata_ring.len(), 0);
      }

      /// `is_empty` / `is_full` track the number of metadata entries.
      #[test]
      fn test_metadata_full_empty() {
          let mut pb = buffer();
          assert!(pb.is_empty());
          assert!(!pb.is_full());

          assert!(pb.enqueue(1, ()).is_ok());
          assert!(!pb.is_empty());

          assert!(pb.enqueue(1, ()).is_ok());
          assert!(pb.enqueue(1, ()).is_ok());
          assert!(!pb.is_full());
          assert!(!pb.is_empty());

          assert!(pb.enqueue(1, ()).is_ok());
          assert!(pb.is_full());
          assert!(!pb.is_empty());
          assert_eq!(pb.metadata_ring.len(), 4);

          // No metadata slot is left for a fifth packet.
          assert_eq!(pb.enqueue(1, ()), Err(Error::Exhausted));
      }

      /// A request larger than the free payload space is rejected.
      #[test]
      fn test_window_too_small() {
          let mut pb = buffer();
          assert!(pb.enqueue(4, ()).is_ok());
          assert!(pb.enqueue(8, ()).is_ok());
          assert!(pb.dequeue().is_ok());

          assert_eq!(pb.enqueue(16, ()), Err(Error::Exhausted));
          assert_eq!(pb.metadata_ring.len(), 1);
      }

      /// A request that fits the free space in total, but in no contiguous
      /// region, is rejected without adding padding.
      #[test]
      fn test_contiguous_window_too_small() {
          let mut pb = buffer();
          assert!(pb.enqueue(4, ()).is_ok());
          assert!(pb.enqueue(8, ()).is_ok());
          assert!(pb.dequeue().is_ok());

          assert_eq!(pb.enqueue(8, ()), Err(Error::Exhausted));
          assert_eq!(pb.metadata_ring.len(), 1);
      }

      /// A request larger than the whole payload storage is reported as truncated.
      #[test]
      fn test_capacity_too_small() {
          let mut pb = buffer();
          assert_eq!(pb.enqueue(32, ()), Err(Error::Truncated));
      }

      /// After the buffer has been drained, a following packet can still be enqueued.
      #[test]
      fn test_contig_window_prioritized() {
          let mut pb = buffer();
          assert!(pb.enqueue(4, ()).is_ok());
          assert!(pb.dequeue().is_ok());
          assert!(pb.enqueue(5, ()).is_ok());
      }

      /// Resetting a non-empty buffer makes it empty again.
      #[test]
      fn clear() {
          let mut pb = buffer();

          // Ensure enqueuing data in the buffer fills it somewhat.
          assert!(pb.is_empty());
          assert!(pb.enqueue(6, ()).is_ok());

          // Ensure that resetting the buffer causes it to be empty.
          assert!(!pb.is_empty());
          pb.reset();
          assert!(pb.is_empty());
      }
  }