packet_buffer.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. use managed::ManagedSlice;
  2. use {Error, Result};
  3. use super::RingBuffer;
  4. /// Size and header of a packet.
  5. #[derive(Debug, Clone, Copy)]
  6. pub struct PacketMetadata<H> {
  7. size: usize,
  8. header: Option<H>
  9. }
  10. impl<H> PacketMetadata<H> {
  11. /// Empty packet description.
  12. pub const EMPTY: PacketMetadata<H> = PacketMetadata { size: 0, header: None };
  13. fn padding(size: usize) -> PacketMetadata<H> {
  14. PacketMetadata {
  15. size: size,
  16. header: None
  17. }
  18. }
  19. fn packet(size: usize, header: H) -> PacketMetadata<H> {
  20. PacketMetadata {
  21. size: size,
  22. header: Some(header)
  23. }
  24. }
  25. fn is_padding(&self) -> bool {
  26. self.header.is_none()
  27. }
  28. }
  29. /// An UDP packet ring buffer.
  30. #[derive(Debug)]
  31. pub struct PacketBuffer<'a, 'b, H: 'a> {
  32. metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
  33. payload_ring: RingBuffer<'b, u8>,
  34. }
  35. impl<'a, 'b, H> PacketBuffer<'a, 'b, H> {
  36. /// Create a new packet buffer with the provided metadata and payload storage.
  37. ///
  38. /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
  39. /// storage limits the maximum _total size_ of packets.
  40. pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, 'b, H>
  41. where MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
  42. PS: Into<ManagedSlice<'b, u8>>,
  43. {
  44. PacketBuffer {
  45. metadata_ring: RingBuffer::new(metadata_storage),
  46. payload_ring: RingBuffer::new(payload_storage),
  47. }
  48. }
  49. /// Query whether the buffer is empty.
  50. pub fn is_empty(&self) -> bool {
  51. self.metadata_ring.is_empty()
  52. }
  53. /// Query whether the buffer is full.
  54. pub fn is_full(&self) -> bool {
  55. self.metadata_ring.is_full()
  56. }
  57. // There is currently no enqueue_with() because of the complexity of managing padding
  58. // in case of failure.
  59. /// Enqueue a single packet with the given header into the buffer, and
  60. /// return a reference to its payload, or return `Err(Error::Exhausted)`
  61. /// if the buffer is full, or return `Err(Error::Truncated)` if the buffer
  62. /// does not have enough spare payload space.
  63. pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
  64. if self.payload_ring.capacity() < size {
  65. return Err(Error::Truncated)
  66. }
  67. if self.metadata_ring.is_full() {
  68. return Err(Error::Exhausted)
  69. }
  70. let window = self.payload_ring.window();
  71. let contig_window = self.payload_ring.contiguous_window();
  72. if window < size {
  73. return Err(Error::Exhausted)
  74. } else if contig_window < size {
  75. if window - contig_window < size {
  76. // The buffer length is larger than the current contiguous window
  77. // and is larger than the contiguous window will be after adding
  78. // the padding necessary to circle around to the beginning of the
  79. // ring buffer.
  80. return Err(Error::Exhausted)
  81. } else {
  82. // Add padding to the end of the ring buffer so that the
  83. // contiguous window is at the beginning of the ring buffer.
  84. *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(size);
  85. self.payload_ring.enqueue_many(size);
  86. }
  87. }
  88. *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
  89. let payload_buf = self.payload_ring.enqueue_many(size);
  90. debug_assert!(payload_buf.len() == size);
  91. Ok(payload_buf)
  92. }
  93. fn dequeue_padding(&mut self) {
  94. let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
  95. let _ = metadata_ring.dequeue_one_with(|metadata| {
  96. if metadata.is_padding() {
  97. payload_ring.dequeue_many(metadata.size);
  98. Ok(()) // dequeue metadata
  99. } else {
  100. Err(Error::Exhausted) // don't dequeue metadata
  101. }
  102. });
  103. }
  104. /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
  105. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
  106. pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
  107. where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R> {
  108. self.dequeue_padding();
  109. let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
  110. metadata_ring.dequeue_one_with(move |metadata| {
  111. let PacketMetadata { ref mut header, size } = *metadata;
  112. payload_ring.dequeue_many_with(|payload_buf| {
  113. debug_assert!(payload_buf.len() >= size);
  114. match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
  115. Ok(val) => (size, Ok(val)),
  116. Err(err) => (0, Err(err)),
  117. }
  118. }).1
  119. })
  120. }
  121. /// Dequeue a single packet from the buffer, and return a reference to its payload
  122. /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
  123. pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
  124. self.dequeue_padding();
  125. let PacketMetadata { ref mut header, size } = *self.metadata_ring.dequeue_one()?;
  126. let payload_buf = self.payload_ring.dequeue_many(size);
  127. debug_assert!(payload_buf.len() == size);
  128. Ok((header.take().unwrap(), payload_buf))
  129. }
  130. /// Peek at a single packet from the buffer without removing it, and return a reference to
  131. /// its payload as well as its header, or return `Err(Error:Exhaused)` if the buffer is empty.
  132. ///
  133. /// This function otherwise behaves identically to [dequeue](#method.dequeue).
  134. pub fn peek(&mut self) -> Result<(&H, &[u8])> {
  135. self.dequeue_padding();
  136. if let Some(metadata) = self.metadata_ring.get_allocated(0, 1).first() {
  137. Ok((metadata.header.as_ref().unwrap(), self.payload_ring.get_allocated(0, metadata.size)))
  138. } else {
  139. Err(Error::Exhausted)
  140. }
  141. }
  142. /// Return the maximum number packets that can be stored.
  143. pub fn packet_capacity(&self) -> usize {
  144. self.metadata_ring.capacity()
  145. }
  146. /// Return the maximum number of bytes in the payload ring buffer.
  147. pub fn payload_capacity(&self) -> usize {
  148. self.payload_ring.capacity()
  149. }
  150. }
  151. #[cfg(test)]
  152. mod test {
  153. use super::*;
  154. fn buffer() -> PacketBuffer<'static, 'static, ()> {
  155. PacketBuffer::new(vec![PacketMetadata::EMPTY; 4],
  156. vec![0u8; 16])
  157. }
  158. #[test]
  159. fn test_simple() {
  160. let mut buffer = buffer();
  161. buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
  162. assert_eq!(buffer.enqueue(16, ()), Err(Error::Exhausted));
  163. assert_eq!(buffer.metadata_ring.len(), 1);
  164. assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
  165. assert_eq!(buffer.dequeue(), Err(Error::Exhausted));
  166. }
  167. #[test]
  168. fn test_peek() {
  169. let mut buffer = buffer();
  170. assert_eq!(buffer.peek(), Err(Error::Exhausted));
  171. buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
  172. assert_eq!(buffer.metadata_ring.len(), 1);
  173. assert_eq!(buffer.peek().unwrap().1, &b"abcdef"[..]);
  174. assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
  175. assert_eq!(buffer.peek(), Err(Error::Exhausted));
  176. }
  177. #[test]
  178. fn test_padding() {
  179. let mut buffer = buffer();
  180. assert!(buffer.enqueue(6, ()).is_ok());
  181. assert!(buffer.enqueue(8, ()).is_ok());
  182. assert!(buffer.dequeue().is_ok());
  183. buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
  184. assert_eq!(buffer.metadata_ring.len(), 3);
  185. assert!(buffer.dequeue().is_ok());
  186. assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
  187. assert_eq!(buffer.metadata_ring.len(), 0);
  188. }
  189. #[test]
  190. fn test_dequeue_with() {
  191. let mut buffer = buffer();
  192. assert!(buffer.enqueue(6, ()).is_ok());
  193. assert!(buffer.enqueue(8, ()).is_ok());
  194. assert!(buffer.dequeue().is_ok());
  195. buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
  196. assert_eq!(buffer.metadata_ring.len(), 3);
  197. assert!(buffer.dequeue().is_ok());
  198. assert!(buffer.dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>).is_err());
  199. assert_eq!(buffer.metadata_ring.len(), 1);
  200. assert!(buffer.dequeue_with(|&mut (), payload| {
  201. assert_eq!(payload, &b"abcd"[..]);
  202. Ok(())
  203. }).is_ok());
  204. assert_eq!(buffer.metadata_ring.len(), 0);
  205. }
  206. #[test]
  207. fn test_metadata_full_empty() {
  208. let mut buffer = buffer();
  209. assert_eq!(buffer.is_empty(), true);
  210. assert_eq!(buffer.is_full(), false);
  211. assert!(buffer.enqueue(1, ()).is_ok());
  212. assert_eq!(buffer.is_empty(), false);
  213. assert!(buffer.enqueue(1, ()).is_ok());
  214. assert!(buffer.enqueue(1, ()).is_ok());
  215. assert_eq!(buffer.is_full(), false);
  216. assert_eq!(buffer.is_empty(), false);
  217. assert!(buffer.enqueue(1, ()).is_ok());
  218. assert_eq!(buffer.is_full(), true);
  219. assert_eq!(buffer.is_empty(), false);
  220. assert_eq!(buffer.metadata_ring.len(), 4);
  221. assert_eq!(buffer.enqueue(1, ()), Err(Error::Exhausted));
  222. }
  223. #[test]
  224. fn test_window_too_small() {
  225. let mut buffer = buffer();
  226. assert!(buffer.enqueue(4, ()).is_ok());
  227. assert!(buffer.enqueue(8, ()).is_ok());
  228. assert!(buffer.dequeue().is_ok());
  229. assert_eq!(buffer.enqueue(16, ()), Err(Error::Exhausted));
  230. assert_eq!(buffer.metadata_ring.len(), 1);
  231. }
  232. #[test]
  233. fn test_contiguous_window_too_small() {
  234. let mut buffer = buffer();
  235. assert!(buffer.enqueue(4, ()).is_ok());
  236. assert!(buffer.enqueue(8, ()).is_ok());
  237. assert!(buffer.dequeue().is_ok());
  238. assert_eq!(buffer.enqueue(8, ()), Err(Error::Exhausted));
  239. assert_eq!(buffer.metadata_ring.len(), 1);
  240. }
  241. #[test]
  242. fn test_capacity_too_small() {
  243. let mut buffer = buffer();
  244. assert_eq!(buffer.enqueue(32, ()), Err(Error::Truncated));
  245. }
  246. #[test]
  247. fn test_contig_window_prioritized() {
  248. let mut buffer = buffer();
  249. assert!(buffer.enqueue(4, ()).is_ok());
  250. assert!(buffer.dequeue().is_ok());
  251. assert!(buffer.enqueue(5, ()).is_ok());
  252. }
  253. }