Browse Source

Redesign the phy::Device trait to avoid Drop impls.

Philipp Oppermann 7 years ago
parent
commit
198fe239f1
10 changed files with 593 additions and 445 deletions
  1. 6 3
      examples/tcpdump.rs
  2. 1 1
      examples/utils.rs
  3. 184 131
      src/iface/ethernet.rs
  4. 103 79
      src/phy/fault_injector.rs
  5. 37 35
      src/phy/loopback.rs
  6. 82 78
      src/phy/mod.rs
  7. 60 37
      src/phy/pcap_writer.rs
  8. 32 24
      src/phy/raw_socket.rs
  9. 38 28
      src/phy/tap_interface.rs
  10. 50 29
      src/phy/tracer.rs

+ 6 - 3
examples/tcpdump.rs

@@ -1,14 +1,17 @@
 extern crate smoltcp;
 
 use std::env;
-use smoltcp::phy::{Device, RawSocket};
+use smoltcp::phy::{Device, RxToken, RawSocket};
 use smoltcp::wire::{PrettyPrinter, EthernetFrame};
 
 fn main() {
     let ifname = env::args().nth(1).unwrap();
     let mut socket = RawSocket::new(ifname.as_ref()).unwrap();
     loop {
-        let buffer = socket.receive(/*timestamp=*/0).unwrap();
-        print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer))
+        let (rx_token, _) = socket.receive().unwrap();
+        rx_token.consume(/*timestamp = */ 0, |buffer| {
+            print!("{}", PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer));
+            Ok(())
+        }).unwrap();
     }
 }

+ 1 - 1
examples/utils.rs

@@ -92,7 +92,7 @@ pub fn add_middleware_options(opts: &mut Options, _free: &mut Vec<&str>) {
     opts.optopt("", "shaping-interval", "Sets the interval for rate limiting (ms)", "RATE");
 }
 
-pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loopback: bool)
+pub fn parse_middleware_options<D: for<'a> Device<'a>>(matches: &mut Matches, device: D, loopback: bool)
         -> FaultInjector<EthernetTracer<PcapWriter<D, Rc<PcapSink>>>> {
     let drop_chance      = matches.opt_str("drop-chance").map(|s| u8::from_str(&s).unwrap())
                                   .unwrap_or(0);

+ 184 - 131
src/iface/ethernet.rs

@@ -5,7 +5,7 @@ use core::cmp;
 use managed::{Managed, ManagedSlice};
 
 use {Error, Result};
-use phy::Device;
+use phy::{Device, DeviceCapabilities, RxToken, TxToken};
 use wire::{EthernetAddress, EthernetProtocol, EthernetFrame};
 use wire::{Ipv4Address};
 use wire::{IpAddress, IpProtocol, IpRepr, IpCidr};
@@ -25,12 +25,24 @@ use super::ArpCache;
 /// The network interface logically owns a number of other data structures; to avoid
 /// a dependency on heap allocation, it instead owns a `BorrowMut<[T]>`, which can be
 /// a `&mut [T]`, or `Vec<T>` if a heap is available.
-pub struct Interface<'a, 'b, 'c, DeviceT: Device + 'a> {
-    device:         Managed<'a, DeviceT>,
-    arp_cache:      Managed<'b, ArpCache>,
-    ethernet_addr:  EthernetAddress,
-    ip_addrs:       ManagedSlice<'c, IpCidr>,
-    ipv4_gateway:   Option<Ipv4Address>,
+pub struct Interface<'a, 'b, 'c, DeviceT: for<'d> Device<'d> + 'a> {
+    device: Managed<'a, DeviceT>,
+    inner:  InterfaceInner<'b, 'c>,
+}
+
+/// The device independent part of an Ethernet network interface.
+///
+/// Separating the device from the data required for prorcessing and dispatching makes
+/// it possible to borrow them independently. For example, the tx and rx tokens borrow
+/// the `device` mutably until they're used, which makes it impossible to call other
+/// methods on the `Interface` in this time (since its `device` field is borrowed
+/// exclusively). However, it is still possible to call methods on its `inner` field.
+struct InterfaceInner<'b, 'c> {
+    arp_cache:              Managed<'b, ArpCache>,
+    ethernet_addr:          EthernetAddress,
+    ip_addrs:               ManagedSlice<'c, IpCidr>,
+    ipv4_gateway:           Option<Ipv4Address>,
+    device_capabilities:    DeviceCapabilities,
 }
 
 #[derive(Debug, PartialEq)]
@@ -46,7 +58,8 @@ enum Packet<'a> {
     Tcp((IpRepr, TcpRepr<'a>))
 }
 
-impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
+impl<'a, 'b, 'c, DeviceT> Interface<'a, 'b, 'c, DeviceT>
+        where DeviceT: for<'d> Device<'d> + 'a {
     /// Create a network interface using the provided network device.
     ///
     /// # Panics
@@ -63,24 +76,24 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                   ProtocolAddrsMT: Into<ManagedSlice<'c, IpCidr>>,
                   Ipv4GatewayAddrT: Into<Option<Ipv4Address>>, {
         let device = device.into();
-        let arp_cache = arp_cache.into();
         let ip_addrs = ip_addrs.into();
-        let ipv4_gateway = ipv4_gateway.into();
-
-        Self::check_ethernet_addr(&ethernet_addr);
-        Self::check_ip_addrs(&ip_addrs);
-        Interface { device, arp_cache, ethernet_addr, ip_addrs, ipv4_gateway }
-    }
+        InterfaceInner::check_ethernet_addr(&ethernet_addr);
+        InterfaceInner::check_ip_addrs(&ip_addrs);
+
+        let inner = InterfaceInner {
+            ethernet_addr,
+            ip_addrs,
+            arp_cache: arp_cache.into(),
+            ipv4_gateway: ipv4_gateway.into(),
+            device_capabilities: device.capabilities(),
+        };
 
-    fn check_ethernet_addr(addr: &EthernetAddress) {
-        if addr.is_multicast() {
-            panic!("Ethernet address {} is not unicast", addr)
-        }
+        Interface { device, inner }
     }
 
     /// Get the Ethernet address of the interface.
     pub fn ethernet_addr(&self) -> EthernetAddress {
-        self.ethernet_addr
+        self.inner.ethernet_addr
     }
 
     /// Set the Ethernet address of the interface.
@@ -88,21 +101,13 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     /// # Panics
     /// This function panics if the address is not unicast.
     pub fn set_ethernet_addr(&mut self, addr: EthernetAddress) {
-        self.ethernet_addr = addr;
-        Self::check_ethernet_addr(&self.ethernet_addr);
-    }
-
-    fn check_ip_addrs(addrs: &[IpCidr]) {
-        for cidr in addrs {
-            if !cidr.address().is_unicast() {
-                panic!("IP address {} is not unicast", cidr.address())
-            }
-        }
+        self.inner.ethernet_addr = addr;
+        InterfaceInner::check_ethernet_addr(&self.inner.ethernet_addr);
     }
 
     /// Get the IP addresses of the interface.
     pub fn ip_addrs(&self) -> &[IpCidr] {
-        self.ip_addrs.as_ref()
+        self.inner.ip_addrs.as_ref()
     }
 
     /// Update the IP addresses of the interface.
@@ -110,25 +115,24 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     /// # Panics
     /// This function panics if any of the addresses is not unicast.
     pub fn update_ip_addrs<F: FnOnce(&mut ManagedSlice<'c, IpCidr>)>(&mut self, f: F) {
-        f(&mut self.ip_addrs);
-        Self::check_ip_addrs(&self.ip_addrs)
+        f(&mut self.inner.ip_addrs);
+        InterfaceInner::check_ip_addrs(&self.inner.ip_addrs)
     }
 
     /// Check whether the interface has the given IP address assigned.
     pub fn has_ip_addr<T: Into<IpAddress>>(&self, addr: T) -> bool {
-        let addr = addr.into();
-        self.ip_addrs.iter().any(|probe| probe.address() == addr)
+        self.inner.has_ip_addr(addr)
     }
 
     /// Get the IPv4 gateway of the interface.
     pub fn ipv4_gateway(&self) -> Option<Ipv4Address> {
-        self.ipv4_gateway
+        self.inner.ipv4_gateway
     }
 
     /// Set the IPv4 gateway of the interface.
     pub fn set_ipv4_gateway<GatewayAddrT>(&mut self, gateway: GatewayAddrT)
             where GatewayAddrT: Into<Option<Ipv4Address>> {
-        self.ipv4_gateway = gateway.into();
+        self.inner.ipv4_gateway = gateway.into();
     }
 
     /// Transmit packets queued in the given sockets, and receive packets queued
@@ -162,38 +166,16 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         }
     }
 
-    fn icmpv4_reply<'frame, 'icmp: 'frame>(&self,
-                                           ipv4_repr: Ipv4Repr,
-                                           icmp_repr: Icmpv4Repr<'icmp>)
-            -> Packet<'frame> {
-        if ipv4_repr.dst_addr.is_unicast() {
-            let ipv4_reply_repr = Ipv4Repr {
-                src_addr:    ipv4_repr.dst_addr,
-                dst_addr:    ipv4_repr.src_addr,
-                protocol:    IpProtocol::Icmp,
-                payload_len: icmp_repr.buffer_len(),
-                ttl:         64
-            };
-            Packet::Icmpv4(ipv4_reply_repr, icmp_repr)
-        } else {
-            // Do not send Protocol Unreachable ICMPv4 error responses to datagrams
-            // with a broadcast destination address.
-            Packet::None
-        }
-    }
-
     fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
         let mut processed_any = false;
         loop {
-            let frame =
-                match self.device.receive(timestamp) {
-                    Ok(frame) => frame,
-                    Err(Error::Exhausted) => break, // nothing to receive
-                    Err(err) => return Err(err)
-                };
-
-            let response =
-                self.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
+            let &mut Self { ref mut device, ref mut inner } = self;
+            let (rx_token, tx_token) = match device.receive() {
+                None => break,
+                Some(tokens) => tokens,
+            };
+            let dispatch_result = rx_token.consume(timestamp, |frame| {
+                let response = inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
                     net_debug!("cannot process ingress packet: {}", err);
                     if net_log_enabled!(debug) {
                         match EthernetFrame::new_checked(frame.as_ref()) {
@@ -207,9 +189,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     }
                     err
                 })?;
-            processed_any = true;
+                processed_any = true;
 
-            self.dispatch(timestamp, response).map_err(|err| {
+                inner.dispatch(tx_token, timestamp, response)
+            });
+            dispatch_result.map_err(|err| {
                 net_debug!("cannot dispatch response packet: {}", err);
                 err
             })?;
@@ -223,24 +207,28 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
         for mut socket in sockets.iter_mut() {
             let mut device_result = Ok(());
+            let &mut Self { ref mut device, ref mut inner } = self;
             let socket_result =
                 match *socket {
                     #[cfg(feature = "proto-raw")]
                     Socket::Raw(ref mut socket) =>
                         socket.dispatch(|response| {
-                            device_result = self.dispatch(timestamp, Packet::Raw(response));
+                            let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                            device_result = inner.dispatch(tx_token, timestamp, Packet::Raw(response));
                             device_result
                         }, &caps.checksum),
                     #[cfg(feature = "proto-udp")]
                     Socket::Udp(ref mut socket) =>
                         socket.dispatch(|response| {
-                            device_result = self.dispatch(timestamp, Packet::Udp(response));
+                            let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                            device_result = inner.dispatch(tx_token, timestamp, Packet::Udp(response));
                             device_result
                         }),
                     #[cfg(feature = "proto-tcp")]
                     Socket::Tcp(ref mut socket) =>
                         socket.dispatch(timestamp, &caps, |response| {
-                            device_result = self.dispatch(timestamp, Packet::Tcp(response));
+                            let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                            device_result = inner.dispatch(tx_token, timestamp, Packet::Tcp(response));
                             device_result
                         }),
                     Socket::__Nonexhaustive(_) => unreachable!()
@@ -256,14 +244,35 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                 (Ok(()), Ok(())) => ()
             }
         }
-
         Ok(())
     }
+}
+
+impl<'b, 'c> InterfaceInner<'b, 'c> {
+    fn check_ethernet_addr(addr: &EthernetAddress) {
+        if addr.is_multicast() {
+            panic!("Ethernet address {} is not unicast", addr)
+        }
+    }
+
+    fn check_ip_addrs(addrs: &[IpCidr]) {
+        for cidr in addrs {
+            if !cidr.address().is_unicast() {
+                panic!("IP address {} is not unicast", cidr.address())
+            }
+        }
+    }
+
+    /// Check whether the interface has the given IP address assigned.
+    fn has_ip_addr<T: Into<IpAddress>>(&self, addr: T) -> bool {
+        let addr = addr.into();
+        self.ip_addrs.iter().any(|probe| probe.address() == addr)
+    }
 
     fn process_ethernet<'frame, T: AsRef<[u8]>>
-                       (&mut self, sockets: &mut SocketSet, timestamp: u64,
-                        frame: &'frame T) ->
-                       Result<Packet<'frame>> {
+                       (&mut self, sockets: &mut SocketSet, timestamp: u64, frame: &'frame T) ->
+                       Result<Packet<'frame>>
+    {
         let eth_frame = EthernetFrame::new_checked(frame)?;
 
         // Ignore any packets not directed to our hardware address.
@@ -284,7 +293,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
     fn process_arp<'frame, T: AsRef<[u8]>>
                   (&mut self, eth_frame: &EthernetFrame<&'frame T>) ->
-                  Result<Packet<'frame>> {
+                  Result<Packet<'frame>>
+    {
         let arp_packet = ArpPacket::new_checked(eth_frame.payload())?;
         let arp_repr = ArpRepr::parse(&arp_packet)?;
 
@@ -324,9 +334,10 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     fn process_ipv4<'frame, T: AsRef<[u8]>>
                    (&mut self, sockets: &mut SocketSet, _timestamp: u64,
                     eth_frame: &EthernetFrame<&'frame T>) ->
-                   Result<Packet<'frame>> {
+                   Result<Packet<'frame>>
+    {
         let ipv4_packet = Ipv4Packet::new_checked(eth_frame.payload())?;
-        let checksum_caps = self.device.capabilities().checksum;
+        let checksum_caps = self.device_capabilities.checksum.clone();
         let ipv4_repr = Ipv4Repr::parse(&ipv4_packet, &checksum_caps)?;
 
         if !ipv4_repr.src_addr.is_unicast() {
@@ -386,7 +397,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             _ => {
                 // Send back as much of the original payload as we can
                 let payload_len = cmp::min(
-                    ip_payload.len(), self.device.capabilities().max_transmission_unit);
+                    ip_payload.len(), self.device_capabilities.max_transmission_unit);
                 let icmp_reply_repr = Icmpv4Repr::DstUnreachable {
                     reason: Icmpv4DstUnreachable::ProtoUnreachable,
                     header: ipv4_repr,
@@ -398,9 +409,10 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     }
 
     fn process_icmpv4<'frame>(&self, ipv4_repr: Ipv4Repr, ip_payload: &'frame [u8]) ->
-                             Result<Packet<'frame>> {
+                             Result<Packet<'frame>>
+    {
         let icmp_packet = Icmpv4Packet::new_checked(ip_payload)?;
-        let checksum_caps = self.device.capabilities().checksum;
+        let checksum_caps = self.device_capabilities.checksum.clone();
         let icmp_repr = Icmpv4Repr::parse(&icmp_packet, &checksum_caps)?;
 
         match icmp_repr {
@@ -422,13 +434,33 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         }
     }
 
+    fn icmpv4_reply<'frame, 'icmp: 'frame>
+                   (&self, ipv4_repr: Ipv4Repr, icmp_repr: Icmpv4Repr<'icmp>) ->
+                   Packet<'frame>
+    {
+        if ipv4_repr.dst_addr.is_unicast() {
+            let ipv4_reply_repr = Ipv4Repr {
+                src_addr:    ipv4_repr.dst_addr,
+                dst_addr:    ipv4_repr.src_addr,
+                protocol:    IpProtocol::Icmp,
+                payload_len: icmp_repr.buffer_len(),
+                ttl:         64
+            };
+            Packet::Icmpv4(ipv4_reply_repr, icmp_repr)
+        } else {
+            // Do not send any ICMP replies to a broadcast destination address.
+            Packet::None
+        }
+    }
+
     #[cfg(feature = "proto-udp")]
     fn process_udp<'frame>(&self, sockets: &mut SocketSet,
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
-                          Result<Packet<'frame>> {
+                          Result<Packet<'frame>>
+    {
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let udp_packet = UdpPacket::new_checked(ip_payload)?;
-        let checksum_caps = self.device.capabilities().checksum;
+        let checksum_caps = self.device_capabilities.checksum.clone();
         let udp_repr = UdpRepr::parse(&udp_packet, &src_addr, &dst_addr, &checksum_caps)?;
 
         for mut udp_socket in sockets.iter_mut().filter_map(UdpSocket::downcast) {
@@ -447,7 +479,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             IpRepr::Ipv4(ipv4_repr) => {
                 // Send back as much of the original payload as we can
                 let payload_len = cmp::min(
-                    ip_payload.len(), self.device.capabilities().max_transmission_unit);
+                    ip_payload.len(), self.device_capabilities.max_transmission_unit);
                 let icmpv4_reply_repr = Icmpv4Repr::DstUnreachable {
                     reason: Icmpv4DstUnreachable::PortUnreachable,
                     header: ipv4_repr,
@@ -464,10 +496,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     #[cfg(feature = "proto-tcp")]
     fn process_tcp<'frame>(&self, sockets: &mut SocketSet, timestamp: u64,
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
-                          Result<Packet<'frame>> {
+                          Result<Packet<'frame>>
+    {
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let tcp_packet = TcpPacket::new_checked(ip_payload)?;
-        let checksum_caps = self.device.capabilities().checksum;
+        let checksum_caps = self.device_capabilities.checksum.clone();
         let tcp_repr = TcpRepr::parse(&tcp_packet, &src_addr, &dst_addr, &checksum_caps)?;
 
         for mut tcp_socket in sockets.iter_mut().filter_map(TcpSocket::downcast) {
@@ -491,8 +524,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         }
     }
 
-    fn dispatch(&mut self, timestamp: u64, packet: Packet) -> Result<()> {
-        let checksum_caps = self.device.capabilities().checksum;
+    fn dispatch<Tx>(&mut self, tx_token: Tx, timestamp: u64,
+                    packet: Packet) -> Result<()>
+        where Tx: TxToken
+    {
+        let checksum_caps = self.device_capabilities.checksum.clone();
         match packet {
             Packet::Arp(arp_repr) => {
                 let dst_hardware_addr =
@@ -501,7 +537,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                         _ => unreachable!()
                     };
 
-                self.dispatch_ethernet(timestamp, arp_repr.buffer_len(), |mut frame| {
+                self.dispatch_ethernet(tx_token, timestamp, arp_repr.buffer_len(), |mut frame| {
                     frame.set_dst_addr(dst_hardware_addr);
                     frame.set_ethertype(EthernetProtocol::Arp);
 
@@ -510,19 +546,19 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                 })
             },
             Packet::Icmpv4(ipv4_repr, icmpv4_repr) => {
-                self.dispatch_ip(timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
+                self.dispatch_ip(tx_token, timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
                     icmpv4_repr.emit(&mut Icmpv4Packet::new(payload), &checksum_caps);
                 })
             }
             #[cfg(feature = "proto-raw")]
             Packet::Raw((ip_repr, raw_packet)) => {
-                self.dispatch_ip(timestamp, ip_repr, |_ip_repr, payload| {
+                self.dispatch_ip(tx_token, timestamp, ip_repr, |_ip_repr, payload| {
                     payload.copy_from_slice(raw_packet);
                 })
             }
             #[cfg(feature = "proto-udp")]
             Packet::Udp((ip_repr, udp_repr)) => {
-                self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
+                self.dispatch_ip(tx_token, timestamp, ip_repr, |ip_repr, payload| {
                     udp_repr.emit(&mut UdpPacket::new(payload),
                                   &ip_repr.src_addr(), &ip_repr.dst_addr(),
                                   &checksum_caps);
@@ -530,8 +566,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             }
             #[cfg(feature = "proto-tcp")]
             Packet::Tcp((ip_repr, mut tcp_repr)) => {
-                let caps = self.device.capabilities();
-                self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
+                let caps = self.device_capabilities.clone();
+                self.dispatch_ip(tx_token, timestamp, ip_repr, |ip_repr, payload| {
                     // This is a terrible hack to make TCP performance more acceptable on systems
                     // where the TCP buffers are significantly larger than network buffers,
                     // e.g. a 64 kB TCP receive buffer (and so, when empty, a 64k window)
@@ -560,18 +596,20 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         }
     }
 
-    fn dispatch_ethernet<F>(&mut self, timestamp: u64, buffer_len: usize, f: F) -> Result<()>
-            where F: FnOnce(EthernetFrame<&mut [u8]>) {
+    fn dispatch_ethernet<Tx, F>(&mut self, tx_token: Tx, timestamp: u64,
+                                buffer_len: usize, f: F) -> Result<()>
+        where Tx: TxToken, F: FnOnce(EthernetFrame<&mut [u8]>)
+    {
         let tx_len = EthernetFrame::<&[u8]>::buffer_len(buffer_len);
-        let mut tx_buffer = self.device.transmit(timestamp, tx_len)?;
-        debug_assert!(tx_buffer.as_ref().len() == tx_len);
-
-        let mut frame = EthernetFrame::new(tx_buffer.as_mut());
-        frame.set_src_addr(self.ethernet_addr);
+        tx_token.consume(timestamp, tx_len, |tx_buffer| {
+            debug_assert!(tx_buffer.as_ref().len() == tx_len);
+            let mut frame = EthernetFrame::new(tx_buffer.as_mut());
+            frame.set_src_addr(self.ethernet_addr);
 
-        f(frame);
+            f(frame);
 
-        Ok(())
+            Ok(())
+        })
     }
 
     fn route(&self, addr: &IpAddress) -> Result<IpAddress> {
@@ -590,17 +628,19 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             })
     }
 
-    fn lookup_hardware_addr(&mut self, timestamp: u64,
-                            src_addr: &IpAddress, dst_addr: &IpAddress) ->
-                           Result<EthernetAddress> {
+    fn lookup_hardware_addr<Tx>(&mut self, tx_token: Tx, timestamp: u64,
+                                src_addr: &IpAddress, dst_addr: &IpAddress) ->
+                               Result<(EthernetAddress, Tx)>
+        where Tx: TxToken
+    {
         let dst_addr = self.route(dst_addr)?;
 
         if let Some(hardware_addr) = self.arp_cache.lookup(&dst_addr) {
-            return Ok(hardware_addr)
+            return Ok((hardware_addr,tx_token))
         }
 
         if dst_addr.is_broadcast() {
-            return Ok(EthernetAddress::BROADCAST)
+            return Ok((EthernetAddress::BROADCAST, tx_token))
         }
 
         match (src_addr, dst_addr) {
@@ -616,7 +656,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     target_protocol_addr: dst_addr,
                 };
 
-                self.dispatch_ethernet(timestamp, arp_repr.buffer_len(), |mut frame| {
+                self.dispatch_ethernet(tx_token, timestamp, arp_repr.buffer_len(), |mut frame| {
                     frame.set_dst_addr(EthernetAddress::BROADCAST);
                     frame.set_ethertype(EthernetProtocol::Arp);
 
@@ -629,15 +669,18 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         }
     }
 
-    fn dispatch_ip<F>(&mut self, timestamp: u64, ip_repr: IpRepr, f: F) -> Result<()>
-            where F: FnOnce(IpRepr, &mut [u8]) {
+    fn dispatch_ip<Tx, F>(&mut self, tx_token: Tx, timestamp: u64,
+                          ip_repr: IpRepr, f: F) -> Result<()>
+        where Tx: TxToken, F: FnOnce(IpRepr, &mut [u8])
+    {
         let ip_repr = ip_repr.lower(&self.ip_addrs)?;
-        let checksum_caps = self.device.capabilities().checksum;
+        let checksum_caps = self.device_capabilities.checksum.clone();
 
-        let dst_hardware_addr =
-            self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr())?;
+        let (dst_hardware_addr, tx_token) =
+            self.lookup_hardware_addr(tx_token, timestamp,
+                                      &ip_repr.src_addr(), &ip_repr.dst_addr())?;
 
-        self.dispatch_ethernet(timestamp, ip_repr.total_len(), |mut frame| {
+        self.dispatch_ethernet(tx_token, timestamp, ip_repr.total_len(), |mut frame| {
             frame.set_dst_addr(dst_hardware_addr);
             match ip_repr {
                 IpRepr::Ipv4(_) => frame.set_ethertype(EthernetProtocol::Ipv4),
@@ -656,7 +699,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 mod test {
     use std::boxed::Box;
     use super::Packet;
-    use phy::{Loopback, ChecksumCapabilities};
+    use phy::{self, Loopback, ChecksumCapabilities};
     use wire::{ArpOperation, ArpPacket, ArpRepr};
     use wire::{EthernetAddress, EthernetFrame, EthernetProtocol};
     use wire::{IpAddress, IpCidr, IpProtocol, IpRepr};
@@ -665,6 +708,7 @@ mod test {
     use wire::{UdpPacket, UdpRepr};
     use iface::{ArpCache, SliceArpCache, EthernetInterface};
     use socket::SocketSet;
+    use {Result, Error};
 
     fn create_loopback<'a, 'b>() ->
             (EthernetInterface<'a, 'static, 'b, Loopback>, SocketSet<'static, 'a, 'b>) {
@@ -679,6 +723,16 @@ mod test {
             EthernetAddress::default(), [ip_addr], None), SocketSet::new(vec![]))
     }
 
+    #[derive(Debug, PartialEq)]
+    struct MockTxToken;
+
+    impl phy::TxToken for MockTxToken {
+        fn consume<R, F>(self, _: u64, _: usize, _: F) -> Result<R>
+                where F: FnOnce(&mut [u8]) -> Result<R> {
+            Err(Error::__Nonexhaustive)
+        }
+    }
+
     #[test]
     fn no_icmp_to_broadcast() {
         let (mut iface, mut socket_set) = create_loopback();
@@ -710,7 +764,7 @@ mod test {
         // Ensure that the unknown protocol frame does not trigger an
         // ICMP error response when the destination address is a
         // broadcast address
-        assert_eq!(iface.process_ipv4(&mut socket_set, 0, &frame),
+        assert_eq!(iface.inner.process_ipv4(&mut socket_set, 0, &frame),
                    Ok(Packet::None));
     }
 
@@ -767,7 +821,7 @@ mod test {
 
         // Ensure that the unknown protocol triggers an error response.
         // And we correctly handle no payload.
-        assert_eq!(iface.process_ipv4(&mut socket_set, 0, &frame),
+        assert_eq!(iface.inner.process_ipv4(&mut socket_set, 0, &frame),
                    Ok(expected_repr));
     }
 
@@ -832,7 +886,7 @@ mod test {
 
         // Ensure that the unknown protocol triggers an error response.
         // And we correctly handle no payload.
-        assert_eq!(iface.process_udp(&mut socket_set, ip_repr, data),
+        assert_eq!(iface.inner.process_udp(&mut socket_set, ip_repr, data),
                    Ok(expected_repr));
 
         let ip_repr = IpRepr::Ipv4(Ipv4Repr {
@@ -851,8 +905,8 @@ mod test {
         // Ensure that the port unreachable error does not trigger an
         // ICMP error response when the destination address is a
         // broadcast address
-        assert_eq!(iface.process_udp(&mut socket_set, ip_repr, packet_broadcast.into_inner()),
-                   Ok(Packet::None));
+        assert_eq!(iface.inner.process_udp(&mut socket_set, ip_repr,
+                   packet_broadcast.into_inner()), Ok(Packet::None));
 
     }
 
@@ -885,7 +939,7 @@ mod test {
         }
 
         // Ensure an ARP Request for us triggers an ARP Reply
-        assert_eq!(iface.process_ethernet(&mut socket_set, 0, frame.into_inner()),
+        assert_eq!(iface.inner.process_ethernet(&mut socket_set, 0, frame.into_inner()),
                    Ok(Packet::Arp(ArpRepr::EthernetIpv4 {
                        operation: ArpOperation::Reply,
                        source_hardware_addr: local_hw_addr,
@@ -895,10 +949,9 @@ mod test {
                    })));
 
         // Ensure the address of the requestor was entered in the cache
-        assert_eq!(iface.lookup_hardware_addr(0,
-                                              &IpAddress::Ipv4(local_ip_addr),
-                                              &IpAddress::Ipv4(remote_ip_addr)),
-                   Ok(remote_hw_addr));
+        assert_eq!(iface.inner.lookup_hardware_addr(MockTxToken, 0,
+            &IpAddress::Ipv4(local_ip_addr), &IpAddress::Ipv4(remote_ip_addr)),
+            Ok((remote_hw_addr, MockTxToken)));
     }
 
     #[test]
@@ -928,13 +981,13 @@ mod test {
         }
 
         // Ensure an ARP Request for someone else does not trigger an ARP Reply
-        assert_eq!(iface.process_ethernet(&mut socket_set, 0, frame.into_inner()),
+        assert_eq!(iface.inner.process_ethernet(&mut socket_set, 0, frame.into_inner()),
                    Ok(Packet::None));
 
         // Ensure the address of the requestor was entered in the cache
-        assert_eq!(iface.lookup_hardware_addr(0,
-                                              &IpAddress::Ipv4(Ipv4Address([0x7f, 0x00, 0x00, 0x01])),
-                                              &IpAddress::Ipv4(remote_ip_addr)),
-                   Ok(remote_hw_addr));
+        assert_eq!(iface.inner.lookup_hardware_addr(MockTxToken, 0,
+            &IpAddress::Ipv4(Ipv4Address([0x7f, 0x00, 0x00, 0x01])),
+            &IpAddress::Ipv4(remote_ip_addr)),
+            Ok((remote_hw_addr, MockTxToken)));
     }
 }

+ 103 - 79
src/phy/fault_injector.rs

@@ -1,5 +1,7 @@
+use core::cell::RefCell;
+
 use {Error, Result};
-use super::{DeviceCapabilities, Device};
+use phy::{self, DeviceCapabilities, Device};
 
 // We use our own RNG to stay compatible with #![no_std].
 // The use of the RNG below has a slight bias, but it doesn't matter.
@@ -26,7 +28,7 @@ struct Config {
     interval:    u64,
 }
 
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 struct State {
     rng_seed:    u32,
     refilled_at: u64,
@@ -86,13 +88,13 @@ impl State {
 /// adverse network conditions (such as random packet loss or corruption), or software
 /// or hardware limitations (such as a limited number or size of usable network buffers).
 #[derive(Debug)]
-pub struct FaultInjector<D: Device> {
-    inner:  D,
-    state:  State,
-    config: Config
+pub struct FaultInjector<D: for<'a> Device<'a>> {
+    inner:      D,
+    state:      RefCell<State>,
+    config:     Config,
 }
 
-impl<D: Device> FaultInjector<D> {
+impl<D: for<'a> Device<'a>> FaultInjector<D> {
     /// Create a fault injector device, using the given random number generator seed.
     pub fn new(inner: D, seed: u32) -> FaultInjector<D> {
         let state = State {
@@ -103,8 +105,8 @@ impl<D: Device> FaultInjector<D> {
         };
         FaultInjector {
             inner: inner,
-            state: state,
-            config: Config::default()
+            state: RefCell::new(state),
+            config: Config::default(),
         }
     }
 
@@ -178,15 +180,16 @@ impl<D: Device> FaultInjector<D> {
 
     /// Set the interval for packet rate limiting, in milliseconds.
     pub fn set_bucket_interval(&mut self, interval: u64) {
-        self.state.refilled_at = 0;
+        self.state.borrow_mut().refilled_at = 0;
         self.config.interval = interval
     }
 }
 
-impl<D: Device> Device for FaultInjector<D>
-        where D::RxBuffer: AsMut<[u8]> {
-    type RxBuffer = D::RxBuffer;
-    type TxBuffer = TxBuffer<D::TxBuffer>;
+impl<'a, D> Device<'a> for FaultInjector<D>
+    where D: for<'b> Device<'b>,
+{
+    type RxToken = RxToken<'a, <D as Device<'a>>::RxToken>;
+    type TxToken = TxToken<'a, <D as Device<'a>>::TxToken>;
 
     fn capabilities(&self) -> DeviceCapabilities {
         let mut caps = self.inner.capabilities();
@@ -196,88 +199,109 @@ impl<D: Device> Device for FaultInjector<D>
         caps
     }
 
-    fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
-        let mut buffer = self.inner.receive(timestamp)?;
-        if self.state.maybe(self.config.drop_pct) {
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        let &mut Self { ref mut inner, ref state, config } = self;
+        inner.receive().map(|(rx_token, tx_token)| {
+            let rx = RxToken {
+                state:   &state,
+                config:  config,
+                token:   rx_token,
+                corrupt: [0; MTU],
+            };
+            let tx = TxToken {
+                state:   &state,
+                config:  config,
+                token:   tx_token,
+                junk:    [0; MTU],
+            };
+            (rx, tx)
+        })
+    }
+
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        let &mut Self { ref mut inner, ref state, config } = self;
+        inner.transmit().map(|token| TxToken {
+            state:  &state,
+            config: config,
+            token: token,
+            junk:   [0; MTU],
+        })
+    }
+}
+
+#[doc(hidden)]
+pub struct RxToken<'a, Rx: phy::RxToken> {
+    state:   &'a RefCell<State>,
+    config:  Config,
+    token:   Rx,
+    corrupt: [u8; MTU],
+}
+
+impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
+    fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
+        if self.state.borrow_mut().maybe(self.config.drop_pct) {
             net_trace!("rx: randomly dropping a packet");
             return Err(Error::Exhausted)
         }
-        if self.state.maybe(self.config.corrupt_pct) {
-            net_trace!("rx: randomly corrupting a packet");
-            self.state.corrupt(&mut buffer)
-        }
-        if self.config.max_size > 0 && buffer.as_ref().len() > self.config.max_size {
-            net_trace!("rx: dropping a packet that is too large");
-            return Err(Error::Exhausted)
-        }
-        if !self.state.maybe_receive(&self.config, timestamp) {
+        if !self.state.borrow_mut().maybe_receive(&self.config, timestamp) {
             net_trace!("rx: dropping a packet because of rate limiting");
             return Err(Error::Exhausted)
         }
-        Ok(buffer)
-    }
-
-    fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        let buffer;
-        if self.state.maybe(self.config.drop_pct) {
-            net_trace!("tx: randomly dropping a packet");
-            buffer = None;
-        } else if self.config.max_size > 0 && length > self.config.max_size {
-            net_trace!("tx: dropping a packet that is too large");
-            buffer = None;
-        } else if !self.state.maybe_transmit(&self.config, timestamp) {
-            net_trace!("tx: dropping a packet because of rate limiting");
-            buffer = None;
-        } else {
-            buffer = Some(self.inner.transmit(timestamp, length)?);
-        }
-        Ok(TxBuffer {
-            buffer: buffer,
-            state:  self.state.clone(),
-            config: self.config,
-            junk:   [0; MTU],
-            length: length
+        let Self { token, config, state, mut corrupt } = self;
+        token.consume(timestamp, |buffer| {
+            if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
+                net_trace!("rx: dropping a packet that is too large");
+                return Err(Error::Exhausted)
+            }
+            if state.borrow_mut().maybe(config.corrupt_pct) {
+                net_trace!("rx: randomly corrupting a packet");
+                let mut corrupt = &mut corrupt[..buffer.len()];
+                corrupt.copy_from_slice(buffer);
+                state.borrow_mut().corrupt(&mut corrupt);
+                f(&mut corrupt)
+            } else {
+                f(buffer)
+            }
         })
     }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>> {
-    state:  State,
+pub struct TxToken<'a, Tx: phy::TxToken> {
+    state:  &'a RefCell<State>,
     config: Config,
-    buffer: Option<B>,
+    token:  Tx,
     junk:   [u8; MTU],
-    length: usize
 }
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>> AsRef<[u8]> for TxBuffer<B> {
-    fn as_ref(&self) -> &[u8] {
-        match self.buffer {
-            Some(ref buf) => buf.as_ref(),
-            None => &self.junk[..self.length]
-        }
-    }
-}
+impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
+    fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(mut self, timestamp: u64, len: usize, f: F)
+        -> Result<R>
+    {
+        let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
+            net_trace!("tx: randomly dropping a packet");
+            true
+        } else if self.config.max_size > 0 && len > self.config.max_size {
+            net_trace!("tx: dropping a packet that is too large");
+            true
+        } else if !self.state.borrow_mut().maybe_transmit(&self.config, timestamp) {
+            net_trace!("tx: dropping a packet because of rate limiting");
+            true
+        } else {
+            false
+        };
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>> AsMut<[u8]> for TxBuffer<B> {
-    fn as_mut(&mut self) -> &mut [u8] {
-        match self.buffer {
-            Some(ref mut buf) => buf.as_mut(),
-            None => &mut self.junk[..self.length]
+        if drop {
+            return f(&mut self.junk);
         }
-    }
-}
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>> Drop for TxBuffer<B> {
-    fn drop(&mut self) {
-        match self.buffer {
-            Some(ref mut buf) => {
-                if self.state.maybe(self.config.corrupt_pct) {
-                    net_trace!("tx: corrupting a packet");
-                    self.state.corrupt(buf)
-                }
-            },
-            None => ()
-        }
+        let Self { token, state, config, .. } = self;
+        token.consume(timestamp, len, |mut buf| {
+            if state.borrow_mut().maybe(config.corrupt_pct) {
+                net_trace!("tx: corrupting a packet");
+                state.borrow_mut().corrupt(&mut buf)
+            }
+            f(buf)
+        })
     }
 }

+ 37 - 35
src/phy/loopback.rs

@@ -1,9 +1,3 @@
-use core::mem::swap;
-use core::cell::RefCell;
-#[cfg(feature = "std")]
-use std::rc::Rc;
-#[cfg(feature = "alloc")]
-use alloc::rc::Rc;
 #[cfg(feature = "std")]
 use std::vec::Vec;
 #[cfg(feature = "std")]
@@ -11,12 +5,14 @@ use std::collections::VecDeque;
 #[cfg(feature = "alloc")]
 use alloc::{Vec, VecDeque};
 
-use {Error, Result};
-use super::{Device, DeviceCapabilities};
+use Result;
+use phy::{self, Device, DeviceCapabilities};
 
 /// A loopback device.
 #[derive(Debug)]
-pub struct Loopback(Rc<RefCell<VecDeque<Vec<u8>>>>);
+pub struct Loopback {
+    queue: VecDeque<Vec<u8>>,
+}
 
 impl Loopback {
     /// Creates a loopback device.
@@ -24,13 +20,15 @@ impl Loopback {
     /// Every packet transmitted through this device will be received through it
     /// in FIFO order.
     pub fn new() -> Loopback {
-        Loopback(Rc::new(RefCell::new(VecDeque::new())))
+        Loopback {
+            queue: VecDeque::new(),
+        }
     }
 }
 
-impl Device for Loopback {
-    type RxBuffer = Vec<u8>;
-    type TxBuffer = TxBuffer;
+impl<'a> Device<'a> for Loopback {
+    type RxToken = RxToken;
+    type TxToken = TxToken<'a>;
 
     fn capabilities(&self) -> DeviceCapabilities {
         DeviceCapabilities {
@@ -39,41 +37,45 @@ impl Device for Loopback {
         }
     }
 
-    fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
-        match self.0.borrow_mut().pop_front() {
-            Some(packet) => Ok(packet),
-            None => Err(Error::Exhausted)
-        }
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        self.queue.pop_front().map(move |buffer| {
+            let rx = RxToken { buffer: buffer };
+            let tx = TxToken { queue: &mut self.queue };
+            (rx, tx)
+        })
     }
 
-    fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        let mut buffer = Vec::new();
-        buffer.resize(length, 0);
-        Ok(TxBuffer {
-            queue:  self.0.clone(),
-            buffer: buffer
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        Some(TxToken {
+            queue: &mut self.queue,
         })
     }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer {
-    queue:  Rc<RefCell<VecDeque<Vec<u8>>>>,
-    buffer: Vec<u8>
+pub struct RxToken {
+    buffer: Vec<u8>,
 }
 
-impl AsRef<[u8]> for TxBuffer {
-    fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
+impl phy::RxToken for RxToken {
+    fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
+        f(&self.buffer)
+    }
 }
 
-impl AsMut<[u8]> for TxBuffer {
-    fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
+#[doc(hidden)]
+pub struct TxToken<'a> {
+    queue: &'a mut VecDeque<Vec<u8>>,
 }
 
-impl Drop for TxBuffer {
-    fn drop(&mut self) {
+impl<'a> phy::TxToken for TxToken<'a> {
+    fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
+        where F: FnOnce(&mut [u8]) -> Result<R>
+    {
         let mut buffer = Vec::new();
-        swap(&mut buffer, &mut self.buffer);
-        self.queue.borrow_mut().push_back(buffer)
+        buffer.resize(len, 0);
+        let result = f(&mut buffer);
+        self.queue.push_back(buffer);
+        result
     }
 }

+ 82 - 78
src/phy/mod.rs

@@ -19,87 +19,66 @@
 //!
 /*!
 ```rust
-use std::slice;
-use smoltcp::{Error, Result};
-use smoltcp::phy::{DeviceCapabilities, Device};
+use smoltcp::Result;
+use smoltcp::phy::{self, DeviceCapabilities, Device};
 
-const TX_BUFFERS: [*mut u8; 2] = [0x10000000 as *mut u8, 0x10001000 as *mut u8];
-const RX_BUFFERS: [*mut u8; 2] = [0x10002000 as *mut u8, 0x10003000 as *mut u8];
-
-fn rx_full() -> bool {
-    /* platform-specific code to check if an incoming packet has arrived */
-    false
-}
-
-fn rx_setup(_buf: *mut u8, _length: &mut usize) {
-    /* platform-specific code to receive a packet into a buffer */
+struct StmPhy {
+    rx_buffer: [u8; 1536],
+    tx_buffer: [u8; 1536],
 }
 
-fn tx_empty() -> bool {
-    /* platform-specific code to check if an outgoing packet can be sent */
-    false
+impl<'a> StmPhy {
+    fn new() -> StmPhy {
+        StmPhy {
+            rx_buffer: [0; 1536],
+            tx_buffer: [0; 1536],
+        }
+    }
 }
 
-fn tx_setup(_buf: *const u8, _length: usize) {
-    /* platform-specific code to send a buffer with a packet */
-}
+impl<'a> phy::Device<'a> for StmPhy {
+    type RxToken = StmPhyRxToken<'a>;
+    type TxToken = StmPhyTxToken<'a>;
 
-# #[allow(dead_code)]
-pub struct EthernetDevice {
-    tx_next: usize,
-    rx_next: usize
-}
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        Some((StmPhyRxToken(&mut self.rx_buffer[..]),
+              StmPhyTxToken(&mut self.tx_buffer[..])))
+    }
 
-impl Device for EthernetDevice {
-    type RxBuffer = &'static [u8];
-    type TxBuffer = EthernetTxBuffer;
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        Some(StmPhyTxToken(&mut self.tx_buffer[..]))
+    }
 
     fn capabilities(&self) -> DeviceCapabilities {
         let mut caps = DeviceCapabilities::default();
         caps.max_transmission_unit = 1536;
-        caps.max_burst_size = Some(2);
+        caps.max_burst_size = Some(1);
         caps
     }
-
-    fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
-        if rx_full() {
-            let index = self.rx_next;
-            self.rx_next = (self.rx_next + 1) % RX_BUFFERS.len();
-            let mut length = 0;
-            rx_setup(RX_BUFFERS[self.rx_next], &mut length);
-            Ok(unsafe {
-                slice::from_raw_parts(RX_BUFFERS[index], length)
-            })
-        } else {
-            Err(Error::Exhausted)
-        }
-    }
-
-    fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        if tx_empty() {
-            let index = self.tx_next;
-            self.tx_next = (self.tx_next + 1) % TX_BUFFERS.len();
-            Ok(EthernetTxBuffer(unsafe {
-                slice::from_raw_parts_mut(TX_BUFFERS[index], length)
-            }))
-        } else {
-            Err(Error::Exhausted)
-        }
-    }
 }
 
-pub struct EthernetTxBuffer(&'static mut [u8]);
+struct StmPhyRxToken<'a>(&'a [u8]);
 
-impl AsRef<[u8]> for EthernetTxBuffer {
-    fn as_ref(&self) -> &[u8] { self.0 }
+impl<'a> phy::RxToken for StmPhyRxToken<'a> {
+    fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
+        // TODO: receive packet into buffer
+        let result = f(self.0);
+        println!("rx called");
+        result
+    }
 }
 
-impl AsMut<[u8]> for EthernetTxBuffer {
-    fn as_mut(&mut self) -> &mut [u8] { self.0 }
-}
+struct StmPhyTxToken<'a>(&'a mut [u8]);
 
-impl Drop for EthernetTxBuffer {
-    fn drop(&mut self) { tx_setup(self.0.as_ptr(), self.0.len()) }
+impl<'a> phy::TxToken for StmPhyTxToken<'a> {
+    fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
+        -> Result<R>
+    {
+        let result = f(&mut self.0[..len]);
+        println!("tx called {}", len);
+        // TODO: send packet out
+        result
+    }
 }
 ```
 */
@@ -229,27 +208,52 @@ pub struct DeviceCapabilities {
 
 /// An interface for sending and receiving raw network frames.
 ///
-/// It is expected that a `Device` implementation would allocate memory for both sending
-/// and receiving packets from memory pools; hence, the stack borrows the buffer for a packet
-/// that it is about to receive, as well for a packet that it is about to send, from the device.
-pub trait Device {
-    type RxBuffer: AsRef<[u8]>;
-    type TxBuffer: AsRef<[u8]> + AsMut<[u8]>;
+/// The interface is based on _tokens_, which are types that allow to receive/transmit a
+/// single packet. The `receive` and `transmit` functions only construct such tokens, the
+/// real sending/receiving operation are performed when the tokens are consumed.
+pub trait Device<'a> {
+    type RxToken: RxToken + 'a;
+    type TxToken: TxToken + 'a;
+
+    /// Construct a token pair consisting of one receive token and one transmit token.
+    ///
+    /// The additional transmit token makes it possible to generate a reply packet based
+    /// on the contents of the received packet. For example, this makes it possible to
+    /// handle arbitrarily large ICMP echo ("ping") requests, where the all received bytes
+    /// need to be sent back, without heap allocation.
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)>;
+
+    /// Construct a transmit token.
+    fn transmit(&'a mut self) -> Option<Self::TxToken>;
 
     /// Get a description of device capabilities.
     fn capabilities(&self) -> DeviceCapabilities;
+}
 
-    /// Receive a frame.
+/// A token to receive a single network packet.
+pub trait RxToken {
+    /// Consumes the token to receive a single network packet.
     ///
-    /// It is expected that a `receive` implementation, once a packet is written to memory
-    /// through DMA, would gain ownership of the underlying buffer, provide it for parsing,
-    /// and return it to the network device once it is dropped.
-    fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer>;
+    /// This method receives a packet and then calls the given closure `f` with the raw
+    /// packet bytes as argument.
+    ///
+    /// The timestamp must be a number of milliseconds, monotonically increasing since an
+    /// arbitrary moment in time, such as system startup.
+    fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
+        where F: FnOnce(&[u8]) -> Result<R>;
+}
 
-    /// Transmit a frame.
+/// A token to transmit a single network packet.
+pub trait TxToken {
+    /// Consumes the token to send a single network packet.
+    ///
+    /// This method constructs a transmit buffer of size `len` and calls the passed
+    /// closure `f` with a mutable reference to that buffer. The closure should construct
+    /// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure
+    /// returns, the transmit buffer is sent out.
     ///
-    /// It is expected that a `transmit` implementation would gain ownership of a buffer with
-    /// the requested length, provide it for emission, and schedule it to be read from
-    /// memory by the network device once it is dropped.
-    fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer>;
+    /// The timestamp must be a number of milliseconds, monotonically increasing since an
+    /// arbitrary moment in time, such as system startup.
+    fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
+        where F: FnOnce(&mut [u8]) -> Result<R>;
 }

+ 60 - 37
src/phy/pcap_writer.rs

@@ -5,7 +5,7 @@ use std::io::Write;
 use byteorder::{ByteOrder, NativeEndian};
 
 use Result;
-use super::{DeviceCapabilities, Device};
+use phy::{self, DeviceCapabilities, Device};
 
 enum_with_unknown! {
     /// Captured packet header type.
@@ -114,13 +114,16 @@ impl<T: AsMut<Write>> PcapSink for RefCell<T> {
 /// [libpcap]: https://wiki.wireshark.org/Development/LibpcapFileFormat
 /// [sink]: trait.PcapSink.html
 #[derive(Debug)]
-pub struct PcapWriter<D: Device, S: PcapSink + Clone> {
+pub struct PcapWriter<D, S>
+    where D: for<'a> Device<'a>,
+          S: PcapSink + Clone,
+{
     lower: D,
     sink:  S,
-    mode:  PcapMode
+    mode:  PcapMode,
 }
 
-impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
+impl<D: for<'a> Device<'a>, S: PcapSink + Clone> PcapWriter<D, S> {
     /// Creates a packet capture writer.
     pub fn new(lower: D, sink: S, mode: PcapMode, link_type: PcapLinkType) -> PcapWriter<D, S> {
         sink.global_header(link_type);
@@ -128,53 +131,73 @@ impl<D: Device, S: PcapSink + Clone> PcapWriter<D, S> {
     }
 }
 
-impl<D: Device, S: PcapSink + Clone> Device for PcapWriter<D, S> {
-    type RxBuffer = D::RxBuffer;
-    type TxBuffer = TxBuffer<D::TxBuffer, S>;
+impl<'a, D, S> Device<'a> for PcapWriter<D, S>
+    where D: for<'b> Device<'b>,
+          S: PcapSink + Clone + 'a,
+{
+    type RxToken = RxToken<<D as Device<'a>>::RxToken, S>;
+    type TxToken = TxToken<<D as Device<'a>>::TxToken, S>;
 
     fn capabilities(&self) -> DeviceCapabilities { self.lower.capabilities() }
 
-    fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
-        let buffer = self.lower.receive(timestamp)?;
-        match self.mode {
-            PcapMode::Both | PcapMode::RxOnly =>
-                self.sink.packet(timestamp, buffer.as_ref()),
-            PcapMode::TxOnly => ()
-        }
-        Ok(buffer)
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        let &mut Self { ref mut lower, ref sink, mode, .. } = self;
+        lower.receive().map(|(rx_token, tx_token)| {
+            let rx = RxToken { token: rx_token, sink: sink.clone(), mode: mode };
+            let tx = TxToken { token: tx_token, sink: sink.clone(), mode: mode };
+            (rx, tx)
+        })
     }
 
-    fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        let buffer = self.lower.transmit(timestamp, length)?;
-        Ok(TxBuffer { buffer, timestamp, sink: self.sink.clone(), mode: self.mode })
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        let &mut Self { ref mut lower, ref sink, mode } = self;
+        lower.transmit().map(|token| {
+            TxToken { token, sink: sink.clone(), mode: mode }
+        })
     }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink> {
-    buffer:    B,
-    timestamp: u64,
-    sink:      S,
-    mode:      PcapMode
+pub struct RxToken<Rx: phy::RxToken, S: PcapSink> {
+    token: Rx,
+    sink:  S,
+    mode:  PcapMode,
 }
 
-impl<B, S> AsRef<[u8]> for TxBuffer<B, S>
-        where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
-    fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
+impl<Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<Rx, S> {
+    fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, timestamp: u64, f: F) -> Result<R> {
+        let Self { token, sink, mode } = self;
+        token.consume(timestamp, |buffer| {
+            match mode {
+                PcapMode::Both | PcapMode::RxOnly =>
+                    sink.packet(timestamp, buffer.as_ref()),
+                PcapMode::TxOnly => ()
+            }
+            f(buffer)
+        })
+    }
 }
 
-impl<B, S> AsMut<[u8]> for TxBuffer<B, S>
-        where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
-    fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
+#[doc(hidden)]
+pub struct TxToken<Tx: phy::TxToken, S: PcapSink> {
+    token: Tx,
+    sink:  S,
+    mode:  PcapMode
 }
 
-impl<B, S> Drop for TxBuffer<B, S>
-        where B: AsRef<[u8]> + AsMut<[u8]>, S: PcapSink {
-    fn drop(&mut self) {
-        match self.mode {
-            PcapMode::Both | PcapMode::TxOnly =>
-                self.sink.packet(self.timestamp, self.as_ref()),
-            PcapMode::RxOnly => ()
-        }
+impl<Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<Tx, S> {
+    fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
+        where F: FnOnce(&mut [u8]) -> Result<R>
+    {
+        let Self { token, sink, mode } = self;
+        token.consume(timestamp, len, |buffer| {
+            let result = f(buffer);
+            match mode {
+                PcapMode::Both | PcapMode::TxOnly =>
+                    sink.packet(timestamp, &buffer),
+                PcapMode::RxOnly => ()
+            };
+            result
+        })
     }
 }

+ 32 - 24
src/phy/raw_socket.rs

@@ -1,11 +1,10 @@
 use std::cell::RefCell;
-use std::vec::Vec;
 use std::rc::Rc;
 use std::io;
 use std::os::unix::io::{RawFd, AsRawFd};
 
 use Result;
-use super::{sys, DeviceCapabilities, Device};
+use phy::{self, sys, DeviceCapabilities, Device};
 
 /// A socket that captures or transmits the complete frame.
 #[derive(Debug)]
@@ -36,9 +35,9 @@ impl RawSocket {
     }
 }
 
-impl Device for RawSocket {
-    type RxBuffer = Vec<u8>;
-    type TxBuffer = TxBuffer;
+impl<'a> Device<'a> for RawSocket {
+    type RxToken = RxToken;
+    type TxToken = TxToken;
 
     fn capabilities(&self) -> DeviceCapabilities {
         DeviceCapabilities {
@@ -47,39 +46,48 @@ impl Device for RawSocket {
         }
     }
 
-    fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
-        let mut lower = self.lower.borrow_mut();
-        let mut buffer = vec![0; self.mtu];
-        let size = lower.recv(&mut buffer[..]).unwrap();
-        buffer.resize(size, 0);
-        Ok(buffer)
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
+        let tx = TxToken { lower: self.lower.clone() };
+        Some((rx, tx))
     }
 
-    fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        Ok(TxBuffer {
-            lower:  self.lower.clone(),
-            buffer: vec![0; length]
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        Some(TxToken {
+            lower: self.lower.clone(),
         })
     }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer {
+pub struct RxToken {
     lower:  Rc<RefCell<sys::RawSocketDesc>>,
-    buffer: Vec<u8>
+    mtu:    usize,
 }
 
-impl AsRef<[u8]> for TxBuffer {
-    fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
+impl phy::RxToken for RxToken {
+    fn consume<R, F: FnOnce(&[u8]) -> Result<R>>(self, _timestamp: u64, f: F) -> Result<R> {
+        let mut lower = self.lower.borrow_mut();
+        let mut buffer = vec![0; self.mtu];
+        let size = lower.recv(&mut buffer[..]).unwrap();
+        buffer.resize(size, 0);
+        f(&mut buffer)
+    }
 }
 
-impl AsMut<[u8]> for TxBuffer {
-    fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
+#[doc(hidden)]
+pub struct TxToken {
+    lower:  Rc<RefCell<sys::RawSocketDesc>>,
 }
 
-impl Drop for TxBuffer {
-    fn drop(&mut self) {
+impl phy::TxToken for TxToken {
+    fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, _timestamp: u64, len: usize, f: F)
+        -> Result<R>
+    {
         let mut lower = self.lower.borrow_mut();
-        lower.send(&mut self.buffer[..]).unwrap();
+        let mut buffer = vec![0; len];
+        let result = f(&mut buffer);
+        lower.send(&mut buffer[..]).unwrap();
+        result
     }
 }

+ 38 - 28
src/phy/tap_interface.rs

@@ -1,11 +1,10 @@
 use std::cell::RefCell;
-use std::vec::Vec;
 use std::rc::Rc;
 use std::io;
 use std::os::unix::io::{RawFd, AsRawFd};
 
 use {Error, Result};
-use super::{sys, DeviceCapabilities, Device};
+use phy::{self, sys, DeviceCapabilities, Device};
 
 /// A virtual Ethernet interface.
 #[derive(Debug)]
@@ -37,9 +36,9 @@ impl TapInterface {
     }
 }
 
-impl Device for TapInterface {
-    type RxBuffer = Vec<u8>;
-    type TxBuffer = TxBuffer;
+impl<'a> Device<'a> for TapInterface {
+    type RxToken = RxToken;
+    type TxToken = TxToken;
 
     fn capabilities(&self) -> DeviceCapabilities {
         DeviceCapabilities {
@@ -48,13 +47,35 @@ impl Device for TapInterface {
         }
     }
 
-    fn receive(&mut self, _timestamp: u64) -> Result<Self::RxBuffer> {
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        let rx = RxToken { lower: self.lower.clone(), mtu: self.mtu };
+        let tx = TxToken { lower: self.lower.clone(), };
+        Some((rx, tx))
+    }
+
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        Some(TxToken {
+            lower: self.lower.clone(),
+        })
+    }
+}
+
+#[doc(hidden)]
+pub struct RxToken {
+    lower: Rc<RefCell<sys::TapInterfaceDesc>>,
+    mtu:   usize,
+}
+
+impl phy::RxToken for RxToken {
+    fn consume<R, F>(self, _timestamp: u64, f: F) -> Result<R>
+        where F: FnOnce(&[u8]) -> Result<R>
+    {
         let mut lower = self.lower.borrow_mut();
         let mut buffer = vec![0; self.mtu];
         match lower.recv(&mut buffer[..]) {
             Ok(size) => {
                 buffer.resize(size, 0);
-                Ok(buffer)
+                f(&buffer)
             }
             Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                 Err(Error::Exhausted)
@@ -62,32 +83,21 @@ impl Device for TapInterface {
             Err(err) => panic!("{}", err)
         }
     }
-
-    fn transmit(&mut self, _timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        Ok(TxBuffer {
-            lower:  self.lower.clone(),
-            buffer: vec![0; length]
-        })
-    }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer {
-    lower:  Rc<RefCell<sys::TapInterfaceDesc>>,
-    buffer: Vec<u8>
-}
-
-impl AsRef<[u8]> for TxBuffer {
-    fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
-}
-
-impl AsMut<[u8]> for TxBuffer {
-    fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
+pub struct TxToken {
+    lower: Rc<RefCell<sys::TapInterfaceDesc>>,
 }
 
-impl Drop for TxBuffer {
-    fn drop(&mut self) {
+impl phy::TxToken for TxToken {
+    fn consume<R, F>(self, _timestamp: u64, len: usize, f: F) -> Result<R>
+        where F: FnOnce(&mut [u8]) -> Result<R>
+    {
         let mut lower = self.lower.borrow_mut();
-        lower.send(&mut self.buffer[..]).unwrap();
+        let mut buffer = vec![0; len];
+        let result = f(&mut buffer);
+        lower.send(&mut buffer[..]).unwrap();
+        result
     }
 }

+ 50 - 29
src/phy/tracer.rs

@@ -1,24 +1,21 @@
 use Result;
 use wire::pretty_print::{PrettyPrint, PrettyPrinter};
-use super::{DeviceCapabilities, Device};
+use phy::{self, DeviceCapabilities, Device};
 
 /// A tracer device.
 ///
 /// A tracer is a device that pretty prints all packets traversing it
 /// using the provided writer function, and then passes them to another
 /// device.
-pub struct Tracer<D: Device, P: PrettyPrint> {
-    inner:     D,
-    writer:    fn(u64, PrettyPrinter<P>)
+pub struct Tracer<D: for<'a> Device<'a>, P: PrettyPrint> {
+    inner:  D,
+    writer: fn(u64, PrettyPrinter<P>),
 }
 
-impl<D: Device, P: PrettyPrint> Tracer<D, P> {
+impl<D: for<'a> Device<'a>, P: PrettyPrint> Tracer<D, P> {
     /// Create a tracer device.
     pub fn new(inner: D, writer: fn(timestamp: u64, printer: PrettyPrinter<P>)) -> Tracer<D, P> {
-        Tracer {
-            inner:   inner,
-            writer:  writer
-        }
+        Tracer { inner, writer }
     }
 
     /// Return the underlying device, consuming the tracer.
@@ -27,41 +24,65 @@ impl<D: Device, P: PrettyPrint> Tracer<D, P> {
     }
 }
 
-impl<D: Device, P: PrettyPrint> Device for Tracer<D, P> {
-    type RxBuffer = D::RxBuffer;
-    type TxBuffer = TxBuffer<D::TxBuffer, P>;
+impl<'a, D, P> Device<'a> for Tracer<D, P>
+    where D: for<'b> Device<'b>,
+          P: PrettyPrint + 'a,
+{
+    type RxToken = RxToken<<D as Device<'a>>::RxToken, P>;
+    type TxToken = TxToken<<D as Device<'a>>::TxToken, P>;
 
     fn capabilities(&self) -> DeviceCapabilities { self.inner.capabilities() }
 
-    fn receive(&mut self, timestamp: u64) -> Result<Self::RxBuffer> {
-        let buffer = self.inner.receive(timestamp)?;
-        (self.writer)(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
-        Ok(buffer)
+    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
+        let &mut Self { ref mut inner, writer, .. } = self;
+        inner.receive().map(|(rx_token, tx_token)| {
+            let rx = RxToken { token: rx_token, writer: writer };
+            let tx = TxToken { token: tx_token, writer: writer };
+            (rx, tx)
+        })
     }
 
-    fn transmit(&mut self, timestamp: u64, length: usize) -> Result<Self::TxBuffer> {
-        let buffer = self.inner.transmit(timestamp, length)?;
-        Ok(TxBuffer { buffer, timestamp, writer: self.writer })
+    fn transmit(&'a mut self) -> Option<Self::TxToken> {
+        let &mut Self { ref mut inner, writer } = self;
+        inner.transmit().map(|tx_token| {
+            TxToken { token: tx_token, writer: writer }
+        })
     }
 }
 
 #[doc(hidden)]
-pub struct TxBuffer<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> {
-    buffer:    B,
-    timestamp: u64,
+pub struct RxToken<Rx: phy::RxToken, P: PrettyPrint> {
+    token:     Rx,
     writer:    fn(u64, PrettyPrinter<P>)
 }
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsRef<[u8]> for TxBuffer<B, P> {
-    fn as_ref(&self) -> &[u8] { self.buffer.as_ref() }
+impl<Rx: phy::RxToken, P: PrettyPrint> phy::RxToken for RxToken<Rx, P> {
+    fn consume<R, F>(self, timestamp: u64, f: F) -> Result<R>
+        where F: FnOnce(&[u8]) -> Result<R>
+    {
+        let Self { token, writer } = self;
+        token.consume(timestamp, |buffer| {
+            writer(timestamp, PrettyPrinter::<P>::new("<- ", &buffer));
+            f(buffer)
+        })
+    }
 }
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> AsMut<[u8]> for TxBuffer<B, P> {
-    fn as_mut(&mut self) -> &mut [u8] { self.buffer.as_mut() }
+#[doc(hidden)]
+pub struct TxToken<Tx: phy::TxToken, P: PrettyPrint> {
+    token:     Tx,
+    writer:    fn(u64, PrettyPrinter<P>)
 }
 
-impl<B: AsRef<[u8]> + AsMut<[u8]>, P: PrettyPrint> Drop for TxBuffer<B, P> {
-    fn drop(&mut self) {
-        (self.writer)(self.timestamp, PrettyPrinter::<P>::new("-> ", &self.buffer));
+impl<Tx: phy::TxToken, P: PrettyPrint> phy::TxToken for TxToken<Tx, P> {
+    fn consume<R, F>(self, timestamp: u64, len: usize, f: F) -> Result<R>
+        where F: FnOnce(&mut [u8]) -> Result<R>
+    {
+        let Self { token, writer } = self;
+        token.consume(timestamp, len, |buffer| {
+            let result = f(buffer);
+            writer(timestamp, PrettyPrinter::<P>::new("-> ", &buffer));
+            result
+        })
     }
 }