Browse Source

Factor out storage::PacketBuffer from socket::UdpSocket.

whitequark 7 years ago
parent
commit
86c59fe344
6 changed files with 279 additions and 251 deletions
  1. 2 2
      examples/server.rs
  2. 3 3
      src/iface/ethernet.rs
  3. 2 2
      src/socket/mod.rs
  4. 35 244
      src/socket/udp.rs
  5. 2 0
      src/storage/mod.rs
  6. 235 0
      src/storage/packet_buffer.rs

+ 2 - 2
examples/server.rs

@@ -32,8 +32,8 @@ fn main() {
 
     let neighbor_cache = NeighborCache::new(BTreeMap::new());
 
-    let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::default()], vec![0; 64]);
-    let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::default()], vec![0; 128]);
+    let udp_rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 64]);
+    let udp_tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 128]);
     let udp_socket = UdpSocket::new(udp_rx_buffer, udp_tx_buffer);
 
     let tcp1_rx_buffer = TcpSocketBuffer::new(vec![0; 64]);

+ 3 - 3
src/iface/ethernet.rs

@@ -1365,15 +1365,15 @@ mod test {
     #[test]
     #[cfg(all(feature = "socket-udp", feature = "proto-ipv4"))]
     fn test_handle_udp_broadcast() {
-        use socket::{UdpSocket, UdpSocketBuffer};
+        use socket::{UdpSocket, UdpSocketBuffer, UdpPacketMetadata};
         use wire::IpEndpoint;
 
         static UDP_PAYLOAD: [u8; 5] = [0x48, 0x65, 0x6c, 0x6c, 0x6f];
 
         let (iface, mut socket_set) = create_loopback();
 
-        let rx_buffer = UdpSocketBuffer::new(vec![Default::default()], vec![0; 15]);
-        let tx_buffer = UdpSocketBuffer::new(vec![Default::default()], vec![0; 15]);
+        let rx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 15]);
+        let tx_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty()], vec![0; 15]);
 
         let udp_socket = UdpSocket::new(rx_buffer, tx_buffer);
 

+ 2 - 2
src/socket/mod.rs

@@ -40,8 +40,8 @@ pub use self::icmp::{PacketBuffer as IcmpPacketBuffer,
                      IcmpSocket};
 
 #[cfg(feature = "socket-udp")]
-pub use self::udp::{PacketMetadata as UdpPacketMetadata,
-                    SocketBuffer as UdpSocketBuffer,
+pub use self::udp::{UdpPacketMetadata,
+                    UdpSocketBuffer,
                     UdpSocket};
 
 #[cfg(feature = "socket-tcp")]

+ 35 - 244
src/socket/udp.rs

@@ -1,79 +1,14 @@
 use core::cmp::min;
-use managed::ManagedSlice;
 
 use {Error, Result};
 use socket::{Socket, SocketMeta, SocketHandle};
-use storage::RingBuffer;
+use storage::{PacketBuffer, PacketMetadata};
 use time::Instant;
 use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr};
 
-/// Endpoint and size of an UDP packet.
-#[derive(Debug, Clone, Copy, Default)]
-pub struct PacketMetadata {
-    endpoint: IpEndpoint,
-    size: usize,
-    /// Padding packets can be used to avoid wrap-arounds of packets in the payload buffer
-    padding: bool,
-}
+pub type UdpPacketMetadata = PacketMetadata<IpEndpoint>;
 
-/// An UDP packet ring buffer.
-#[derive(Debug)]
-pub struct SocketBuffer<'a, 'b> {
-    metadata_buffer: RingBuffer<'a, PacketMetadata>,
-    payload_buffer: RingBuffer<'b, u8>,
-}
-
-impl<'a, 'b> SocketBuffer<'a, 'b> {
-    /// Create a new socket buffer with the provided metadata and payload storage.
-    ///
-    /// Metadata storage limits the maximum _number_ of UDP packets in the buffer and payload
-    /// storage limits the maximum _cumulated size_ of UDP packets.
-    pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> SocketBuffer<'a, 'b>
-        where MS: Into<ManagedSlice<'a, PacketMetadata>>, PS: Into<ManagedSlice<'b, u8>>,
-    {
-        SocketBuffer {
-            metadata_buffer: RingBuffer::new(metadata_storage),
-            payload_buffer: RingBuffer::new(payload_storage),
-        }
-    }
-
-    fn is_full(&self) -> bool {
-        self.metadata_buffer.is_full() || self.payload_buffer.is_full()
-    }
-
-    fn is_empty(&self) -> bool {
-        self.metadata_buffer.is_empty()
-    }
-
-    fn enqueue(&mut self, required_size: usize, endpoint: IpEndpoint) -> Result<&mut [u8]> {
-        let window = self.payload_buffer.window();
-        let contig_window = self.payload_buffer.contiguous_window();
-
-        if self.metadata_buffer.is_full() || self.payload_buffer.window() < required_size {
-            return Err(Error::Exhausted);
-        }
-
-        if contig_window < required_size {
-            // we reached the end of buffer, so the data does not fit without wrap-around
-            // -> insert padding and try again
-            self.payload_buffer.enqueue_many(required_size);
-            let metadata_buf = self.metadata_buffer.enqueue_one()?;
-            metadata_buf.padding = true;
-            metadata_buf.size = required_size;
-            metadata_buf.endpoint = IpEndpoint::default();
-            if window - contig_window < required_size {
-                return Err(Error::Exhausted);
-            }
-        }
-
-        let metadata_buf = self.metadata_buffer.enqueue_one()?;
-        metadata_buf.endpoint = endpoint;
-        metadata_buf.size = required_size;
-        metadata_buf.padding = false;
-
-        Ok(self.payload_buffer.enqueue_many(required_size))
-    }
-}
+pub type UdpSocketBuffer<'a, 'b> = PacketBuffer<'a, 'b, IpEndpoint>;
 
 /// An User Datagram Protocol socket.
 ///
@@ -83,16 +18,16 @@ impl<'a, 'b> SocketBuffer<'a, 'b> {
 pub struct UdpSocket<'a, 'b: 'a> {
     pub(crate) meta: SocketMeta,
     endpoint:  IpEndpoint,
-    rx_buffer: SocketBuffer<'a, 'b>,
-    tx_buffer: SocketBuffer<'a, 'b>,
+    rx_buffer: UdpSocketBuffer<'a, 'b>,
+    tx_buffer: UdpSocketBuffer<'a, 'b>,
     /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
     hop_limit: Option<u8>
 }
 
 impl<'a, 'b> UdpSocket<'a, 'b> {
     /// Create an UDP socket with the given buffers.
-    pub fn new(rx_buffer: SocketBuffer<'a, 'b>,
-               tx_buffer: SocketBuffer<'a, 'b>) -> UdpSocket<'a, 'b> {
+    pub fn new(rx_buffer: UdpSocketBuffer<'a, 'b>,
+               tx_buffer: UdpSocketBuffer<'a, 'b>) -> UdpSocket<'a, 'b> {
         UdpSocket {
             meta:      SocketMeta::default(),
             endpoint:  IpEndpoint::default(),
@@ -185,8 +120,6 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
 
         let payload_buf = self.tx_buffer.enqueue(size, endpoint)?;
 
-        debug_assert_eq!(payload_buf.len(), size);
-
         net_trace!("{}:{}:{}: buffer to send {} octets",
                    self.meta.handle, self.endpoint, endpoint, size);
         Ok(payload_buf)
@@ -205,21 +138,12 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
     ///
     /// This function returns `Err(Error::Exhausted)` if the receive buffer is empty.
     pub fn recv(&mut self) -> Result<(&[u8], IpEndpoint)> {
-        let mut metadata_buf = *self.rx_buffer.metadata_buffer.dequeue_one()?;
-        if metadata_buf.padding {
-            // packet is padding packet -> drop it and try again
-            self.rx_buffer.payload_buffer.dequeue_many(metadata_buf.size);
-            metadata_buf = *self.rx_buffer.metadata_buffer.dequeue_one()?;
-        }
-
-        debug_assert!(!metadata_buf.padding);
-        let payload_buf = self.rx_buffer.payload_buffer.dequeue_many(metadata_buf.size);
-        debug_assert_eq!(metadata_buf.size, payload_buf.len()); // ensured by inserting logic
+        let (endpoint, payload_buf) = self.rx_buffer.dequeue()?;
 
         net_trace!("{}:{}:{}: receive {} buffered octets",
                    self.meta.handle, self.endpoint,
-                metadata_buf.endpoint, metadata_buf.size);
-        Ok((payload_buf, metadata_buf.endpoint))
+                   endpoint, payload_buf.len());
+        Ok((payload_buf, endpoint))
     }
 
     /// Dequeue a packet received from a remote endpoint, copy the payload into the given slice,
@@ -259,50 +183,28 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
 
     pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
             where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
-        let handle   = self.handle();
-        let endpoint = self.endpoint;
+        let handle    = self.handle();
+        let endpoint  = self.endpoint;
         let hop_limit = self.hop_limit.unwrap_or(64);
 
-        let SocketBuffer { ref mut metadata_buffer, ref mut payload_buffer } = self.tx_buffer;
-
-        // dequeue potential padding packet
-        let result = metadata_buffer.dequeue_one_with(|metadata_buf| {
-            if metadata_buf.padding {
-                Ok(metadata_buf.size) // dequeue metadata
-            } else {
-                Err(Error::Exhausted) // don't dequeue metadata
-            }
-        });
-        if let Ok(size) = result {
-            payload_buffer.dequeue_many(size); // dequeue padding payload
-        }
-
-        metadata_buffer.dequeue_one_with(move |metadata_buf| {
-            debug_assert!(!metadata_buf.padding);
-            payload_buffer.dequeue_many_with(|payload_buf| {
-                let payload_buf = &payload_buf[..metadata_buf.size];
-
-                net_trace!("{}:{}:{}: sending {} octets",
-                            handle, endpoint,
-                            metadata_buf.endpoint, metadata_buf.size);
-
-                let repr = UdpRepr {
-                    src_port: endpoint.port,
-                    dst_port: metadata_buf.endpoint.port,
-                    payload:  payload_buf,
-                };
-                let ip_repr = IpRepr::Unspecified {
-                    src_addr:    endpoint.addr,
-                    dst_addr:    metadata_buf.endpoint.addr,
-                    protocol:    IpProtocol::Udp,
-                    payload_len: repr.buffer_len(),
-                    hop_limit:   hop_limit,
-                };
-                match emit((ip_repr, repr)) {
-                    Ok(ret) => (metadata_buf.size, Ok(ret)),
-                    Err(ret) => (0, Err(ret)),
-                }
-            }).1
+        self.tx_buffer.dequeue_with(|remote_endpoint, payload_buf| {
+            net_trace!("{}:{}:{}: sending {} octets",
+                        handle, endpoint,
+                        endpoint, payload_buf.len());
+
+            let repr = UdpRepr {
+                src_port: endpoint.port,
+                dst_port: remote_endpoint.port,
+                payload:  payload_buf,
+            };
+            let ip_repr = IpRepr::Unspecified {
+                src_addr:    endpoint.addr,
+                dst_addr:    remote_endpoint.addr,
+                protocol:    IpProtocol::Udp,
+                payload_len: repr.buffer_len(),
+                hop_limit:   hop_limit,
+            };
+            emit((ip_repr, repr))
         })
     }
 
@@ -331,12 +233,12 @@ mod test {
     use wire::ip::test::{MOCK_IP_ADDR_1, MOCK_IP_ADDR_2, MOCK_IP_ADDR_3};
     use super::*;
 
-    fn buffer(packets: usize) -> SocketBuffer<'static, 'static> {
-        SocketBuffer::new(vec![Default::default(); packets], vec![0; 16 * packets])
+    fn buffer(packets: usize) -> UdpSocketBuffer<'static, 'static> {
+        UdpSocketBuffer::new(vec![UdpPacketMetadata::empty(); packets], vec![0; 16 * packets])
     }
 
-    fn socket(rx_buffer: SocketBuffer<'static, 'static>,
-              tx_buffer: SocketBuffer<'static, 'static>)
+    fn socket(rx_buffer: UdpSocketBuffer<'static, 'static>,
+              tx_buffer: UdpSocketBuffer<'static, 'static>)
             -> UdpSocket<'static, 'static> {
         UdpSocket::new(rx_buffer, tx_buffer)
     }
@@ -559,108 +461,9 @@ mod test {
         assert_eq!(socket.send_slice(&too_large[..16*4], REMOTE_END), Ok(()));
     }
 
-    #[test]
-    fn test_send_wraparound_1() {
-        let mut socket = socket(buffer(0), buffer(3));
-        assert_eq!(socket.bind(LOCAL_END), Ok(()));
-
-        let large = b"0123456789abcdef0123456789abcdef0123456789abcdef";
-
-        assert_eq!(socket.send_slice(&large[..15], REMOTE_END), Ok(()));
-        assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(()));
-        // no padding should be inserted because capacity does not suffice
-        assert_eq!(socket.send_slice(b"12", REMOTE_END), Err(Error::Exhausted));
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-1);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Ok(()));
-        // insert padding
-        assert_eq!(socket.send_slice(&large[..16], REMOTE_END), Err(Error::Exhausted));
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-15);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Ok(()));
-        // packet dequed, but padding is still there
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 1);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 1);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Err(Error::Exhausted));
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 0);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 0);
-    }
-
-    #[test]
-    fn test_send_wraparound_2() {
-        let mut socket = socket(buffer(0), buffer(3));
-        assert_eq!(socket.bind(LOCAL_END), Ok(()));
-
-        let large = b"0123456789abcdef0123456789abcdef0123456789abcdef";
-
-        assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(()));
-        assert_eq!(socket.send_slice(&large[..15], REMOTE_END), Ok(()));
-        // no padding should be inserted because capacity does not suffice
-        assert_eq!(socket.send_slice(b"12", REMOTE_END), Err(Error::Exhausted));
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-1);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Ok(()));
-        // insert padding and slice
-        assert_eq!(socket.send_slice(&large[..16*2], REMOTE_END), Ok(()));
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 3);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Ok(()));
-        // packet dequed, but padding is still there
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 2);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 16*3-15);
-
-        assert_eq!(socket.dispatch(|_| Ok(())), Ok(()));
-        // padding and packet dequeued
-        assert_eq!(socket.tx_buffer.metadata_buffer.len(), 0);
-        assert_eq!(socket.tx_buffer.payload_buffer.len(), 0);
-    }
-
-    #[test]
-    fn test_process_wraparound() {
-        // every packet will be 6 bytes
-        let recv_buffer = SocketBuffer::new(vec![Default::default(); 4], vec![0; 6*3 + 2]);
-        let mut socket = socket(recv_buffer, buffer(0));
-        assert_eq!(socket.bind(LOCAL_PORT), Ok(()));
-
-        assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(()));
-        assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(()));
-        assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(()));
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 3);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3);
-
-        assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR),
-                   Err(Error::Exhausted));
-        // no padding inserted because capacity does not suffice
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 3);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3);
-
-        assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END)));
-        assert_eq!(socket.process(&remote_ip_repr(), &REMOTE_UDP_REPR), Ok(()));
-        // padding inserted
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 4);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 6*3 + 2);
-
-        assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END)));
-        assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END)));
-        // two packets dequed, last packet and padding still there
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 2);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 6 + 2);
-
-        assert_eq!(socket.recv(), Ok((&b"abcdef"[..], REMOTE_END)));
-        // everything dequed
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 0);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 0);
-    }
-
     #[test]
     fn test_process_empty_payload() {
-        // every packet will be 6 bytes
-        let recv_buffer = SocketBuffer::new(vec![Default::default(); 1], vec![]);
+        let recv_buffer = UdpSocketBuffer::new(vec![UdpPacketMetadata::empty(); 1], vec![]);
         let mut socket = socket(recv_buffer, buffer(0));
         assert_eq!(socket.bind(LOCAL_PORT), Ok(()));
 
@@ -669,18 +472,6 @@ mod test {
             dst_port: LOCAL_PORT,
             payload: &[]
         };
-
-        assert_eq!(socket.process(&remote_ip_repr(), &repr), Ok(()));
-        assert_eq!(socket.rx_buffer.metadata_buffer.len(), 1);
-        assert_eq!(socket.rx_buffer.payload_buffer.len(), 0);
-
-        // The metatdata has been queued into the metadata buffer
-        assert!(!socket.rx_buffer.metadata_buffer.is_empty());
-        // The no payload data has been queued into the payload buffer
-        assert!(socket.rx_buffer.payload_buffer.is_empty());
-        // The received packets buffer is not empty and we can recv
-        assert!(socket.can_recv());
-        assert_eq!(socket.recv(), Ok((&[][..], REMOTE_END)));
         assert_eq!(socket.process(&remote_ip_repr(), &repr), Ok(()));
         assert_eq!(socket.recv(), Ok((&[][..], REMOTE_END)));
     }

+ 2 - 0
src/storage/mod.rs

@@ -7,9 +7,11 @@ or `alloc` crates being available, and heap-allocated memory.
 
 mod assembler;
 mod ring_buffer;
+mod packet_buffer;
 
 pub use self::assembler::Assembler;
 pub use self::ring_buffer::RingBuffer;
+pub use self::packet_buffer::{PacketBuffer, PacketMetadata};
 
 /// A trait for setting a value to a known state.
 ///

+ 235 - 0
src/storage/packet_buffer.rs

@@ -0,0 +1,235 @@
+use managed::ManagedSlice;
+
+use {Error, Result};
+use super::RingBuffer;
+
+/// Size and header of a packet.
+#[derive(Debug, Clone, Copy)]
+pub struct PacketMetadata<H> {
+    size:   usize,
+    header: Option<H>
+}
+
+impl<H> PacketMetadata<H> {
+    /// Create an empty packet description.
+    pub fn empty() -> PacketMetadata<H> {
+        Self::padding(0)
+    }
+
+    fn padding(size: usize) -> PacketMetadata<H> {
+        PacketMetadata {
+            size:   size,
+            header: None
+        }
+    }
+
+    fn packet(size: usize, header: H) -> PacketMetadata<H> {
+        PacketMetadata {
+            size:   size,
+            header: Some(header)
+        }
+    }
+
+    fn is_padding(&self) -> bool {
+        self.header.is_none()
+    }
+}
+
+/// An UDP packet ring buffer.
+#[derive(Debug)]
+pub struct PacketBuffer<'a, 'b, H: 'a> {
+    metadata_ring: RingBuffer<'a, PacketMetadata<H>>,
+    payload_ring:  RingBuffer<'b, u8>,
+}
+
+impl<'a, 'b, H> PacketBuffer<'a, 'b, H> {
+    /// Create a new packet buffer with the provided metadata and payload storage.
+    ///
+    /// Metadata storage limits the maximum _number_ of packets in the buffer and payload
+    /// storage limits the maximum _total size_ of packets.
+    pub fn new<MS, PS>(metadata_storage: MS, payload_storage: PS) -> PacketBuffer<'a, 'b, H>
+        where MS: Into<ManagedSlice<'a, PacketMetadata<H>>>,
+              PS: Into<ManagedSlice<'b, u8>>,
+    {
+        PacketBuffer {
+            metadata_ring: RingBuffer::new(metadata_storage),
+            payload_ring:  RingBuffer::new(payload_storage),
+        }
+    }
+
+    /// Query whether the buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.metadata_ring.is_empty()
+    }
+
+    /// Query whether the buffer is full.
+    pub fn is_full(&self) -> bool {
+        self.metadata_ring.is_full()
+    }
+
+    // There is currently no enqueue_with() because of the complexity of managing padding
+    // in case of failure.
+
+    /// Enqueue a single packet with the given header into the buffer, and
+    /// return a reference to its payload, or return `Err(Error::Exhausted)`
+    /// if the buffer is full or does not have enough spare payload space.
+    pub fn enqueue(&mut self, size: usize, header: H) -> Result<&mut [u8]> {
+        let window = self.payload_ring.window();
+        let contig_window = self.payload_ring.contiguous_window();
+
+        if self.metadata_ring.is_full() || window < size ||
+                (window != contig_window && window - contig_window < size) {
+            return Err(Error::Exhausted)
+        }
+
+        if contig_window < size {
+            *self.metadata_ring.enqueue_one()? = PacketMetadata::padding(size);
+            self.payload_ring.enqueue_many(size);
+        }
+
+        *self.metadata_ring.enqueue_one()? = PacketMetadata::packet(size, header);
+
+        let payload_buf = self.payload_ring.enqueue_many(size);
+        debug_assert!(payload_buf.len() == size);
+        Ok(payload_buf)
+    }
+
+    fn dequeue_padding(&mut self) {
+        let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
+
+        let _ = metadata_ring.dequeue_one_with(|metadata| {
+            if metadata.is_padding() {
+                payload_ring.dequeue_many(metadata.size);
+                Ok(()) // dequeue metadata
+            } else {
+                Err(Error::Exhausted) // don't dequeue metadata
+            }
+        });
+    }
+
+    /// Call `f` with a single packet from the buffer, and dequeue the packet if `f`
+    /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
+    pub fn dequeue_with<'c, R, F>(&'c mut self, f: F) -> Result<R>
+            where F: FnOnce(&mut H, &'c mut [u8]) -> Result<R> {
+        self.dequeue_padding();
+
+        let Self { ref mut metadata_ring, ref mut payload_ring } = *self;
+
+        metadata_ring.dequeue_one_with(move |metadata| {
+            let PacketMetadata { ref mut header, size } = *metadata;
+
+            payload_ring.dequeue_many_with(|payload_buf| {
+                debug_assert!(payload_buf.len() >= size);
+
+                match f(header.as_mut().unwrap(), &mut payload_buf[..size]) {
+                    Ok(val)  => (size, Ok(val)),
+                    Err(err) => (0,    Err(err)),
+                }
+            }).1
+        })
+    }
+
+    /// Dequeue a single packet from the buffer, and return a reference to its payload
+    /// as well as its header, or return `Err(Error::Exhausted)` if the buffer is empty.
+    pub fn dequeue(&mut self) -> Result<(H, &mut [u8])> {
+        self.dequeue_padding();
+
+        let PacketMetadata { ref mut header, size } = *self.metadata_ring.dequeue_one()?;
+
+        let payload_buf = self.payload_ring.dequeue_many(size);
+        debug_assert!(payload_buf.len() == size);
+        Ok((header.take().unwrap(), payload_buf))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    fn buffer() -> PacketBuffer<'static, 'static, ()> {
+        PacketBuffer::new(vec![PacketMetadata::empty(); 4],
+                          vec![0u8; 16])
+    }
+
+    #[test]
+    fn test_simple() {
+        let mut buffer = buffer();
+        buffer.enqueue(6, ()).unwrap().copy_from_slice(b"abcdef");
+        assert_eq!(buffer.enqueue(32, ()), Err(Error::Exhausted));
+        assert_eq!(buffer.metadata_ring.len(), 1);
+        assert_eq!(buffer.dequeue().unwrap().1, &b"abcdef"[..]);
+        assert_eq!(buffer.dequeue(), Err(Error::Exhausted));
+    }
+
+    #[test]
+    fn test_padding() {
+        let mut buffer = buffer();
+        assert!(buffer.enqueue(6, ()).is_ok());
+        assert!(buffer.enqueue(8, ()).is_ok());
+        assert!(buffer.dequeue().is_ok());
+        buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+        assert_eq!(buffer.metadata_ring.len(), 3);
+        assert!(buffer.dequeue().is_ok());
+
+        assert_eq!(buffer.dequeue().unwrap().1, &b"abcd"[..]);
+        assert_eq!(buffer.metadata_ring.len(), 0);
+    }
+
+    #[test]
+    fn test_dequeue_with() {
+        let mut buffer = buffer();
+        assert!(buffer.enqueue(6, ()).is_ok());
+        assert!(buffer.enqueue(8, ()).is_ok());
+        assert!(buffer.dequeue().is_ok());
+        buffer.enqueue(4, ()).unwrap().copy_from_slice(b"abcd");
+        assert_eq!(buffer.metadata_ring.len(), 3);
+        assert!(buffer.dequeue().is_ok());
+
+        assert!(buffer.dequeue_with(|_, _| Err(Error::Unaddressable) as Result<()>).is_err());
+        assert_eq!(buffer.metadata_ring.len(), 1);
+
+        assert!(buffer.dequeue_with(|&mut (), payload| {
+            assert_eq!(payload, &b"abcd"[..]);
+            Ok(())
+        }).is_ok());
+        assert_eq!(buffer.metadata_ring.len(), 0);
+    }
+
+    #[test]
+    fn test_metadata_full_empty() {
+        let mut buffer = buffer();
+        assert_eq!(buffer.is_empty(), true);
+        assert_eq!(buffer.is_full(), false);
+        assert!(buffer.enqueue(1, ()).is_ok());
+        assert_eq!(buffer.is_empty(), false);
+        assert!(buffer.enqueue(1, ()).is_ok());
+        assert!(buffer.enqueue(1, ()).is_ok());
+        assert_eq!(buffer.is_full(), false);
+        assert_eq!(buffer.is_empty(), false);
+        assert!(buffer.enqueue(1, ()).is_ok());
+        assert_eq!(buffer.is_full(), true);
+        assert_eq!(buffer.is_empty(), false);
+        assert_eq!(buffer.metadata_ring.len(), 4);
+        assert_eq!(buffer.enqueue(1, ()), Err(Error::Exhausted));
+    }
+
+    #[test]
+    fn test_window_too_small() {
+        let mut buffer = buffer();
+        assert!(buffer.enqueue(4, ()).is_ok());
+        assert!(buffer.enqueue(8, ()).is_ok());
+        assert!(buffer.dequeue().is_ok());
+        assert_eq!(buffer.enqueue(16, ()), Err(Error::Exhausted));
+        assert_eq!(buffer.metadata_ring.len(), 1);
+    }
+
+    #[test]
+    fn test_contiguous_window_too_small() {
+        let mut buffer = buffer();
+        assert!(buffer.enqueue(4, ()).is_ok());
+        assert!(buffer.enqueue(8, ()).is_ok());
+        assert!(buffer.dequeue().is_ok());
+        assert_eq!(buffer.enqueue(8, ()), Err(Error::Exhausted));
+        assert_eq!(buffer.metadata_ring.len(), 1);
+    }
+}