瀏覽代碼

Refactor the "continuous" ring buffer interface.

This also makes TcpSocket::{send,recv}_slice slightly more efficient
in case when the slice wraps around the corresponding buffer,
halving the necessary amount of calls.
whitequark 7 年之前
父節點
當前提交
fb05226a62
共有 4 個文件被更改,包括 391 次插入110 次删除
  1. 4 4
      src/socket/raw.rs
  2. 32 16
      src/socket/tcp.rs
  3. 4 4
      src/socket/udp.rs
  4. 351 86
      src/storage/ring_buffer.rs

+ 4 - 4
src/socket/raw.rs

@@ -127,7 +127,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
     /// **Note:** The IP header is parsed and reserialized, and may not match
     /// the header actually transmitted bit for bit.
     pub fn send(&mut self, size: usize) -> Result<&mut [u8]> {
-        let packet_buf = self.tx_buffer.try_enqueue(|buf| buf.resize(size))?;
+        let packet_buf = self.tx_buffer.enqueue_one_with(|buf| buf.resize(size))?;
         net_trace!("[{}]:{}:{}: buffer to send {} octets",
                    self.debug_id, self.ip_version, self.ip_protocol,
                    packet_buf.size);
@@ -149,7 +149,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
     /// **Note:** The IP header is parsed and reserialized, and may not match
     /// the header actually received bit for bit.
     pub fn recv(&mut self) -> Result<&[u8]> {
-        let packet_buf = self.rx_buffer.dequeue()?;
+        let packet_buf = self.rx_buffer.dequeue_one()?;
         net_trace!("[{}]:{}:{}: receive {} buffered octets",
                    self.debug_id, self.ip_version, self.ip_protocol,
                    packet_buf.size);
@@ -172,7 +172,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
 
         let header_len = ip_repr.buffer_len();
         let total_len = header_len + payload.len();
-        let packet_buf = self.rx_buffer.try_enqueue(|buf| buf.resize(total_len))?;
+        let packet_buf = self.rx_buffer.enqueue_one_with(|buf| buf.resize(total_len))?;
         ip_repr.emit(&mut packet_buf.as_mut()[..header_len]);
         packet_buf.as_mut()[header_len..].copy_from_slice(payload);
         net_trace!("[{}]:{}:{}: receiving {} octets",
@@ -202,7 +202,7 @@ impl<'a, 'b> RawSocket<'a, 'b> {
         let debug_id    = self.debug_id;
         let ip_protocol = self.ip_protocol;
         let ip_version  = self.ip_version;
-        self.tx_buffer.try_dequeue(|packet_buf| {
+        self.tx_buffer.dequeue_one_with(|packet_buf| {
             match prepare(ip_protocol, packet_buf.as_mut()) {
                 Ok((ip_repr, raw_packet)) => {
                     net_trace!("[{}]:{}:{}: sending {} octets",

+ 32 - 16
src/socket/tcp.rs

@@ -475,14 +475,13 @@ impl<'a> TcpSocket<'a> {
     pub fn send(&mut self, size: usize) -> Result<&mut [u8]> {
         if !self.may_send() { return Err(Error::Illegal) }
 
-        #[cfg(any(test, feature = "verbose"))]
-        let old_length = self.tx_buffer.len();
-        let buffer = self.tx_buffer.enqueue_slice(size);
+        let _old_length = self.tx_buffer.len();
+        let buffer = self.tx_buffer.enqueue_many(size);
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
             net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
-                       buffer.len(), old_length + buffer.len());
+                       buffer.len(), _old_length + buffer.len());
             self.timer.reset();
         }
         Ok(buffer)
@@ -495,10 +494,18 @@ impl<'a> TcpSocket<'a> {
     ///
     /// See also [send](#method.send).
     pub fn send_slice(&mut self, data: &[u8]) -> Result<usize> {
-        let buffer = self.send(data.len())?;
-        let data = &data[..buffer.len()];
-        buffer.copy_from_slice(data);
-        Ok(buffer.len())
+        if !self.may_send() { return Err(Error::Illegal) }
+
+        let old_length = self.tx_buffer.len();
+        let enqueued = self.tx_buffer.enqueue_slice(data);
+        if enqueued != 0 {
+            #[cfg(any(test, feature = "verbose"))]
+            net_trace!("[{}]{}:{}: tx buffer: enqueueing {} octets (now {})",
+                       self.debug_id, self.local_endpoint, self.remote_endpoint,
+                       enqueued, old_length + enqueued);
+            self.timer.reset();
+        }
+        Ok(enqueued)
     }
 
     /// Dequeue a sequence of received octets, and return a pointer to it.
@@ -517,7 +524,7 @@ impl<'a> TcpSocket<'a> {
 
         #[cfg(any(test, feature = "verbose"))]
         let old_length = self.rx_buffer.len();
-        let buffer = self.rx_buffer.dequeue_slice(size);
+        let buffer = self.rx_buffer.dequeue_many(size);
         self.remote_seq_no += buffer.len();
         if buffer.len() > 0 {
             #[cfg(any(test, feature = "verbose"))]
@@ -535,10 +542,19 @@ impl<'a> TcpSocket<'a> {
     ///
     /// See also [recv](#method.recv).
     pub fn recv_slice(&mut self, data: &mut [u8]) -> Result<usize> {
-        let buffer = self.recv(data.len())?;
-        let data = &mut data[..buffer.len()];
-        data.copy_from_slice(buffer);
-        Ok(buffer.len())
+        // See recv() above.
+        if !self.may_recv() { return Err(Error::Illegal) }
+
+        let old_length = self.rx_buffer.len();
+        let dequeued = self.rx_buffer.dequeue_slice(data);
+        self.remote_seq_no += dequeued;
+        if dequeued > 0 {
+            #[cfg(any(test, feature = "verbose"))]
+            net_trace!("[{}]{}:{}: rx buffer: dequeueing {} octets (now {})",
+                       self.debug_id, self.local_endpoint, self.remote_endpoint,
+                       dequeued, old_length - dequeued);
+        }
+        Ok(dequeued)
     }
 
     /// Peek at a sequence of received octets without removing them from
@@ -972,7 +988,7 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: tx buffer: dequeueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        ack_len, self.tx_buffer.len() - ack_len);
-            let acked = self.tx_buffer.dequeue_slice(ack_len);
+            let acked = self.tx_buffer.dequeue_many(ack_len);
             debug_assert!(acked.len() == ack_len);
         }
 
@@ -987,7 +1003,7 @@ impl<'a> TcpSocket<'a> {
             net_trace!("[{}]{}:{}: rx buffer: enqueueing {} octets (now {})",
                        self.debug_id, self.local_endpoint, self.remote_endpoint,
                        repr.payload.len(), self.rx_buffer.len() + repr.payload.len());
-            self.rx_buffer.enqueue_slice_all(repr.payload);
+            self.rx_buffer.enqueue_slice(repr.payload);
         }
 
         Ok(None)
@@ -1750,7 +1766,7 @@ mod test {
             window_len: 58,
             ..RECV_TEMPL
         }]);
-        assert_eq!(s.rx_buffer.dequeue_slice(6), &b"abcdef"[..]);
+        assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]);
     }
 
     #[test]

+ 4 - 4
src/socket/udp.rs

@@ -141,7 +141,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
         if self.endpoint.port == 0 { return Err(Error::Unaddressable) }
         if !endpoint.is_specified() { return Err(Error::Unaddressable) }
 
-        let packet_buf = self.tx_buffer.try_enqueue(|buf| buf.resize(size))?;
+        let packet_buf = self.tx_buffer.enqueue_one_with(|buf| buf.resize(size))?;
         packet_buf.endpoint = endpoint;
         net_trace!("[{}]{}:{}: buffer to send {} octets",
                    self.debug_id, self.endpoint, packet_buf.endpoint, size);
@@ -161,7 +161,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
     ///
     /// This function returns `Err(Error::Exhausted)` if the receive buffer is empty.
     pub fn recv(&mut self) -> Result<(&[u8], IpEndpoint)> {
-        let packet_buf = self.rx_buffer.dequeue()?;
+        let packet_buf = self.rx_buffer.dequeue_one()?;
         net_trace!("[{}]{}:{}: receive {} buffered octets",
                    self.debug_id, self.endpoint,
                    packet_buf.endpoint, packet_buf.size);
@@ -185,7 +185,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
         if !self.endpoint.addr.is_unspecified() &&
            self.endpoint.addr != ip_repr.dst_addr() { return Err(Error::Rejected) }
 
-        let packet_buf = self.rx_buffer.try_enqueue(|buf| buf.resize(repr.payload.len()))?;
+        let packet_buf = self.rx_buffer.enqueue_one_with(|buf| buf.resize(repr.payload.len()))?;
         packet_buf.as_mut().copy_from_slice(repr.payload);
         packet_buf.endpoint = IpEndpoint { addr: ip_repr.src_addr(), port: repr.src_port };
         net_trace!("[{}]{}:{}: receiving {} octets",
@@ -198,7 +198,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
             where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
         let debug_id = self.debug_id;
         let endpoint = self.endpoint;
-        self.tx_buffer.try_dequeue(|packet_buf| {
+        self.tx_buffer.dequeue_one_with(|packet_buf| {
             net_trace!("[{}]{}:{}: sending {} octets",
                        debug_id, endpoint,
                        packet_buf.endpoint, packet_buf.size);

+ 351 - 86
src/storage/ring_buffer.rs

@@ -1,3 +1,4 @@
+use core::cmp;
 use managed::{Managed, ManagedSlice};
 
 use {Error, Result};
@@ -31,6 +32,11 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         self.length  = 0;
     }
 
+    /// Return the maximum number of elements in the ring buffer.
+    pub fn capacity(&self) -> usize {
+        self.storage.len()
+    }
+
     /// Clear the ring buffer, and reset every element.
     pub fn reset(&mut self)
             where T: Resettable {
@@ -45,9 +51,15 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         self.length
     }
 
-    /// Return the maximum number of elements in the ring buffer.
-    pub fn capacity(&self) -> usize {
-        self.storage.len()
+    /// Set the current number of elements in the ring buffer.
+    ///
+    /// The newly added elements (if any) retain their old value.
+    ///
+    /// # Panics
+    /// This function panics if the new length is greater than capacity.
+    pub fn set_len(&mut self, length: usize) {
+        assert!(length <= self.capacity());
+        self.length = length
     }
 
     /// Return the number of elements that can be added to the ring buffer.
@@ -71,7 +83,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
 impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Call `f` with a single buffer element, and enqueue the element if `f`
     /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
-    pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
+    pub fn enqueue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
             where F: FnOnce(&'b mut T) -> Result<R> {
         if self.full() { return Err(Error::Exhausted) }
 
@@ -85,15 +97,17 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         }
     }
 
-    /// Enqueue a single element into the buffer, and return a pointer to it,
+    /// Enqueue a single element into the buffer, and return a reference to it,
     /// or return `Err(Error::Exhausted)` if the buffer is full.
-    pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
-        self.try_enqueue(Ok)
+    ///
+    /// This function is a shortcut for `ring_buf.enqueue_one_with(Ok)`.
+    pub fn enqueue_one<'b>(&'b mut self) -> Result<&'b mut T> {
+        self.enqueue_one_with(Ok)
     }
 
-    /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
-    /// return `Err(Error::Exhausted)` if the buffer is empty.
-    pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
+    /// Call `f` with a single buffer element, and dequeue the element if `f`
+    /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
+    pub fn dequeue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
             where F: FnOnce(&'b mut T) -> Result<R> {
         if self.empty() { return Err(Error::Exhausted) }
 
@@ -108,48 +122,113 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         }
     }
 
-    /// Dequeue an element from the buffer, and return a mutable reference to it, or return
-    /// `Err(Error::Exhausted)` if the buffer is empty.
-    pub fn dequeue(&mut self) -> Result<&mut T> {
-        self.try_dequeue(Ok)
+    /// Dequeue an element from the buffer, and return a reference to it,
+    /// or return `Err(Error::Exhausted)` if the buffer is empty.
+    ///
+    /// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`.
+    pub fn dequeue_one(&mut self) -> Result<&mut T> {
+        self.dequeue_one_with(Ok)
     }
 }
 
 // This is the "continuous" ring buffer interface: it operates with element slices,
 // and boundary conditions (empty/full) simply result in empty slices.
 impl<'a, T: 'a> RingBuffer<'a, T> {
-    fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
+    /// Call `f` with the largest contiguous slice of unallocated buffer elements,
+    /// and enqueue the amount of elements returned by `f`.
+    ///
+    /// # Panics
+    /// This function panics if the amount of elements returned by `f` is larger
+    /// than the size of the slice passed into it.
+    pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
+            where F: FnOnce(&'b mut [T]) -> (usize, R) {
         let write_at = (self.read_at + self.length) % self.capacity();
-        // We can't enqueue more than there is free space.
-        let free = self.capacity() - self.length;
-        if size > free { size = free }
-        // We can't contiguously enqueue past the beginning of the storage.
-        let until_end = self.capacity() - write_at;
-        if size > until_end { size = until_end }
+        let max_size = cmp::min(self.window(), self.capacity() - write_at);
+        let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
+        assert!(size <= max_size);
+        self.length += size;
+        (size, result)
+    }
 
-        (write_at, size)
+    /// Enqueue a slice of elements up to the given size into the buffer,
+    /// and return a reference to them.
+    ///
+    /// This function may return a slice smaller than the given size
+    /// if the free space in the buffer is not contiguous.
+    pub fn enqueue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
+        self.enqueue_many_with(|buf| {
+            let size = cmp::min(size, buf.len());
+            (size, &mut buf[..size])
+        }).1
     }
 
-    pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
-        let (write_at, size) = self.clamp_writer(size);
-        self.length += size;
-        &mut self.storage[write_at..write_at + size]
+    /// Enqueue as many elements from the given slice into the buffer as possible,
+    /// and return the amount of elements that could fit.
+    pub fn enqueue_slice(&mut self, data: &[T]) -> usize
+            where T: Copy {
+        let (size_1, data) = self.enqueue_many_with(|buf| {
+            let size = cmp::min(buf.len(), data.len());
+            buf[..size].copy_from_slice(&data[..size]);
+            (size, &data[size..])
+        });
+        let (size_2, ()) = self.enqueue_many_with(|buf| {
+            let size = cmp::min(buf.len(), data.len());
+            buf[..size].copy_from_slice(&data[..size]);
+            (size, ())
+        });
+        size_1 + size_2
+    }
+
+    /// Call `f` with the largest contiguous slice of allocated buffer elements,
+    /// and dequeue the amount of elements returned by `f`.
+    ///
+    /// # Panics
+    /// This function panics if the amount of elements returned by `f` is larger
+    /// than the size of the slice passed into it.
+    pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
+            where F: FnOnce(&'b mut [T]) -> (usize, R) {
+        let capacity = self.capacity();
+        let max_size = cmp::min(self.len(), capacity - self.read_at);
+        let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
+        assert!(size <= max_size);
+        self.read_at = (self.read_at + size) % capacity;
+        self.length -= size;
+        (size, result)
+    }
+
+    /// Dequeue a slice of elements up to the given size from the buffer,
+    /// and return a reference to them.
+    ///
+    /// This function may return a slice smaller than the given size
+    /// if the allocated space in the buffer is not contiguous.
+    pub fn dequeue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
+        self.dequeue_many_with(|buf| {
+            let size = cmp::min(size, buf.len());
+            (size, &mut buf[..size])
+        }).1
     }
 
-    pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
+    /// Dequeue as many elements from the buffer into the given slice as possible,
+    /// and return the amount of elements that could fit.
+    pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
             where T: Copy {
-        let data = {
-            let mut dest = self.enqueue_slice(data.len());
-            let (data, rest) = data.split_at(dest.len());
-            dest.copy_from_slice(data);
-            rest
-        };
-        // Retry, in case we had a wraparound.
-        let mut dest = self.enqueue_slice(data.len());
-        let (data, _) = data.split_at(dest.len());
-        dest.copy_from_slice(data);
+        let (size_1, data) = self.dequeue_many_with(|buf| {
+            let size = cmp::min(buf.len(), data.len());
+            data[..size].copy_from_slice(&buf[..size]);
+            (size, &mut data[size..])
+        });
+        let (size_2, ()) = self.dequeue_many_with(|buf| {
+            let size = cmp::min(buf.len(), data.len());
+            data[..size].copy_from_slice(&buf[..size]);
+            (size, ())
+        });
+        size_1 + size_2
     }
+}
 
+// This is the "random access" ring buffer interface: it operates with element slices,
+// and allows to access elements of the buffer that are not adjacent to its head or tail.
+impl<'a, T: 'a> RingBuffer<'a, T> {
     fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
         let read_at = (self.read_at + offset) % self.capacity();
         // We can't read past the end of the queued data.
@@ -164,13 +243,6 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
         (read_at, size)
     }
 
-    pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
-        let (read_at, size) = self.clamp_reader(0, size);
-        self.read_at = (self.read_at + size) % self.capacity();
-        self.length -= size;
-        &self.storage[read_at..read_at + size]
-    }
-
     pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
         let (read_at, size) = self.clamp_reader(offset, size);
         &self.storage[read_at..read_at + size]
@@ -187,60 +259,253 @@ impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
 mod test {
     use super::*;
 
-    const SIZE: usize = 5;
+    #[test]
+    pub fn test_buffer_length_changes() {
+        let mut ring = RingBuffer::new(vec![0; 2]);
+        assert!(ring.empty());
+        assert!(!ring.full());
+        assert_eq!(ring.len(), 0);
+        assert_eq!(ring.capacity(), 2);
+        assert_eq!(ring.window(), 2);
+
+        ring.set_len(1);
+        assert!(!ring.empty());
+        assert!(!ring.full());
+        assert_eq!(ring.len(), 1);
+        assert_eq!(ring.capacity(), 2);
+        assert_eq!(ring.window(), 1);
+
+        ring.set_len(2);
+        assert!(!ring.empty());
+        assert!(ring.full());
+        assert_eq!(ring.len(), 2);
+        assert_eq!(ring.capacity(), 2);
+        assert_eq!(ring.window(), 0);
+    }
 
     #[test]
-    pub fn test_buffer() {
-        let mut buf = RingBuffer::new(vec![0; SIZE]);
-        assert!(buf.empty());
-        assert!(!buf.full());
-        assert_eq!(buf.dequeue(), Err(Error::Exhausted));
-
-        buf.enqueue().unwrap();
-        assert!(!buf.empty());
-        assert!(!buf.full());
-
-        for i in 1..SIZE {
-            *buf.enqueue().unwrap() = i;
-            assert!(!buf.empty());
+    pub fn test_buffer_enqueue_dequeue_one_with() {
+        let mut ring = RingBuffer::new(vec![0; 5]);
+        assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
+                   Err(Error::Exhausted));
+
+        ring.enqueue_one_with(|e| Ok(e)).unwrap();
+        assert!(!ring.empty());
+        assert!(!ring.full());
+
+        for i in 1..5 {
+            ring.enqueue_one_with(|e| Ok(*e = i)).unwrap();
+            assert!(!ring.empty());
         }
-        assert!(buf.full());
-        assert_eq!(buf.enqueue(), Err(Error::Exhausted));
+        assert!(ring.full());
+        assert_eq!(ring.enqueue_one_with(|_| unreachable!()) as Result<()>,
+                   Err(Error::Exhausted));
 
-        for i in 0..SIZE {
-            assert_eq!(*buf.dequeue().unwrap(), i);
-            assert!(!buf.full());
+        for i in 0..5 {
+            assert_eq!(ring.dequeue_one_with(|e| Ok(*e)).unwrap(), i);
+            assert!(!ring.full());
         }
-        assert_eq!(buf.dequeue(), Err(Error::Exhausted));
-        assert!(buf.empty());
+        assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
+                   Err(Error::Exhausted));
+        assert!(ring.empty());
     }
 
     #[test]
-    pub fn test_buffer_try() {
-        let mut buf = RingBuffer::new(vec![0; SIZE]);
-        assert!(buf.empty());
-        assert!(!buf.full());
-        assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
-                   Err(Error::Exhausted));
+    pub fn test_buffer_enqueue_dequeue_one() {
+        let mut ring = RingBuffer::new(vec![0; 5]);
+        assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
 
-        buf.try_enqueue(|e| Ok(e)).unwrap();
-        assert!(!buf.empty());
-        assert!(!buf.full());
+        ring.enqueue_one().unwrap();
+        assert!(!ring.empty());
+        assert!(!ring.full());
 
-        for i in 1..SIZE {
-            buf.try_enqueue(|e| Ok(*e = i)).unwrap();
-            assert!(!buf.empty());
+        for i in 1..5 {
+            *ring.enqueue_one().unwrap() = i;
+            assert!(!ring.empty());
         }
-        assert!(buf.full());
-        assert_eq!(buf.try_enqueue(|_| unreachable!()) as Result<()>,
-                   Err(Error::Exhausted));
+        assert!(ring.full());
+        assert_eq!(ring.enqueue_one(), Err(Error::Exhausted));
 
-        for i in 0..SIZE {
-            assert_eq!(buf.try_dequeue(|e| Ok(*e)).unwrap(), i);
-            assert!(!buf.full());
+        for i in 0..5 {
+            assert_eq!(*ring.dequeue_one().unwrap(), i);
+            assert!(!ring.full());
+        }
+        assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
+        assert!(ring.empty());
+    }
+
+    #[test]
+    pub fn test_buffer_enqueue_many_with() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        assert_eq!(ring.enqueue_many_with(|buf| {
+            assert_eq!(buf.len(), 12);
+            buf[0..2].copy_from_slice(b"ab");
+            (2, true)
+        }), (2, true));
+        assert_eq!(ring.len(), 2);
+        assert_eq!(&ring.storage[..], b"ab..........");
+
+        ring.enqueue_many_with(|buf| {
+            assert_eq!(buf.len(), 12 - 2);
+            buf[0..4].copy_from_slice(b"cdXX");
+            (2, ())
+        });
+        assert_eq!(ring.len(), 4);
+        assert_eq!(&ring.storage[..], b"abcdXX......");
+
+        ring.enqueue_many_with(|buf| {
+            assert_eq!(buf.len(), 12 - 4);
+            buf[0..4].copy_from_slice(b"efgh");
+            (4, ())
+        });
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+        for i in 0..4 {
+            *ring.dequeue_one().unwrap() = b'.';
+        }
+        assert_eq!(ring.len(), 4);
+        assert_eq!(&ring.storage[..], b"....efgh....");
+
+        ring.enqueue_many_with(|buf| {
+            assert_eq!(buf.len(), 12 - 8);
+            buf[0..4].copy_from_slice(b"ijkl");
+            (4, ())
+        });
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"....efghijkl");
+
+        ring.enqueue_many_with(|buf| {
+            assert_eq!(buf.len(), 4);
+            buf[0..4].copy_from_slice(b"abcd");
+            (4, ())
+        });
+        assert_eq!(ring.len(), 12);
+        assert_eq!(&ring.storage[..], b"abcdefghijkl");
+
+        for i in 0..4 {
+            *ring.dequeue_one().unwrap() = b'.';
+        }
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"abcd....ijkl");
+    }
+
+    #[test]
+    pub fn test_buffer_enqueue_many() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+        ring.enqueue_many(8).copy_from_slice(b"ijkl");
+        assert_eq!(ring.len(), 12);
+        assert_eq!(&ring.storage[..], b"abcdefghijkl");
+    }
+
+    #[test]
+    pub fn test_buffer_enqueue_slice() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"abcdefgh....");
+
+        for i in 0..4 {
+            *ring.dequeue_one().unwrap() = b'.';
+        }
+        assert_eq!(ring.len(), 4);
+        assert_eq!(&ring.storage[..], b"....efgh....");
+
+        assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8);
+        assert_eq!(ring.len(), 12);
+        assert_eq!(&ring.storage[..], b"abcdefghijkl");
+    }
+
+    #[test]
+    pub fn test_buffer_dequeue_many_with() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+        assert_eq!(ring.dequeue_many_with(|buf| {
+            assert_eq!(buf.len(), 12);
+            assert_eq!(buf, b"abcdefghijkl");
+            buf[..4].copy_from_slice(b"....");
+            (4, true)
+        }), (4, true));
+        assert_eq!(ring.len(), 8);
+        assert_eq!(&ring.storage[..], b"....efghijkl");
+
+        ring.dequeue_many_with(|buf| {
+            assert_eq!(buf, b"efghijkl");
+            buf[..4].copy_from_slice(b"....");
+            (4, ())
+        });
+        assert_eq!(ring.len(), 4);
+        assert_eq!(&ring.storage[..], b"........ijkl");
+
+        assert_eq!(ring.enqueue_slice(b"abcd"), 4);
+        assert_eq!(ring.len(), 8);
+
+        ring.dequeue_many_with(|buf| {
+            assert_eq!(buf, b"ijkl");
+            buf[..4].copy_from_slice(b"....");
+            (4, ())
+        });
+        ring.dequeue_many_with(|buf| {
+            assert_eq!(buf, b"abcd");
+            buf[..4].copy_from_slice(b"....");
+            (4, ())
+        });
+        assert_eq!(ring.len(), 0);
+        assert_eq!(&ring.storage[..], b"............");
+    }
+
+    #[test]
+    pub fn test_buffer_dequeue_many() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+        {
+            let mut buf = ring.dequeue_many(8);
+            assert_eq!(buf, b"abcdefgh");
+            buf.copy_from_slice(b"........");
+        }
+        assert_eq!(ring.len(), 4);
+        assert_eq!(&ring.storage[..], b"........ijkl");
+
+        {
+            let mut buf = ring.dequeue_many(8);
+            assert_eq!(buf, b"ijkl");
+            buf.copy_from_slice(b"....");
+        }
+        assert_eq!(ring.len(), 0);
+        assert_eq!(&ring.storage[..], b"............");
+    }
+
+    #[test]
+    pub fn test_buffer_dequeue_slice() {
+        let mut ring = RingBuffer::new(vec![b'.'; 12]);
+
+        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
+
+        {
+            let mut buf = [0; 8];
+            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
+            assert_eq!(&buf[..], b"abcdefgh");
+            assert_eq!(ring.len(), 4);
+        }
+
+        assert_eq!(ring.enqueue_slice(b"abcd"), 4);
+
+        {
+            let mut buf = [0; 8];
+            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
+            assert_eq!(&buf[..], b"ijklabcd");
+            assert_eq!(ring.len(), 0);
         }
-        assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
-                   Err(Error::Exhausted));
-        assert!(buf.empty());
     }
 }