浏览代码

Merge the TCP ring buffer and generic ring buffer.

This adds a few methods to RingBuffer that don't quite fit into
its interface (the slice ones), but we can fix that later.
whitequark 7 年之前
父节点
当前提交
9cb813134c
共有 2 个文件被更改,包括 128 次插入203 次删除
  1. 8 148
      src/socket/tcp.rs
  2. 120 55
      src/storage/ring_buffer.rs

+ 8 - 148
src/socket/tcp.rs

@@ -8,122 +8,9 @@ use {Error, Result};
 use phy::DeviceLimits;
 use wire::{IpProtocol, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
 use socket::{Socket, IpRepr};
+use storage::RingBuffer;
 
-/// A TCP stream ring buffer.
-#[derive(Debug)]
-pub struct SocketBuffer<'a> {
-    storage: Managed<'a, [u8]>,
-    read_at: usize,
-    length:  usize
-}
-
-impl<'a> SocketBuffer<'a> {
-    /// Create a packet buffer with the given storage.
-    pub fn new<T>(storage: T) -> SocketBuffer<'a>
-            where T: Into<Managed<'a, [u8]>> {
-        SocketBuffer {
-            storage: storage.into(),
-            read_at: 0,
-            length:  0
-        }
-    }
-
-    fn clear(&mut self) {
-        self.read_at = 0;
-        self.length = 0;
-    }
-
-    fn capacity(&self) -> usize {
-        self.storage.len()
-    }
-
-    fn len(&self) -> usize {
-        self.length
-    }
-
-    fn window(&self) -> usize {
-        self.capacity() - self.len()
-    }
-
-    fn empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    fn full(&self) -> bool {
-        self.window() == 0
-    }
-
-    fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
-        let write_at = (self.read_at + self.length) % self.storage.len();
-        // We can't enqueue more than there is free space.
-        let free = self.storage.len() - self.length;
-        if size > free { size = free }
-        // We can't contiguously enqueue past the beginning of the storage.
-        let until_end = self.storage.len() - write_at;
-        if size > until_end { size = until_end }
-
-        (write_at, size)
-    }
-
-    fn enqueue(&mut self, size: usize) -> &mut [u8] {
-        let (write_at, size) = self.clamp_writer(size);
-        self.length += size;
-        &mut self.storage[write_at..write_at + size]
-    }
-
-    fn enqueue_slice(&mut self, data: &[u8]) {
-        let data = {
-            let mut dest = self.enqueue(data.len());
-            let (data, rest) = data.split_at(dest.len());
-            dest.copy_from_slice(data);
-            rest
-        };
-        // Retry, in case we had a wraparound.
-        let mut dest = self.enqueue(data.len());
-        let (data, _) = data.split_at(dest.len());
-        dest.copy_from_slice(data);
-    }
-
-    fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
-        let read_at = (self.read_at + offset) % self.storage.len();
-        // We can't read past the end of the queued data.
-        if offset > self.length { return (read_at, 0) }
-        // We can't dequeue more than was queued.
-        let clamped_length = self.length - offset;
-        if size > clamped_length { size = clamped_length }
-        // We can't contiguously dequeue past the end of the storage.
-        let until_end = self.storage.len() - read_at;
-        if size > until_end { size = until_end }
-
-        (read_at, size)
-    }
-
-    fn dequeue(&mut self, size: usize) -> &[u8] {
-        let (read_at, size) = self.clamp_reader(0, size);
-        self.read_at = (self.read_at + size) % self.storage.len();
-        self.length -= size;
-        &self.storage[read_at..read_at + size]
-    }
-
-    fn peek(&self, offset: usize, size: usize) -> &[u8] {
-        let (read_at, size) = self.clamp_reader(offset, size);
-        &self.storage[read_at..read_at + size]
-    }
-
-    fn advance(&mut self, size: usize) {
-        if size > self.length {
-            panic!("advancing {} octets into free space", size - self.length)
-        }
-        self.read_at = (self.read_at + size) % self.storage.len();
-        self.length -= size;
-    }
-}
-
-impl<'a> Into<SocketBuffer<'a>> for Managed<'a, [u8]> {
-    fn into(self) -> SocketBuffer<'a> {
-        SocketBuffer::new(self)
-    }
-}
+pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
 
 /// The state of a TCP socket, according to [RFC 793][rfc793].
 /// [rfc793]: https://tools.ietf.org/html/rfc793
@@ -590,7 +477,7 @@ impl<'a> TcpSocket<'a> {
 
         #[cfg(any(test, feature = "verbose"))]
         let old_length = self.tx_buffer.len();
-        let buffer = self.tx_buffer.enqueue(size);
+        let buffer = self.tx_buffer.enqueue_slice(size);
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
             net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
@@ -630,7 +517,7 @@ impl<'a> TcpSocket<'a> {
 
         #[cfg(any(test, feature = "verbose"))]
         let old_length = self.rx_buffer.len();
-        let buffer = self.rx_buffer.dequeue(size);
+        let buffer = self.rx_buffer.dequeue_slice(size);
         self.remote_seq_no += buffer.len();
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
@@ -1085,7 +972,8 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        ack_len, self.tx_buffer.len() - ack_len);
-            self.tx_buffer.advance(ack_len);
+            let acked = self.tx_buffer.dequeue_slice(ack_len);
+            debug_assert!(acked.len() == ack_len);
         }
 
         // We've processed everything in the incoming segment, so advance the local
@@ -1099,7 +987,7 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
-            self.rx_buffer.enqueue_slice(repr.payload);
+            self.rx_buffer.enqueue_slice_all(repr.payload);
         }
 
         Ok(None)
@@ -1317,34 +1205,6 @@ mod test {
     use wire::{IpAddress, Ipv4Address};
     use super::*;
 
-    #[test]
-    fn test_buffer() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue(6).copy_from_slice(b"foobar");   // foobar..
-        assert_eq!(buffer.dequeue(3), b"foo");          // ...bar..
-        buffer.enqueue(6).copy_from_slice(b"ba");       // ...barba
-        buffer.enqueue(4).copy_from_slice(b"zho");      // zhobarba
-        assert_eq!(buffer.dequeue(6), b"barba");        // zho.....
-        assert_eq!(buffer.dequeue(8), b"zho");          // ........
-        buffer.enqueue(8).copy_from_slice(b"gefug");    // ...gefug
-    }
-
-    #[test]
-    fn test_buffer_wraparound() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue_slice(&b"foobar"[..]);           // foobar..
-        assert_eq!(buffer.dequeue(3), b"foo");          // ...bar..
-        buffer.enqueue_slice(&b"bazhoge"[..]);          // zhobarba
-    }
-
-    #[test]
-    fn test_buffer_peek() {
-        let mut buffer = SocketBuffer::new(vec![0; 8]); // ........
-        buffer.enqueue_slice(&b"foobar"[..]);           // foobar..
-        assert_eq!(buffer.peek(0, 8), &b"foobar"[..]);
-        assert_eq!(buffer.peek(3, 8), &b"bar"[..]);
-    }
-
     #[test]
     fn test_timer_retransmit() {
         let mut r = Timer::Idle;
@@ -1890,7 +1750,7 @@ mod test {
             window_len: 58,
             ..RECV_TEMPL
         }]);
-        assert_eq!(s.rx_buffer.dequeue(6), &b"abcdef"[..]);
+        assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
     }
 
     #[test]

+ 120 - 55
src/storage/ring_buffer.rs

@@ -1,4 +1,4 @@
-use managed::Managed;
+use managed::{Managed, ManagedSlice};
 
 use {Error, Result};
 use super::Resettable;
@@ -6,9 +6,9 @@ use super::Resettable;
 /// A ring buffer.
 #[derive(Debug)]
 pub struct RingBuffer<'a, T: 'a> {
-    storage: Managed<'a, [T]>,
+    storage: ManagedSlice<'a, T>,
     read_at: usize,
-    length: usize,
+    length:  usize,
 }
 
 impl<'a, T: 'a> RingBuffer<'a, T> {
@@ -16,55 +16,66 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     ///
     /// During creation, every element in `storage` is reset.
     pub fn new<S>(storage: S) -> RingBuffer<'a, T>
-        where S: Into<Managed<'a, [T]>>, T: Resettable,
+        where S: Into<ManagedSlice<'a, T>>,
     {
-        let mut storage = storage.into();
-        for elem in storage.iter_mut() {
-            elem.reset();
-        }
-
         RingBuffer {
-            storage: storage,
+            storage: storage.into(),
             read_at: 0,
             length:  0,
         }
     }
 
-    fn mask(&self, index: usize) -> usize {
-        index % self.storage.len()
+    /// Clear the ring buffer.
+    pub fn clear(&mut self) {
+        self.read_at = 0;
+        self.length  = 0;
+    }
+
+    /// Clear the ring buffer, and reset every element.
+    pub fn reset(&mut self)
+            where T: Resettable {
+        self.clear();
+        for elem in self.storage.iter_mut() {
+            elem.reset();
+        }
     }
 
-    fn incr(&self, index: usize) -> usize {
-        self.mask(index + 1)
+    /// Return the current number of elements in the ring buffer.
+    pub fn len(&self) -> usize {
+        self.length
+    }
+
+    /// Return the maximum number of elements in the ring buffer.
+    pub fn capacity(&self) -> usize {
+        self.storage.len()
+    }
+
+    /// Return the number of elements that can be added to the ring buffer.
+    pub fn window(&self) -> usize {
+        self.capacity() - self.len()
     }
 
     /// Query whether the buffer is empty.
     pub fn empty(&self) -> bool {
-        self.length == 0
+        self.len() == 0
     }
 
     /// Query whether the buffer is full.
     pub fn full(&self) -> bool {
-        self.length == self.storage.len()
-    }
-
-    /// Enqueue an element into the buffer, and return a pointer to it, or return
-    /// `Err(Error::Exhausted)` if the buffer is full.
-    pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
-        if self.full() { return Err(Error::Exhausted) }
-
-        let index = self.mask(self.read_at + self.length);
-        self.length += 1;
-        Ok(&mut self.storage[index])
+        self.window() == 0
     }
+}
 
-    /// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
-    /// return `Err(Error::Exhausted)` if the buffer is full.
+// This is the "discrete" ring buffer interface: it operates with single elements,
+// and boundary conditions (empty/full) are errors.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+    /// Call `f` with a single buffer element, and enqueue the element if `f`
+    /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
     pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
             where F: FnOnce(&'b mut T) -> Result<R> {
         if self.full() { return Err(Error::Exhausted) }
 
-        let index = self.mask(self.read_at + self.length);
+        let index = (self.read_at + self.length) % self.capacity();
         match f(&mut self.storage[index]) {
             Ok(result) => {
                 self.length += 1;
@@ -74,15 +85,10 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         }
     }
 
-    /// Dequeue an element from the buffer, and return a mutable reference to it, or return
-    /// `Err(Error::Exhausted)` if the buffer is empty.
-    pub fn dequeue(&mut self) -> Result<&mut T> {
-        if self.empty() { return Err(Error::Exhausted) }
-
-        let read_at = self.read_at;
-        self.length -= 1;
-        self.read_at = self.incr(self.read_at);
-        Ok(&mut self.storage[read_at])
+    /// Enqueue a single element into the buffer, and return a pointer to it,
+    /// or return `Err(Error::Exhausted)` if the buffer is full.
+    pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
+        self.try_enqueue(Ok)
     }
 
     /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
@@ -91,7 +97,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
             where F: FnOnce(&'b mut T) -> Result<R> {
         if self.empty() { return Err(Error::Exhausted) }
 
-        let next_at = self.incr(self.read_at);
+        let next_at = (self.read_at + 1) % self.capacity();
         match f(&mut self.storage[self.read_at]) {
             Ok(result) => {
                 self.length -= 1;
@@ -101,32 +107,91 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
             Err(error) => Err(error)
         }
     }
+
+    /// Dequeue an element from the buffer, and return a mutable reference to it, or return
+    /// `Err(Error::Exhausted)` if the buffer is empty.
+    pub fn dequeue(&mut self) -> Result<&mut T> {
+        self.try_dequeue(Ok)
+    }
 }
 
-#[cfg(test)]
-mod test {
-    use super::*;
+// This is the "continuous" ring buffer interface: it operates with element slices,
+// and boundary conditions (empty/full) simply result in empty slices.
+impl<'a, T: 'a> RingBuffer<'a, T> {
+    fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
+        let write_at = (self.read_at + self.length) % self.capacity();
+        // We can't enqueue more than there is free space.
+        let free = self.capacity() - self.length;
+        if size > free { size = free }
+        // We can't contiguously enqueue past the beginning of the storage.
+        let until_end = self.capacity() - write_at;
+        if size > until_end { size = until_end }
+
+        (write_at, size)
+    }
 
-    impl Resettable for usize {
-        fn reset(&mut self) {
-            *self = 0;
-        }
+    pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
+        let (write_at, size) = self.clamp_writer(size);
+        self.length += size;
+        &mut self.storage[write_at..write_at + size]
     }
 
-    const SIZE: usize = 5;
+    pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
+            where T: Copy {
+        let data = {
+            let mut dest = self.enqueue_slice(data.len());
+            let (data, rest) = data.split_at(dest.len());
+            dest.copy_from_slice(data);
+            rest
+        };
+        // Retry, in case we had a wraparound.
+        let mut dest = self.enqueue_slice(data.len());
+        let (data, _) = data.split_at(dest.len());
+        dest.copy_from_slice(data);
+    }
 
-    fn buffer() -> RingBuffer<'static, usize> {
-        let mut storage = vec![];
-        for i in 0..SIZE {
-            storage.push(i + 10);
-        }
+    fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
+        let read_at = (self.read_at + offset) % self.capacity();
+        // We can't read past the end of the queued data.
+        if offset > self.length { return (read_at, 0) }
+        // We can't dequeue more than was queued.
+        let clamped_length = self.length - offset;
+        if size > clamped_length { size = clamped_length }
+        // We can't contiguously dequeue past the end of the storage.
+        let until_end = self.capacity() - read_at;
+        if size > until_end { size = until_end }
+
+        (read_at, size)
+    }
+
+    pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
+        let (read_at, size) = self.clamp_reader(0, size);
+        self.read_at = (self.read_at + size) % self.capacity();
+        self.length -= size;
+        &self.storage[read_at..read_at + size]
+    }
 
-        RingBuffer::new(storage)
+    pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
+        let (read_at, size) = self.clamp_reader(offset, size);
+        &self.storage[read_at..read_at + size]
     }
+}
+
+impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
+    fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
+        RingBuffer::new(slice)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    const SIZE: usize = 5;
 
     #[test]
     pub fn test_buffer() {
-        let mut buf = buffer();
+        let mut buf = RingBuffer::new(vec![0; SIZE]);
         assert!(buf.empty());
         assert!(!buf.full());
         assert_eq!(buf.dequeue(), Err(Error::Exhausted));
@@ -152,7 +217,7 @@ mod test {
 
     #[test]
     pub fn test_buffer_try() {
-        let mut buf = buffer();
+        let mut buf = RingBuffer::new(vec![0; SIZE]);
         assert!(buf.empty());
         assert!(!buf.full());
         assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,