Quellcode durchsuchen

Refactor the "random access" ring buffer interface.

whitequark vor 7 Jahren
Ursprung
Commit
68921f14f8
2 geänderte Dateien mit 109 neuen und 29 gelöschten Zeilen
  1. 2 2
      src/socket/tcp.rs
  2. 107 27
      src/storage/ring_buffer.rs

+ 2 - 2
src/socket/tcp.rs

@@ -565,7 +565,7 @@ impl<'a> TcpSocket<'a> {
         // See recv() above.
         if !self.may_recv() { return Err(Error::Illegal) }
 
-        let buffer = self.rx_buffer.peek(0, size);
+        let buffer = self.rx_buffer.get_allocated(0, size);
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
             net_trace!("[{}]{}:{}: rx buffer: peeking at {} octets",
@@ -1074,7 +1074,7 @@ impl<'a> TcpSocket<'a> {
                 // from the transmit buffer.
                 let offset = self.remote_last_seq - self.local_seq_no;
                 let size = cmp::min(self.remote_win_len, self.remote_mss);
-                repr.payload = self.tx_buffer.peek(offset, size);
+                repr.payload = self.tx_buffer.get_allocated(offset, size);
                 // If we've sent everything we had in the buffer, follow it with the PSH or FIN
                 // flags, depending on whether the transmit half of the connection is open.
                 if offset + repr.payload.len() == self.tx_buffer.len() {

+ 107 - 27
src/storage/ring_buffer.rs

@@ -5,6 +5,19 @@ use {Error, Result};
 use super::Resettable;
 
 /// A ring buffer.
+///
+/// This ring buffer implementation provides many ways to interact with it:
+///
+///   * Enqueueing or dequeueing one element from corresponding side of the buffer;
+///   * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer;
+///   * Accessing allocated and unallocated areas directly.
+///
+/// It is also zero-copy; all methods provide references into the buffer's storage.
+/// Note that all references are mutable; it is considered more important to allow
+/// in-place processing than to protect from accidental mutation.
+///
+/// This implementation is suitable for both simple uses such as a FIFO queue
+/// of UDP packets, and advanced ones such as a TCP reassembly buffer.
 #[derive(Debug)]
 pub struct RingBuffer<'a, T: 'a> {
     storage: ManagedSlice<'a, T>,
@@ -78,8 +91,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     }
 }
 
-// This is the "discrete" ring buffer interface: it operates with single elements,
-// and boundary conditions (empty/full) are errors.
+/// This is the "discrete" ring buffer interface: it operates with single elements,
+/// and boundary conditions (empty/full) are errors.
 impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Call `f` with a single buffer element, and enqueue the element if `f`
     /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
@@ -131,8 +144,8 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     }
 }
 
-// This is the "continuous" ring buffer interface: it operates with element slices,
-// and boundary conditions (empty/full) simply result in empty slices.
+/// This is the "continuous" ring buffer interface: it operates with element slices,
+/// and boundary conditions (empty/full) simply result in empty slices.
 impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Call `f` with the largest contiguous slice of unallocated buffer elements,
     /// and enqueue the amount of elements returned by `f`.
@@ -226,26 +239,42 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     }
 }
 
-// This is the "random access" ring buffer interface: it operates with element slices,
-// and allows to access elements of the buffer that are not adjacent to its head or tail.
+/// This is the "random access" ring buffer interface: it operates with element slices,
+/// and allows to access elements of the buffer that are not adjacent to its head or tail.
+///
+/// After calling these functions to inject or extract elements, one would normally
+/// use the `enqueue_many` or `dequeue_many` methods to adjust the head or tail.
 impl<'a, T: 'a> RingBuffer<'a, T> {
-    fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
-        let read_at = (self.read_at + offset) % self.capacity();
-        // We can't read past the end of the queued data.
-        if offset > self.length { return (read_at, 0) }
-        // We can't dequeue more than was queued.
+    /// Return the largest contiguous slice of unallocated buffer elements starting
+    /// at the given offset past the last allocated element, and up to the given size.
+    pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
+        let start_at = (self.read_at + self.length + offset) % self.capacity();
+        // We can't access past the end of unallocated data.
+        if offset > self.window() { return &mut [] }
+        // We can't enqueue more than there is free space.
+        let clamped_window = self.window() - offset;
+        if size > clamped_window { size = clamped_window }
+        // We can't contiguously enqueue past the end of the storage.
+        let until_end = self.capacity() - start_at;
+        if size > until_end { size = until_end }
+
+        &mut self.storage[start_at..start_at + size]
+    }
+
+    /// Return the largest contiguous slice of allocated buffer elements starting
+    /// at the given offset past the first allocated element, and up to the given size.
+    pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
+        let start_at = (self.read_at + offset) % self.capacity();
+        // We can't read past the end of the allocated data.
+        if offset > self.length { return &mut [] }
+        // We can't read more than we have allocated.
         let clamped_length = self.length - offset;
         if size > clamped_length { size = clamped_length }
         // We can't contiguously dequeue past the end of the storage.
-        let until_end = self.capacity() - read_at;
+        let until_end = self.capacity() - start_at;
         if size > until_end { size = until_end }
 
-        (read_at, size)
-    }
-
-    pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
-        let (read_at, size) = self.clamp_reader(offset, size);
-        &self.storage[read_at..read_at + size]
+        &self.storage[start_at..start_at + size]
     }
 }
 
@@ -260,7 +289,7 @@ mod test {
     use super::*;
 
     #[test]
-    pub fn test_buffer_length_changes() {
+    fn test_buffer_length_changes() {
         let mut ring = RingBuffer::new(vec![0; 2]);
         assert!(ring.empty());
         assert!(!ring.full());
@@ -284,7 +313,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_enqueue_dequeue_one_with() {
+    fn test_buffer_enqueue_dequeue_one_with() {
         let mut ring = RingBuffer::new(vec![0; 5]);
         assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
                    Err(Error::Exhausted));
@@ -311,7 +340,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_enqueue_dequeue_one() {
+    fn test_buffer_enqueue_dequeue_one() {
         let mut ring = RingBuffer::new(vec![0; 5]);
         assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
 
@@ -335,7 +364,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_enqueue_many_with() {
+    fn test_buffer_enqueue_many_with() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         assert_eq!(ring.enqueue_many_with(|buf| {
@@ -392,7 +421,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_enqueue_many() {
+    fn test_buffer_enqueue_many() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
@@ -405,7 +434,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_enqueue_slice() {
+    fn test_buffer_enqueue_slice() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
@@ -424,7 +453,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_dequeue_many_with() {
+    fn test_buffer_dequeue_many_with() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -464,7 +493,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_dequeue_many() {
+    fn test_buffer_dequeue_many() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -487,7 +516,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_buffer_dequeue_slice() {
+    fn test_buffer_dequeue_slice() {
         let mut ring = RingBuffer::new(vec![b'.'; 12]);
 
         assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
@@ -508,4 +537,55 @@ mod test {
             assert_eq!(ring.len(), 0);
         }
     }
+
+    #[test]
+    fn test_buffer_get_unallocated() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);;
+
+        assert_eq!(ring.get_unallocated(16, 4), b"");
+
+        {
+            let buf = ring.get_unallocated(0, 4);
+            buf.copy_from_slice(b"abcd");
+        }
+        assert_eq!(&ring.storage[..], b"abcd........");
+
+        ring.enqueue_many(4);
+        assert_eq!(ring.len(), 4);
+
+        {
+            let buf = ring.get_unallocated(4, 8);
+            buf.copy_from_slice(b"ijkl");
+        }
+        assert_eq!(&ring.storage[..], b"abcd....ijkl");
+
+        ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
+        ring.dequeue_many(4).copy_from_slice(b"abcd");
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");
+
+        {
+            let buf = ring.get_unallocated(0, 8);
+            buf.copy_from_slice(b"ABCD");
+        }
+        assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
+    }
+
+    #[test]
+    fn test_buffer_get_allocated() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);;
+
+        assert_eq!(ring.get_allocated(16, 4), b"");
+        assert_eq!(ring.get_allocated(0, 4),  b"");
+
+        ring.enqueue_slice(b"abcd");
+        assert_eq!(ring.get_allocated(0, 8), b"abcd");
+
+        ring.enqueue_slice(b"efghijkl");
+        ring.dequeue_many(4).copy_from_slice(b"....");
+        assert_eq!(ring.get_allocated(4, 8), b"ijkl");
+
+        ring.enqueue_slice(b"abcd");
+        assert_eq!(ring.get_allocated(4, 8), b"ijkl");
+    }
 }