Переглянути джерело

phy: make consume infallible, move timestamp to receive/transmit.

Dario Nieuwenhuis 2 роки тому
батько
коміт
0102a4b741

+ 7 - 10
examples/tcpdump.rs

@@ -10,15 +10,12 @@ fn main() {
     let mut socket = RawSocket::new(ifname.as_ref(), smoltcp::phy::Medium::Ethernet).unwrap();
     loop {
         phy_wait(socket.as_raw_fd(), None).unwrap();
-        let (rx_token, _) = socket.receive().unwrap();
-        rx_token
-            .consume(Instant::now(), |buffer| {
-                println!(
-                    "{}",
-                    PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer)
-                );
-                Ok(())
-            })
-            .unwrap();
+        let (rx_token, _) = socket.receive(Instant::now()).unwrap();
+        rx_token.consume(|buffer| {
+            println!(
+                "{}",
+                PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &buffer)
+            );
+        })
     }
 }

+ 1 - 1
src/iface/interface/ethernet.rs

@@ -69,7 +69,7 @@ impl<'i> InterfaceInner<'i> {
         F: FnOnce(EthernetFrame<&mut [u8]>),
     {
         let tx_len = EthernetFrame::<&[u8]>::buffer_len(buffer_len);
-        tx_token.consume(self.now, tx_len, |tx_buffer| {
+        tx_token.consume(tx_len, |tx_buffer| {
             debug_assert!(tx_buffer.as_ref().len() == tx_len);
             let mut frame = EthernetFrame::new_unchecked(tx_buffer);
 

+ 1 - 1
src/iface/interface/ipv4.rs

@@ -469,7 +469,7 @@ impl<'a> InterfaceInner<'a> {
             Ok(())
         };
 
-        tx_token.consume(self.now, tx_len, |mut tx_buffer| {
+        tx_token.consume(tx_len, |mut tx_buffer| {
             #[cfg(feature = "medium-ethernet")]
             if matches!(self.caps.medium, Medium::Ethernet) {
                 emit_ethernet(&IpRepr::Ipv4(*repr), tx_buffer)?;

+ 13 - 17
src/iface/interface/mod.rs

@@ -927,7 +927,7 @@ impl<'a> Interface<'a> {
                 } else if let Some(pkt) = self.inner.igmp_report_packet(IgmpVersion::Version2, addr)
                 {
                     // Send initial membership report
-                    let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                    let tx_token = device.transmit(timestamp).ok_or(Error::Exhausted)?;
                     self.inner.dispatch_ip(tx_token, pkt, None)?;
                     Ok(true)
                 } else {
@@ -963,7 +963,7 @@ impl<'a> Interface<'a> {
                     Ok(false)
                 } else if let Some(pkt) = self.inner.igmp_leave_packet(addr) {
                     // Send group leave packet
-                    let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                    let tx_token = device.transmit(timestamp).ok_or(Error::Exhausted)?;
                     self.inner.dispatch_ip(tx_token, pkt, None)?;
                     Ok(true)
                 } else {
@@ -1161,8 +1161,8 @@ impl<'a> Interface<'a> {
             out_packets: _out_packets,
         } = self;
 
-        while let Some((rx_token, tx_token)) = device.receive() {
-            let res = rx_token.consume(inner.now, |frame| {
+        while let Some((rx_token, tx_token)) = device.receive(inner.now) {
+            rx_token.consume(|frame| {
                 match inner.caps.medium {
                     #[cfg(feature = "medium-ethernet")]
                     Medium::Ethernet => {
@@ -1195,12 +1195,7 @@ impl<'a> Interface<'a> {
                     }
                 }
                 processed_any = true;
-                Ok(())
             });
-
-            if let Err(err) = res {
-                net_debug!("Failed to consume RX token: {}", err);
-            }
         }
 
         processed_any
@@ -1229,7 +1224,7 @@ impl<'a> Interface<'a> {
             let mut neighbor_addr = None;
             let mut respond = |inner: &mut InterfaceInner, response: IpPacket| {
                 neighbor_addr = Some(response.ip_repr().dst_addr());
-                let t = device.transmit().ok_or_else(|| {
+                let t = device.transmit(inner.now).ok_or_else(|| {
                     net_debug!("failed to transmit IP: {}", Error::Exhausted);
                     Error::Exhausted
                 })?;
@@ -1328,7 +1323,7 @@ impl<'a> Interface<'a> {
             } if self.inner.now >= timeout => {
                 if let Some(pkt) = self.inner.igmp_report_packet(version, group) {
                     // Send initial membership report
-                    let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                    let tx_token = device.transmit(self.inner.now).ok_or(Error::Exhausted)?;
                     self.inner.dispatch_ip(tx_token, pkt, None)?;
                 }
 
@@ -1352,7 +1347,8 @@ impl<'a> Interface<'a> {
                     Some(addr) => {
                         if let Some(pkt) = self.inner.igmp_report_packet(version, addr) {
                             // Send initial membership report
-                            let tx_token = device.transmit().ok_or(Error::Exhausted)?;
+                            let tx_token =
+                                device.transmit(self.inner.now).ok_or(Error::Exhausted)?;
                             self.inner.dispatch_ip(tx_token, pkt, None)?;
                         }
 
@@ -1402,7 +1398,7 @@ impl<'a> Interface<'a> {
         } = &self.out_packets.ipv4_out_packet;
 
         if *packet_len > *sent_bytes {
-            match device.transmit() {
+            match device.transmit(self.inner.now) {
                 Some(tx_token) => self
                     .inner
                     .dispatch_ipv4_out_packet(tx_token, &mut self.out_packets.ipv4_out_packet),
@@ -1440,7 +1436,7 @@ impl<'a> Interface<'a> {
         } = &self.out_packets.sixlowpan_out_packet;
 
         if *packet_len > *sent_bytes {
-            match device.transmit() {
+            match device.transmit(self.inner.now) {
                 Some(tx_token) => self.inner.dispatch_ieee802154_out_packet(
                     tx_token,
                     &mut self.out_packets.sixlowpan_out_packet,
@@ -2246,7 +2242,7 @@ impl<'a> InterfaceInner<'a> {
                         }
 
                         // Transmit the first packet.
-                        tx_token.consume(self.now, tx_len, |mut tx_buffer| {
+                        tx_token.consume(tx_len, |mut tx_buffer| {
                             #[cfg(feature = "medium-ethernet")]
                             if matches!(self.caps.medium, Medium::Ethernet) {
                                 emit_ethernet(&ip_repr, tx_buffer)?;
@@ -2271,7 +2267,7 @@ impl<'a> InterfaceInner<'a> {
                     }
                 } else {
                     // No fragmentation is required.
-                    tx_token.consume(self.now, total_len, |mut tx_buffer| {
+                    tx_token.consume(total_len, |mut tx_buffer| {
                         #[cfg(feature = "medium-ethernet")]
                         if matches!(self.caps.medium, Medium::Ethernet) {
                             emit_ethernet(&ip_repr, tx_buffer)?;
@@ -2285,7 +2281,7 @@ impl<'a> InterfaceInner<'a> {
             }
             // We don't support IPv6 fragmentation yet.
             #[cfg(feature = "proto-ipv6")]
-            IpRepr::Ipv6(_) => tx_token.consume(self.now, total_len, |mut tx_buffer| {
+            IpRepr::Ipv6(_) => tx_token.consume(total_len, |mut tx_buffer| {
                 #[cfg(feature = "medium-ethernet")]
                 if matches!(self.caps.medium, Medium::Ethernet) {
                     emit_ethernet(&ip_repr, tx_buffer)?;

+ 17 - 23
src/iface/interface/sixlowpan.rs

@@ -488,27 +488,22 @@ impl<'a> InterfaceInner<'a> {
                 *sent_bytes = frag1_size;
                 *datagram_offset = frag1_size + header_diff;
 
-                tx_token.consume(
-                    self.now,
-                    ieee_len + frag1.buffer_len() + frag1_size,
-                    |mut tx_buf| {
-                        // Add the IEEE header.
-                        let mut ieee_packet =
-                            Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]);
-                        ieee_repr.emit(&mut ieee_packet);
-                        tx_buf = &mut tx_buf[ieee_len..];
-
-                        // Add the first fragment header
-                        let mut frag1_packet = SixlowpanFragPacket::new_unchecked(&mut tx_buf);
-                        frag1.emit(&mut frag1_packet);
-                        tx_buf = &mut tx_buf[frag1.buffer_len()..];
-
-                        // Add the buffer part.
-                        tx_buf[..frag1_size].copy_from_slice(&buffer[..frag1_size]);
-
-                        Ok(())
-                    },
-                )
+                tx_token.consume(ieee_len + frag1.buffer_len() + frag1_size, |mut tx_buf| {
+                    // Add the IEEE header.
+                    let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]);
+                    ieee_repr.emit(&mut ieee_packet);
+                    tx_buf = &mut tx_buf[ieee_len..];
+
+                    // Add the first fragment header
+                    let mut frag1_packet = SixlowpanFragPacket::new_unchecked(&mut tx_buf);
+                    frag1.emit(&mut frag1_packet);
+                    tx_buf = &mut tx_buf[frag1.buffer_len()..];
+
+                    // Add the buffer part.
+                    tx_buf[..frag1_size].copy_from_slice(&buffer[..frag1_size]);
+
+                    Ok(())
+                })
             }
 
             #[cfg(not(feature = "proto-sixlowpan-fragmentation"))]
@@ -520,7 +515,7 @@ impl<'a> InterfaceInner<'a> {
             }
         } else {
             // We don't need fragmentation, so we emit everything to the TX token.
-            tx_token.consume(self.now, total_size + ieee_len, |mut tx_buf| {
+            tx_token.consume(total_size + ieee_len, |mut tx_buf| {
                 let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]);
                 ieee_repr.emit(&mut ieee_packet);
                 tx_buf = &mut tx_buf[ieee_len..];
@@ -623,7 +618,6 @@ impl<'a> InterfaceInner<'a> {
         let frag_size = (*packet_len - *sent_bytes).min(*fragn_size);
 
         tx_token.consume(
-            self.now,
             ieee_repr.buffer_len() + fragn.buffer_len() + frag_size,
             |mut tx_buf| {
                 let mut ieee_packet = Ieee802154Frame::new_unchecked(&mut tx_buf[..ieee_len]);

+ 13 - 18
src/iface/interface/tests.rs

@@ -9,7 +9,6 @@ use crate::iface::NeighborCache;
 use crate::phy::{ChecksumCapabilities, Loopback};
 #[cfg(feature = "proto-igmp")]
 use crate::time::Instant;
-use crate::{Error, Result};
 
 #[allow(unused)]
 fn fill_slice(s: &mut [u8], val: u8) {
@@ -129,12 +128,10 @@ fn create_ieee802154<'a>() -> (Interface<'a>, SocketSet<'a>, Loopback) {
 #[cfg(feature = "proto-igmp")]
 fn recv_all(device: &mut Loopback, timestamp: Instant) -> Vec<Vec<u8>> {
     let mut pkts = Vec::new();
-    while let Some((rx, _tx)) = device.receive() {
-        rx.consume(timestamp, |pkt| {
+    while let Some((rx, _tx)) = device.receive(timestamp) {
+        rx.consume(|pkt| {
             pkts.push(pkt.to_vec());
-            Ok(())
-        })
-        .unwrap();
+        });
     }
     pkts
 }
@@ -144,11 +141,12 @@ fn recv_all(device: &mut Loopback, timestamp: Instant) -> Vec<Vec<u8>> {
 struct MockTxToken;
 
 impl TxToken for MockTxToken {
-    fn consume<R, F>(self, _: Instant, _: usize, _: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        Err(Error::Unaddressable)
+        let mut junk = [0; 1536];
+        f(&mut junk[..len])
     }
 }
 
@@ -1206,13 +1204,10 @@ fn test_handle_igmp() {
     ];
     {
         // Transmit GENERAL_QUERY_BYTES into loopback
-        let tx_token = device.transmit().unwrap();
-        tx_token
-            .consume(timestamp, GENERAL_QUERY_BYTES.len(), |buffer| {
-                buffer.copy_from_slice(GENERAL_QUERY_BYTES);
-                Ok(())
-            })
-            .unwrap();
+        let tx_token = device.transmit(timestamp).unwrap();
+        tx_token.consume(GENERAL_QUERY_BYTES.len(), |buffer| {
+            buffer.copy_from_slice(GENERAL_QUERY_BYTES);
+        });
     }
     // Trigger processing until all packets received through the
     // loopback have been processed, including responses to
@@ -1562,7 +1557,7 @@ fn test_echo_request_sixlowpan_128_bytes() {
         Instant::now(),
     );
 
-    let tx_token = device.transmit().unwrap();
+    let tx_token = device.transmit(Instant::now()).unwrap();
     iface
         .inner
         .dispatch_ieee802154(
@@ -1702,7 +1697,7 @@ fn test_sixlowpan_udp_with_fragmentation() {
         ))
     );
 
-    let tx_token = device.transmit().unwrap();
+    let tx_token = device.transmit(Instant::now()).unwrap();
     iface
         .inner
         .dispatch_ieee802154(

+ 71 - 102
src/phy/fault_injector.rs

@@ -1,8 +1,5 @@
-use core::cell::RefCell;
-
 use crate::phy::{self, Device, DeviceCapabilities};
 use crate::time::{Duration, Instant};
-use crate::{Error, Result};
 
 // We use our own RNG to stay compatible with #![no_std].
 // The use of the RNG below has a slight bias, but it doesn't matter.
@@ -96,23 +93,24 @@ impl State {
 #[derive(Debug)]
 pub struct FaultInjector<D: Device> {
     inner: D,
-    state: RefCell<State>,
+    state: State,
     config: Config,
+    rx_buf: [u8; MTU],
 }
 
 impl<D: Device> FaultInjector<D> {
     /// Create a fault injector device, using the given random number generator seed.
     pub fn new(inner: D, seed: u32) -> FaultInjector<D> {
-        let state = State {
-            rng_seed: seed,
-            refilled_at: Instant::from_millis(0),
-            tx_bucket: 0,
-            rx_bucket: 0,
-        };
         FaultInjector {
             inner,
-            state: RefCell::new(state),
+            state: State {
+                rng_seed: seed,
+                refilled_at: Instant::from_millis(0),
+                tx_bucket: 0,
+                rx_bucket: 0,
+            },
             config: Config::default(),
+            rx_buf: [0u8; MTU],
         }
     }
 
@@ -190,13 +188,13 @@ impl<D: Device> FaultInjector<D> {
 
     /// Set the interval for packet rate limiting, in milliseconds.
     pub fn set_bucket_interval(&mut self, interval: Duration) {
-        self.state.borrow_mut().refilled_at = Instant::from_millis(0);
+        self.state.refilled_at = Instant::from_millis(0);
         self.config.interval = interval
     }
 }
 
 impl<D: Device> Device for FaultInjector<D> {
-    type RxToken<'a> = RxToken<'a, D::RxToken<'a>>
+    type RxToken<'a> = RxToken<'a>
     where
         Self: 'a;
     type TxToken<'a> = TxToken<'a, D::TxToken<'a>>
@@ -211,117 +209,94 @@ impl<D: Device> Device for FaultInjector<D> {
         caps
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
-        let &mut Self {
-            ref mut inner,
-            ref state,
-            config,
-        } = self;
-        inner.receive().map(|(rx_token, tx_token)| {
-            let rx = RxToken {
-                state,
-                config,
-                token: rx_token,
-                corrupt: [0; MTU],
-            };
-            let tx = TxToken {
-                state,
-                config,
-                token: tx_token,
-                junk: [0; MTU],
-            };
-            (rx, tx)
-        })
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+        let (rx_token, tx_token) = self.inner.receive(timestamp)?;
+
+        let len = super::RxToken::consume(rx_token, |buffer| {
+            if (self.config.max_size > 0 && buffer.len() > self.config.max_size)
+                || buffer.len() > self.rx_buf.len()
+            {
+                net_trace!("rx: dropping a packet that is too large");
+                return None;
+            }
+            self.rx_buf[..buffer.len()].copy_from_slice(buffer);
+            Some(buffer.len())
+        })?;
+
+        let buf = &mut self.rx_buf[..len];
+
+        if self.state.maybe(self.config.drop_pct) {
+            net_trace!("rx: randomly dropping a packet");
+            return None;
+        }
+
+        if !self.state.maybe_receive(&self.config, timestamp) {
+            net_trace!("rx: dropping a packet because of rate limiting");
+            return None;
+        }
+
+        if self.state.maybe(self.config.corrupt_pct) {
+            net_trace!("rx: randomly corrupting a packet");
+            self.state.corrupt(&mut buf[..]);
+        }
+
+        let rx = RxToken { buf };
+        let tx = TxToken {
+            state: &mut self.state,
+            config: self.config,
+            token: tx_token,
+            junk: [0; MTU],
+            timestamp,
+        };
+        Some((rx, tx))
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
-        let &mut Self {
-            ref mut inner,
-            ref state,
-            config,
-        } = self;
-        inner.transmit().map(|token| TxToken {
-            state,
-            config,
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
+        self.inner.transmit(timestamp).map(|token| TxToken {
+            state: &mut self.state,
+            config: self.config,
             token,
             junk: [0; MTU],
+            timestamp,
         })
     }
 }
 
 #[doc(hidden)]
-pub struct RxToken<'a, Rx: phy::RxToken> {
-    state: &'a RefCell<State>,
-    config: Config,
-    token: Rx,
-    corrupt: [u8; MTU],
+pub struct RxToken<'a> {
+    buf: &'a mut [u8],
 }
 
-impl<'a, Rx: phy::RxToken> phy::RxToken for RxToken<'a, Rx> {
-    fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
+impl<'a> phy::RxToken for RxToken<'a> {
+    fn consume<R, F>(self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        if self.state.borrow_mut().maybe(self.config.drop_pct) {
-            net_trace!("rx: randomly dropping a packet");
-            return Err(Error::Exhausted);
-        }
-        if !self
-            .state
-            .borrow_mut()
-            .maybe_receive(&self.config, timestamp)
-        {
-            net_trace!("rx: dropping a packet because of rate limiting");
-            return Err(Error::Exhausted);
-        }
-        let Self {
-            token,
-            config,
-            state,
-            mut corrupt,
-        } = self;
-        token.consume(timestamp, |buffer| {
-            if config.max_size > 0 && buffer.as_ref().len() > config.max_size {
-                net_trace!("rx: dropping a packet that is too large");
-                return Err(Error::Exhausted);
-            }
-            if state.borrow_mut().maybe(config.corrupt_pct) {
-                net_trace!("rx: randomly corrupting a packet");
-                let mut corrupt = &mut corrupt[..buffer.len()];
-                corrupt.copy_from_slice(buffer);
-                state.borrow_mut().corrupt(&mut corrupt);
-                f(corrupt)
-            } else {
-                f(buffer)
-            }
-        })
+        f(self.buf)
     }
 }
 
 #[doc(hidden)]
 pub struct TxToken<'a, Tx: phy::TxToken> {
-    state: &'a RefCell<State>,
+    state: &'a mut State,
     config: Config,
     token: Tx,
     junk: [u8; MTU],
+    timestamp: Instant,
 }
 
 impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
-    fn consume<R, F>(mut self, timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(mut self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        let drop = if self.state.borrow_mut().maybe(self.config.drop_pct) {
+        let drop = if self.state.maybe(self.config.drop_pct) {
             net_trace!("tx: randomly dropping a packet");
             true
         } else if self.config.max_size > 0 && len > self.config.max_size {
             net_trace!("tx: dropping a packet that is too large");
             true
-        } else if !self
-            .state
-            .borrow_mut()
-            .maybe_transmit(&self.config, timestamp)
-        {
+        } else if !self.state.maybe_transmit(&self.config, self.timestamp) {
             net_trace!("tx: dropping a packet because of rate limiting");
             true
         } else {
@@ -332,16 +307,10 @@ impl<'a, Tx: phy::TxToken> phy::TxToken for TxToken<'a, Tx> {
             return f(&mut self.junk[..len]);
         }
 
-        let Self {
-            token,
-            state,
-            config,
-            ..
-        } = self;
-        token.consume(timestamp, len, |mut buf| {
-            if state.borrow_mut().maybe(config.corrupt_pct) {
+        self.token.consume(len, |mut buf| {
+            if self.state.maybe(self.config.corrupt_pct) {
                 net_trace!("tx: corrupting a packet");
-                state.borrow_mut().corrupt(&mut buf)
+                self.state.corrupt(&mut buf)
             }
             f(buf)
         })

+ 10 - 11
src/phy/fuzz_injector.rs

@@ -1,6 +1,5 @@
 use crate::phy::{self, Device, DeviceCapabilities};
 use crate::time::Instant;
-use crate::Result;
 
 // This could be fixed once associated consts are stable.
 const MTU: usize = 1536;
@@ -62,13 +61,13 @@ where
         caps
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         let &mut Self {
             ref mut inner,
             ref fuzz_rx,
             ref fuzz_tx,
         } = self;
-        inner.receive().map(|(rx_token, tx_token)| {
+        inner.receive(timestamp).map(|(rx_token, tx_token)| {
             let rx = RxToken {
                 fuzzer: fuzz_rx,
                 token: rx_token,
@@ -81,13 +80,13 @@ where
         })
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
         let &mut Self {
             ref mut inner,
             fuzz_rx: _,
             ref fuzz_tx,
         } = self;
-        inner.transmit().map(|token| TxToken {
+        inner.transmit(timestamp).map(|token| TxToken {
             fuzzer: fuzz_tx,
             token: token,
         })
@@ -101,12 +100,12 @@ pub struct RxToken<'a, Rx: phy::RxToken, F: Fuzzer + 'a> {
 }
 
 impl<'a, Rx: phy::RxToken, FRx: Fuzzer> phy::RxToken for RxToken<'a, Rx, FRx> {
-    fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         let Self { fuzzer, token } = self;
-        token.consume(timestamp, |buffer| {
+        token.consume(|buffer| {
             fuzzer.fuzz_packet(buffer);
             f(buffer)
         })
@@ -120,12 +119,12 @@ pub struct TxToken<'a, Tx: phy::TxToken, F: Fuzzer + 'a> {
 }
 
 impl<'a, Tx: phy::TxToken, FTx: Fuzzer> phy::TxToken for TxToken<'a, Tx, FTx> {
-    fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         let Self { fuzzer, token } = self;
-        token.consume(timestamp, len, |buf| {
+        token.consume(len, |buf| {
             let result = f(buf);
             fuzzer.fuzz_packet(buf);
             result

+ 6 - 7
src/phy/loopback.rs

@@ -3,7 +3,6 @@ use alloc::vec::Vec;
 
 use crate::phy::{self, Device, DeviceCapabilities, Medium};
 use crate::time::Instant;
-use crate::Result;
 
 /// A loopback device.
 #[derive(Debug)]
@@ -38,7 +37,7 @@ impl Device for Loopback {
         }
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         self.queue.pop_front().map(move |buffer| {
             let rx = RxToken { buffer };
             let tx = TxToken {
@@ -48,7 +47,7 @@ impl Device for Loopback {
         })
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
         Some(TxToken {
             queue: &mut self.queue,
         })
@@ -61,9 +60,9 @@ pub struct RxToken {
 }
 
 impl phy::RxToken for RxToken {
-    fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(mut self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         f(&mut self.buffer)
     }
@@ -76,9 +75,9 @@ pub struct TxToken<'a> {
 }
 
 impl<'a> phy::TxToken for TxToken<'a> {
-    fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         let mut buffer = Vec::new();
         buffer.resize(len, 0);

+ 18 - 19
src/phy/mod.rs

@@ -42,12 +42,12 @@ impl phy::Device for StmPhy {
     type RxToken<'a> = StmPhyRxToken<'a> where Self: 'a;
     type TxToken<'a> = StmPhyTxToken<'a> where Self: 'a;
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         Some((StmPhyRxToken(&mut self.rx_buffer[..]),
               StmPhyTxToken(&mut self.tx_buffer[..])))
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
         Some(StmPhyTxToken(&mut self.tx_buffer[..]))
     }
 
@@ -63,8 +63,8 @@ impl phy::Device for StmPhy {
 struct StmPhyRxToken<'a>(&'a mut [u8]);
 
 impl<'a> phy::RxToken for StmPhyRxToken<'a> {
-    fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
-        where F: FnOnce(&mut [u8]) -> Result<R>
+    fn consume<R, F>(mut self, f: F) -> R
+        where F: FnOnce(&mut [u8]) -> R
     {
         // TODO: receive packet into buffer
         let result = f(&mut self.0);
@@ -76,8 +76,8 @@ impl<'a> phy::RxToken for StmPhyRxToken<'a> {
 struct StmPhyTxToken<'a>(&'a mut [u8]);
 
 impl<'a> phy::TxToken for StmPhyTxToken<'a> {
-    fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
-        where F: FnOnce(&mut [u8]) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
+        where F: FnOnce(&mut [u8]) -> R
     {
         let result = f(&mut self.0[..len]);
         println!("tx called {}", len);
@@ -90,7 +90,6 @@ impl<'a> phy::TxToken for StmPhyTxToken<'a> {
 )]
 
 use crate::time::Instant;
-use crate::Result;
 
 #[cfg(all(
     any(feature = "phy-raw_socket", feature = "phy-tuntap_interface"),
@@ -322,10 +321,16 @@ pub trait Device {
     /// on the contents of the received packet. For example, this makes it possible to
     /// handle arbitrarily large ICMP echo ("ping") requests, where the all received bytes
     /// need to be sent back, without heap allocation.
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)>;
+    ///
+    /// The timestamp must be a number of milliseconds, monotonically increasing since an
+    /// arbitrary moment in time, such as system startup.
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)>;
 
     /// Construct a transmit token.
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>>;
+    ///
+    /// The timestamp must be a number of milliseconds, monotonically increasing since an
+    /// arbitrary moment in time, such as system startup.
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>>;
 
     /// Get a description of device capabilities.
     fn capabilities(&self) -> DeviceCapabilities;
@@ -337,12 +342,9 @@ pub trait RxToken {
     ///
     /// This method receives a packet and then calls the given closure `f` with the raw
     /// packet bytes as argument.
-    ///
-    /// The timestamp must be a number of milliseconds, monotonically increasing since an
-    /// arbitrary moment in time, such as system startup.
-    fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>;
+        F: FnOnce(&mut [u8]) -> R;
 }
 
 /// A token to transmit a single network packet.
@@ -353,10 +355,7 @@ pub trait TxToken {
     /// closure `f` with a mutable reference to that buffer. The closure should construct
     /// a valid network packet (e.g. an ethernet packet) in the buffer. When the closure
     /// returns, the transmit buffer is sent out.
-    ///
-    /// The timestamp must be a number of milliseconds, monotonically increasing since an
-    /// arbitrary moment in time, such as system startup.
-    fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>;
+        F: FnOnce(&mut [u8]) -> R;
 }

+ 41 - 32
src/phy/pcap_writer.rs

@@ -6,7 +6,6 @@ use std::io::Write;
 
 use crate::phy::{self, Device, DeviceCapabilities};
 use crate::time::Instant;
-use crate::Result;
 
 enum_with_unknown! {
     /// Captured packet header type.
@@ -177,30 +176,37 @@ where
         self.lower.capabilities()
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         let sink = &self.sink;
         let mode = self.mode;
-        self.lower.receive().map(move |(rx_token, tx_token)| {
-            let rx = RxToken {
-                token: rx_token,
-                sink,
-                mode,
-            };
-            let tx = TxToken {
-                token: tx_token,
-                sink,
-                mode,
-            };
-            (rx, tx)
-        })
+        self.lower
+            .receive(timestamp)
+            .map(move |(rx_token, tx_token)| {
+                let rx = RxToken {
+                    token: rx_token,
+                    sink,
+                    mode,
+                    timestamp,
+                };
+                let tx = TxToken {
+                    token: tx_token,
+                    sink,
+                    mode,
+                    timestamp,
+                };
+                (rx, tx)
+            })
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
         let sink = &self.sink;
         let mode = self.mode;
-        self.lower
-            .transmit()
-            .map(move |token| TxToken { token, sink, mode })
+        self.lower.transmit(timestamp).map(move |token| TxToken {
+            token,
+            sink,
+            mode,
+            timestamp,
+        })
     }
 }
 
@@ -209,16 +215,17 @@ pub struct RxToken<'a, Rx: phy::RxToken, S: PcapSink> {
     token: Rx,
     sink: &'a RefCell<S>,
     mode: PcapMode,
+    timestamp: Instant,
 }
 
 impl<'a, Rx: phy::RxToken, S: PcapSink> phy::RxToken for RxToken<'a, Rx, S> {
-    fn consume<R, F: FnOnce(&mut [u8]) -> Result<R>>(self, timestamp: Instant, f: F) -> Result<R> {
-        let Self { token, sink, mode } = self;
-        token.consume(timestamp, |buffer| {
-            match mode {
-                PcapMode::Both | PcapMode::RxOnly => {
-                    sink.borrow_mut().packet(timestamp, buffer.as_ref())
-                }
+    fn consume<R, F: FnOnce(&mut [u8]) -> R>(self, f: F) -> R {
+        self.token.consume(|buffer| {
+            match self.mode {
+                PcapMode::Both | PcapMode::RxOnly => self
+                    .sink
+                    .borrow_mut()
+                    .packet(self.timestamp, buffer.as_ref()),
                 PcapMode::TxOnly => (),
             }
             f(buffer)
@@ -231,18 +238,20 @@ pub struct TxToken<'a, Tx: phy::TxToken, S: PcapSink> {
     token: Tx,
     sink: &'a RefCell<S>,
     mode: PcapMode,
+    timestamp: Instant,
 }
 
 impl<'a, Tx: phy::TxToken, S: PcapSink> phy::TxToken for TxToken<'a, Tx, S> {
-    fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        let Self { token, sink, mode } = self;
-        token.consume(timestamp, len, |buffer| {
+        self.token.consume(len, |buffer| {
             let result = f(buffer);
-            match mode {
-                PcapMode::Both | PcapMode::TxOnly => sink.borrow_mut().packet(timestamp, buffer),
+            match self.mode {
+                PcapMode::Both | PcapMode::TxOnly => {
+                    self.sink.borrow_mut().packet(self.timestamp, buffer)
+                }
                 PcapMode::RxOnly => (),
             };
             result

+ 11 - 9
src/phy/raw_socket.rs

@@ -6,7 +6,6 @@ use std::vec::Vec;
 
 use crate::phy::{self, sys, Device, DeviceCapabilities, Medium};
 use crate::time::Instant;
-use crate::Result;
 
 /// A socket that captures or transmits the complete frame.
 #[derive(Debug)]
@@ -70,7 +69,7 @@ impl Device for RawSocket {
         }
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         let mut lower = self.lower.borrow_mut();
         let mut buffer = vec![0; self.mtu];
         match lower.recv(&mut buffer[..]) {
@@ -87,7 +86,7 @@ impl Device for RawSocket {
         }
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
         Some(TxToken {
             lower: self.lower.clone(),
         })
@@ -100,9 +99,9 @@ pub struct RxToken {
 }
 
 impl phy::RxToken for RxToken {
-    fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(mut self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         f(&mut self.buffer[..])
     }
@@ -114,17 +113,20 @@ pub struct TxToken {
 }
 
 impl phy::TxToken for TxToken {
-    fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         let mut lower = self.lower.borrow_mut();
         let mut buffer = vec![0; len];
         let result = f(&mut buffer);
         match lower.send(&buffer[..]) {
-            Ok(_) => result,
-            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Err(crate::Error::Exhausted),
+            Ok(_) => {}
+            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
+                net_debug!("phy: tx failed due to WouldBlock")
+            }
             Err(err) => panic!("{}", err),
         }
+        result
     }
 }

+ 22 - 30
src/phy/tracer.rs

@@ -2,10 +2,7 @@ use core::fmt;
 
 use crate::phy::{self, Device, DeviceCapabilities, Medium};
 use crate::time::Instant;
-use crate::{
-    wire::pretty_print::{PrettyIndent, PrettyPrint},
-    Result,
-};
+use crate::wire::pretty_print::{PrettyIndent, PrettyPrint};
 
 /// A tracer device.
 ///
@@ -56,38 +53,41 @@ impl<D: Device> Device for Tracer<D> {
         self.inner.capabilities()
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         let &mut Self {
             ref mut inner,
             writer,
             ..
         } = self;
         let medium = inner.capabilities().medium;
-        inner.receive().map(|(rx_token, tx_token)| {
+        inner.receive(timestamp).map(|(rx_token, tx_token)| {
             let rx = RxToken {
                 token: rx_token,
                 writer,
                 medium,
+                timestamp,
             };
             let tx = TxToken {
                 token: tx_token,
                 writer,
                 medium,
+                timestamp,
             };
             (rx, tx)
         })
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
         let &mut Self {
             ref mut inner,
             writer,
         } = self;
         let medium = inner.capabilities().medium;
-        inner.transmit().map(|tx_token| TxToken {
+        inner.transmit(timestamp).map(|tx_token| TxToken {
             token: tx_token,
             medium,
             writer,
+            timestamp,
         })
     }
 }
@@ -97,24 +97,20 @@ pub struct RxToken<Rx: phy::RxToken> {
     token: Rx,
     writer: fn(Instant, Packet),
     medium: Medium,
+    timestamp: Instant,
 }
 
 impl<Rx: phy::RxToken> phy::RxToken for RxToken<Rx> {
-    fn consume<R, F>(self, timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        let Self {
-            token,
-            writer,
-            medium,
-        } = self;
-        token.consume(timestamp, |buffer| {
-            writer(
-                timestamp,
+        self.token.consume(|buffer| {
+            (self.writer)(
+                self.timestamp,
                 Packet {
                     buffer,
-                    medium,
+                    medium: self.medium,
                     prefix: "<- ",
                 },
             );
@@ -128,25 +124,21 @@ pub struct TxToken<Tx: phy::TxToken> {
     token: Tx,
     writer: fn(Instant, Packet),
     medium: Medium,
+    timestamp: Instant,
 }
 
 impl<Tx: phy::TxToken> phy::TxToken for TxToken<Tx> {
-    fn consume<R, F>(self, timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
-        let Self {
-            token,
-            writer,
-            medium,
-        } = self;
-        token.consume(timestamp, len, |buffer| {
+        self.token.consume(len, |buffer| {
             let result = f(buffer);
-            writer(
-                timestamp,
+            (self.writer)(
+                self.timestamp,
                 Packet {
                     buffer,
-                    medium,
+                    medium: self.medium,
                     prefix: "-> ",
                 },
             );

+ 13 - 8
src/phy/tuntap_interface.rs

@@ -6,7 +6,6 @@ use std::vec::Vec;
 
 use crate::phy::{self, sys, Device, DeviceCapabilities, Medium};
 use crate::time::Instant;
-use crate::Result;
 
 /// A virtual TUN (IP) or TAP (Ethernet) interface.
 #[derive(Debug)]
@@ -52,7 +51,7 @@ impl Device for TunTapInterface {
         }
     }
 
-    fn receive(&mut self) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+    fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
         let mut lower = self.lower.borrow_mut();
         let mut buffer = vec![0; self.mtu];
         match lower.recv(&mut buffer[..]) {
@@ -69,7 +68,7 @@ impl Device for TunTapInterface {
         }
     }
 
-    fn transmit(&mut self) -> Option<Self::TxToken<'_>> {
+    fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
         Some(TxToken {
             lower: self.lower.clone(),
         })
@@ -82,9 +81,9 @@ pub struct RxToken {
 }
 
 impl phy::RxToken for RxToken {
-    fn consume<R, F>(mut self, _timestamp: Instant, f: F) -> Result<R>
+    fn consume<R, F>(mut self, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         f(&mut self.buffer[..])
     }
@@ -96,14 +95,20 @@ pub struct TxToken {
 }
 
 impl phy::TxToken for TxToken {
-    fn consume<R, F>(self, _timestamp: Instant, len: usize, f: F) -> Result<R>
+    fn consume<R, F>(self, len: usize, f: F) -> R
     where
-        F: FnOnce(&mut [u8]) -> Result<R>,
+        F: FnOnce(&mut [u8]) -> R,
     {
         let mut lower = self.lower.borrow_mut();
         let mut buffer = vec![0; len];
         let result = f(&mut buffer);
-        lower.send(&buffer[..]).unwrap();
+        match lower.send(&buffer[..]) {
+            Ok(_) => {}
+            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
+                net_debug!("phy: tx failed due to WouldBlock")
+            }
+            Err(err) => panic!("{}", err),
+        }
         result
     }
 }