浏览代码

Compute soft deadline in poll() and use nonblocking sockets.

Before this commit, anything that touched RawSocket or TapInterface
worked partly by accident and partly because of a horrible crutch
that resulted in massive latencies as well as inevitable packet loss
every time an ARP request had to be issued. Also, there was no way
to use poll() other than by continuously calling it in a busy loop.

After this commit, poll() indicates when the earliest timer expires,
and so the caller can sleep until that moment (or until packets
arrive).

Note that there is a subtle problem remaining: every time poll()
is called, every socket with a pending outbound packet whose
IP address doesn't correspond to a MAC address will send a new
ARP request, resulting in potentially a whole lot of such requests.
ARP rate limiting is a separate topic though.
whitequark 7 年之前
父节点
当前提交
39464a53fc

+ 6 - 8
examples/client.rs

@@ -8,7 +8,8 @@ mod utils;
 
 
 use std::str::{self, FromStr};
 use std::str::{self, FromStr};
 use std::time::Instant;
 use std::time::Instant;
-use smoltcp::Error;
+use std::os::unix::io::AsRawFd;
+use smoltcp::phy::wait as phy_wait;
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
 use smoltcp::socket::{AsSocket, SocketSet};
 use smoltcp::socket::{AsSocket, SocketSet};
@@ -25,6 +26,7 @@ fn main() {
 
 
     let mut matches = utils::parse_options(&opts, free);
     let mut matches = utils::parse_options(&opts, free);
     let device = utils::parse_tap_options(&mut matches);
     let device = utils::parse_tap_options(&mut matches);
+    let fd = device.as_raw_fd();
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
     let address = IpAddress::from_str(&matches.free[0]).expect("invalid address format");
     let address = IpAddress::from_str(&matches.free[0]).expect("invalid address format");
     let port = u16::from_str(&matches.free[1]).expect("invalid port format");
     let port = u16::from_str(&matches.free[1]).expect("invalid port format");
@@ -86,12 +88,8 @@ fn main() {
             }
             }
         }
         }
 
 
-        let timestamp = Instant::now().duration_since(startup_time);
-        let timestamp_ms = (timestamp.as_secs() * 1000) +
-                           (timestamp.subsec_nanos() / 1000000) as u64;
-        match iface.poll(&mut sockets, timestamp_ms) {
-            Ok(()) | Err(Error::Exhausted) => (),
-            Err(e) => debug!("poll error: {}", e)
-        }
+        let timestamp = utils::millis_since(startup_time);
+        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
+        phy_wait(fd, poll_at).expect("wait error");
     }
     }
 }
 }

+ 6 - 4
examples/loopback.rs

@@ -16,7 +16,6 @@ extern crate getopts;
 mod utils;
 mod utils;
 
 
 use core::str;
 use core::str;
-use smoltcp::Error;
 use smoltcp::phy::Loopback;
 use smoltcp::phy::Loopback;
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
@@ -161,11 +160,14 @@ fn main() {
         }
         }
 
 
         match iface.poll(&mut socket_set, clock.elapsed()) {
         match iface.poll(&mut socket_set, clock.elapsed()) {
-            Ok(()) | Err(Error::Exhausted) => (),
+            Ok(Some(poll_at)) => {
+                let delay = poll_at - clock.elapsed();
+                debug!("sleeping for {} ms", delay);
+                clock.advance(delay)
+            }
+            Ok(None) => clock.advance(1),
             Err(e) => debug!("poll error: {}", e)
             Err(e) => debug!("poll error: {}", e)
         }
         }
-
-        clock.advance(1);
     }
     }
 
 
     if done {
     if done {

+ 23 - 16
examples/ping.rs

@@ -8,8 +8,10 @@ extern crate byteorder;
 mod utils;
 mod utils;
 
 
 use std::str::{self, FromStr};
 use std::str::{self, FromStr};
-use std::time::{Duration, Instant};
-use smoltcp::Error;
+use std::cmp;
+use std::time::Instant;
+use std::os::unix::io::AsRawFd;
+use smoltcp::phy::wait as phy_wait;
 use smoltcp::wire::{EthernetAddress, IpVersion, IpProtocol, IpAddress,
 use smoltcp::wire::{EthernetAddress, IpVersion, IpProtocol, IpAddress,
                     Ipv4Address, Ipv4Packet, Ipv4Repr,
                     Ipv4Address, Ipv4Packet, Ipv4Repr,
                     Icmpv4Repr, Icmpv4Packet};
                     Icmpv4Repr, Icmpv4Packet};
@@ -35,6 +37,7 @@ fn main() {
 
 
     let mut matches = utils::parse_options(&opts, free);
     let mut matches = utils::parse_options(&opts, free);
     let device = utils::parse_tap_options(&mut matches);
     let device = utils::parse_tap_options(&mut matches);
+    let fd = device.as_raw_fd();
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
     let address  = Ipv4Address::from_str(&matches.free[0]).expect("invalid address format");
     let address  = Ipv4Address::from_str(&matches.free[0]).expect("invalid address format");
     let count    = matches.opt_str("count").map(|s| usize::from_str(&s).unwrap()).unwrap_or(4);
     let count    = matches.opt_str("count").map(|s| usize::from_str(&s).unwrap()).unwrap_or(4);
@@ -61,7 +64,7 @@ fn main() {
     let mut sockets = SocketSet::new(vec![]);
     let mut sockets = SocketSet::new(vec![]);
     let raw_handle = sockets.add(raw_socket);
     let raw_handle = sockets.add(raw_socket);
 
 
-    let mut send_next = Duration::default();
+    let mut send_at = 0;
     let mut seq_no = 0;
     let mut seq_no = 0;
     let mut received = 0;
     let mut received = 0;
     let mut echo_payload = [0xffu8; 40];
     let mut echo_payload = [0xffu8; 40];
@@ -75,11 +78,8 @@ fn main() {
             let timestamp_us = (timestamp.as_secs() * 1000000) +
             let timestamp_us = (timestamp.as_secs() * 1000000) +
                 (timestamp.subsec_nanos() / 1000) as u64;
                 (timestamp.subsec_nanos() / 1000) as u64;
 
 
-            if seq_no == count as u16 && waiting_queue.is_empty() {
-                break;
-            }
-
-            if socket.can_send() && seq_no < count as u16 && send_next <= timestamp {
+            if socket.can_send() && seq_no < count as u16 &&
+                    send_at <= utils::millis_since(startup_time) {
                 NetworkEndian::write_u64(&mut echo_payload, timestamp_us);
                 NetworkEndian::write_u64(&mut echo_payload, timestamp_us);
                 let icmp_repr = Icmpv4Repr::EchoRequest {
                 let icmp_repr = Icmpv4Repr::EchoRequest {
                     ident: 1,
                     ident: 1,
@@ -105,7 +105,7 @@ fn main() {
 
 
                 waiting_queue.insert(seq_no, timestamp);
                 waiting_queue.insert(seq_no, timestamp);
                 seq_no += 1;
                 seq_no += 1;
-                send_next += Duration::new(interval, 0);
+                send_at += interval * 1000;
             }
             }
 
 
             if socket.can_recv() {
             if socket.can_recv() {
@@ -137,16 +137,23 @@ fn main() {
                     println!("From {} icmp_seq={} timeout", remote_addr, seq);
                     println!("From {} icmp_seq={} timeout", remote_addr, seq);
                     false
                     false
                 }
                 }
-            })
+            });
+
+            if seq_no == count as u16 && waiting_queue.is_empty() {
+                break
+            }
         }
         }
 
 
-        let timestamp = Instant::now().duration_since(startup_time);
-        let timestamp_ms = (timestamp.as_secs() * 1000) +
-            (timestamp.subsec_nanos() / 1000000) as u64;
-        match iface.poll(&mut sockets, timestamp_ms) {
-            Ok(()) | Err(Error::Exhausted) => (),
-            Err(e) => debug!("poll error: {}", e),
+        let timestamp = utils::millis_since(startup_time);
+
+        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
+        let mut resume_at = Some(send_at);
+        if let Some(poll_at) = poll_at {
+            resume_at = resume_at.map(|at| cmp::min(at, poll_at))
         }
         }
+
+        debug!("waiting until {:?} ms", resume_at);
+        phy_wait(fd, resume_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
     }
     }
 
 
     println!("--- {} ping statistics ---", remote_addr);
     println!("--- {} ping statistics ---", remote_addr);

+ 6 - 8
examples/server.rs

@@ -9,7 +9,8 @@ mod utils;
 use std::str;
 use std::str;
 use std::fmt::Write;
 use std::fmt::Write;
 use std::time::Instant;
 use std::time::Instant;
-use smoltcp::Error;
+use std::os::unix::io::AsRawFd;
+use smoltcp::phy::wait as phy_wait;
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::wire::{EthernetAddress, IpAddress};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
 use smoltcp::iface::{ArpCache, SliceArpCache, EthernetInterface};
 use smoltcp::socket::{AsSocket, SocketSet};
 use smoltcp::socket::{AsSocket, SocketSet};
@@ -25,6 +26,7 @@ fn main() {
 
 
     let mut matches = utils::parse_options(&opts, free);
     let mut matches = utils::parse_options(&opts, free);
     let device = utils::parse_tap_options(&mut matches);
     let device = utils::parse_tap_options(&mut matches);
+    let fd = device.as_raw_fd();
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
     let device = utils::parse_middleware_options(&mut matches, device, /*loopback=*/false);
 
 
     let startup_time = Instant::now();
     let startup_time = Instant::now();
@@ -154,12 +156,8 @@ fn main() {
             }
             }
         }
         }
 
 
-        let timestamp = Instant::now().duration_since(startup_time);
-        let timestamp_ms = (timestamp.as_secs() * 1000) +
-                           (timestamp.subsec_nanos() / 1000000) as u64;
-        match iface.poll(&mut sockets, timestamp_ms) {
-            Ok(()) | Err(Error::Exhausted) => (),
-            Err(e) => debug!("poll error: {}", e)
-        }
+        let timestamp = utils::millis_since(startup_time);
+        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
+        phy_wait(fd, poll_at).expect("wait error");
     }
     }
 }
 }

+ 7 - 0
examples/utils.rs

@@ -129,3 +129,10 @@ pub fn parse_middleware_options<D: Device>(matches: &mut Matches, device: D, loo
     device.set_bucket_interval(shaping_interval);
     device.set_bucket_interval(shaping_interval);
     device
     device
 }
 }
+
+pub fn millis_since(startup_time: Instant) -> u64 {
+    let duration = Instant::now().duration_since(startup_time);
+    let duration_ms = (duration.as_secs() * 1000) +
+        (duration.subsec_nanos() / 1000000) as u64;
+    duration_ms
+}

+ 109 - 81
src/iface/ethernet.rs

@@ -23,8 +23,8 @@ pub struct Interface<'a, 'b, 'c, DeviceT: Device + 'a> {
     protocol_addrs: ManagedSlice<'c, IpAddress>,
     protocol_addrs: ManagedSlice<'c, IpAddress>,
 }
 }
 
 
-enum Response<'a> {
-    Nop,
+enum Packet<'a> {
+    None,
     Arp(ArpRepr),
     Arp(ArpRepr),
     Icmpv4(Ipv4Repr, Icmpv4Repr<'a>),
     Icmpv4(Ipv4Repr, Icmpv4Repr<'a>),
     Raw((IpRepr, &'a [u8])),
     Raw((IpRepr, &'a [u8])),
@@ -107,38 +107,103 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
         self.protocol_addrs.iter().any(|&probe| probe == addr)
         self.protocol_addrs.iter().any(|&probe| probe == addr)
     }
     }
 
 
-    /// Receive and process a packet, if available, and then transmit a packet, if necessary,
-    /// handling the given set of sockets.
+    /// Transmit packets queued in the given sockets, and receive packets queued
+    /// in the device.
     ///
     ///
-    /// The timestamp is a monotonically increasing number of milliseconds.
-    pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
-        // First, transmit any outgoing packets.
-        loop {
-            if self.dispatch(sockets, timestamp)? { break }
+    /// The timestamp must be a number of milliseconds, monotonically increasing
+    /// since an arbitrary moment in time, such as system startup.
+    ///
+    /// This function returns a _soft deadline_ for calling it the next time.
+    /// That is, if `iface.poll(&mut sockets, 1000)` returns `Ok(Some(2000))`,
+    /// it harmless (but wastes energy) to call it 500 ms later, and potentially
+    /// harmful (impacting quality of service) to call it 1500 ms later.
+    pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<Option<u64>> {
+        self.socket_egress(sockets, timestamp)?;
+
+        if self.socket_ingress(sockets, timestamp)? {
+            Ok(Some(0))
+        } else {
+            Ok(sockets.iter().filter_map(|socket| socket.poll_at()).min())
         }
         }
-
-        // Now, receive any incoming packets.
-        self.process(sockets, timestamp)
     }
     }
 
 
-    fn process(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
+    fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
+        let mut processed_any = false;
         loop {
         loop {
-            let frame = self.device.receive(timestamp)?;
-            let response = self.process_ethernet(sockets, timestamp, &frame)?;
-            self.dispatch_response(timestamp, response)?;
+            let frame =
+                match self.device.receive(timestamp) {
+                    Ok(frame) => frame,
+                    Err(Error::Exhausted) => break, // nothing to receive
+                    Err(err) => return Err(err)
+                };
+
+            let response =
+                match self.process_ethernet(sockets, timestamp, &frame) {
+                    Ok(response) => response,
+                    Err(err) => {
+                        net_debug!("cannot process ingress packet: {}", err);
+                        continue
+                    }
+                };
+            processed_any = true;
+
+            match self.dispatch(timestamp, response) {
+                Ok(()) => (),
+                Err(err) => {
+                    net_debug!("cannot dispatch response packet: {}", err);
+                    continue
+                }
+            }
         }
         }
+        Ok(processed_any)
+    }
+
+    fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
+        let mut limits = self.device.limits();
+        limits.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
+
+        for socket in sockets.iter_mut() {
+            let mut device_result = Ok(());
+            let socket_result =
+                match socket {
+                    &mut Socket::Raw(ref mut socket) =>
+                        socket.dispatch(|response| {
+                            device_result = self.dispatch(timestamp, Packet::Raw(response));
+                            device_result
+                        }),
+                    &mut Socket::Udp(ref mut socket) =>
+                        socket.dispatch(|response| {
+                            device_result = self.dispatch(timestamp, Packet::Udp(response));
+                            device_result
+                        }),
+                    &mut Socket::Tcp(ref mut socket) =>
+                        socket.dispatch(timestamp, &limits, |response| {
+                            device_result = self.dispatch(timestamp, Packet::Tcp(response));
+                            device_result
+                        }),
+                    &mut Socket::__Nonexhaustive => unreachable!()
+                };
+            match (device_result, socket_result) {
+                (Ok(()), Err(Error::Exhausted)) => (), // nothing to transmit
+                (Err(err), _) | (_, Err(err)) =>
+                    net_debug!("cannot dispatch egress packet: {}", err),
+                (Ok(()), Ok(())) => ()
+            }
+        }
+
+        Ok(())
     }
     }
 
 
     fn process_ethernet<'frame, T: AsRef<[u8]>>
     fn process_ethernet<'frame, T: AsRef<[u8]>>
                        (&mut self, sockets: &mut SocketSet, timestamp: u64,
                        (&mut self, sockets: &mut SocketSet, timestamp: u64,
                         frame: &'frame T) ->
                         frame: &'frame T) ->
-                       Result<Response<'frame>> {
+                       Result<Packet<'frame>> {
         let eth_frame = EthernetFrame::new_checked(frame)?;
         let eth_frame = EthernetFrame::new_checked(frame)?;
 
 
         // Ignore any packets not directed to our hardware address.
         // Ignore any packets not directed to our hardware address.
         if !eth_frame.dst_addr().is_broadcast() &&
         if !eth_frame.dst_addr().is_broadcast() &&
                 eth_frame.dst_addr() != self.hardware_addr {
                 eth_frame.dst_addr() != self.hardware_addr {
-            return Ok(Response::Nop)
+            return Ok(Packet::None)
         }
         }
 
 
         match eth_frame.ethertype() {
         match eth_frame.ethertype() {
@@ -153,7 +218,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
     fn process_arp<'frame, T: AsRef<[u8]>>
     fn process_arp<'frame, T: AsRef<[u8]>>
                   (&mut self, eth_frame: &EthernetFrame<&'frame T>) ->
                   (&mut self, eth_frame: &EthernetFrame<&'frame T>) ->
-                  Result<Response<'frame>> {
+                  Result<Packet<'frame>> {
         let arp_packet = ArpPacket::new_checked(eth_frame.payload())?;
         let arp_packet = ArpPacket::new_checked(eth_frame.payload())?;
         let arp_repr = ArpRepr::parse(&arp_packet)?;
         let arp_repr = ArpRepr::parse(&arp_packet)?;
 
 
@@ -175,7 +240,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
                 if operation == ArpOperation::Request &&
                 if operation == ArpOperation::Request &&
                         self.has_protocol_addr(target_protocol_addr) {
                         self.has_protocol_addr(target_protocol_addr) {
-                    Ok(Response::Arp(ArpRepr::EthernetIpv4 {
+                    Ok(Packet::Arp(ArpRepr::EthernetIpv4 {
                         operation: ArpOperation::Reply,
                         operation: ArpOperation::Reply,
                         source_hardware_addr: self.hardware_addr,
                         source_hardware_addr: self.hardware_addr,
                         source_protocol_addr: target_protocol_addr,
                         source_protocol_addr: target_protocol_addr,
@@ -183,7 +248,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                         target_protocol_addr: source_protocol_addr
                         target_protocol_addr: source_protocol_addr
                     }))
                     }))
                 } else {
                 } else {
-                    Ok(Response::Nop)
+                    Ok(Packet::None)
                 }
                 }
             }
             }
 
 
@@ -194,7 +259,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
     fn process_ipv4<'frame, T: AsRef<[u8]>>
     fn process_ipv4<'frame, T: AsRef<[u8]>>
                    (&mut self, sockets: &mut SocketSet, timestamp: u64,
                    (&mut self, sockets: &mut SocketSet, timestamp: u64,
                     eth_frame: &EthernetFrame<&'frame T>) ->
                     eth_frame: &EthernetFrame<&'frame T>) ->
-                   Result<Response<'frame>> {
+                   Result<Packet<'frame>> {
         let ipv4_packet = Ipv4Packet::new_checked(eth_frame.payload())?;
         let ipv4_packet = Ipv4Packet::new_checked(eth_frame.payload())?;
         let ipv4_repr = Ipv4Repr::parse(&ipv4_packet)?;
         let ipv4_repr = Ipv4Repr::parse(&ipv4_packet)?;
 
 
@@ -229,7 +294,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
         if !self.has_protocol_addr(ipv4_repr.dst_addr) {
         if !self.has_protocol_addr(ipv4_repr.dst_addr) {
             // Ignore IP packets not directed at us.
             // Ignore IP packets not directed at us.
-            return Ok(Response::Nop)
+            return Ok(Packet::None)
         }
         }
 
 
         match ipv4_repr.protocol {
         match ipv4_repr.protocol {
@@ -240,7 +305,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             IpProtocol::Tcp =>
             IpProtocol::Tcp =>
                 Self::process_tcp(sockets, timestamp, ip_repr, ip_payload),
                 Self::process_tcp(sockets, timestamp, ip_repr, ip_payload),
             _ if handled_by_raw_socket =>
             _ if handled_by_raw_socket =>
-                Ok(Response::Nop),
+                Ok(Packet::None),
             _ => {
             _ => {
                 let icmp_reply_repr = Icmpv4Repr::DstUnreachable {
                 let icmp_reply_repr = Icmpv4Repr::DstUnreachable {
                     reason: Icmpv4DstUnreachable::ProtoUnreachable,
                     reason: Icmpv4DstUnreachable::ProtoUnreachable,
@@ -253,13 +318,13 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     protocol:    IpProtocol::Icmp,
                     protocol:    IpProtocol::Icmp,
                     payload_len: icmp_reply_repr.buffer_len()
                     payload_len: icmp_reply_repr.buffer_len()
                 };
                 };
-                Ok(Response::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
+                Ok(Packet::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
             }
             }
         }
         }
     }
     }
 
 
     fn process_icmpv4<'frame>(ipv4_repr: Ipv4Repr, ip_payload: &'frame [u8]) ->
     fn process_icmpv4<'frame>(ipv4_repr: Ipv4Repr, ip_payload: &'frame [u8]) ->
-                             Result<Response<'frame>> {
+                             Result<Packet<'frame>> {
         let icmp_packet = Icmpv4Packet::new_checked(ip_payload)?;
         let icmp_packet = Icmpv4Packet::new_checked(ip_payload)?;
         let icmp_repr = Icmpv4Repr::parse(&icmp_packet)?;
         let icmp_repr = Icmpv4Repr::parse(&icmp_packet)?;
 
 
@@ -277,11 +342,11 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     protocol:    IpProtocol::Icmp,
                     protocol:    IpProtocol::Icmp,
                     payload_len: icmp_reply_repr.buffer_len()
                     payload_len: icmp_reply_repr.buffer_len()
                 };
                 };
-                Ok(Response::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
+                Ok(Packet::Icmpv4(ipv4_reply_repr, icmp_reply_repr))
             }
             }
 
 
             // Ignore any echo replies.
             // Ignore any echo replies.
-            Icmpv4Repr::EchoReply { .. } => Ok(Response::Nop),
+            Icmpv4Repr::EchoReply { .. } => Ok(Packet::None),
 
 
             // FIXME: do something correct here?
             // FIXME: do something correct here?
             _ => Err(Error::Unrecognized),
             _ => Err(Error::Unrecognized),
@@ -290,7 +355,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
     fn process_udp<'frame>(sockets: &mut SocketSet,
     fn process_udp<'frame>(sockets: &mut SocketSet,
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
-                          Result<Response<'frame>> {
+                          Result<Packet<'frame>> {
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let udp_packet = UdpPacket::new_checked(ip_payload)?;
         let udp_packet = UdpPacket::new_checked(ip_payload)?;
         let udp_repr = UdpRepr::parse(&udp_packet, &src_addr, &dst_addr)?;
         let udp_repr = UdpRepr::parse(&udp_packet, &src_addr, &dst_addr)?;
@@ -299,7 +364,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                 <Socket as AsSocket<UdpSocket>>::try_as_socket) {
                 <Socket as AsSocket<UdpSocket>>::try_as_socket) {
             match udp_socket.process(&ip_repr, &udp_repr) {
             match udp_socket.process(&ip_repr, &udp_repr) {
                 // The packet is valid and handled by socket.
                 // The packet is valid and handled by socket.
-                Ok(()) => return Ok(Response::Nop),
+                Ok(()) => return Ok(Packet::None),
                 // The packet isn't addressed to the socket.
                 // The packet isn't addressed to the socket.
                 Err(Error::Rejected) => continue,
                 Err(Error::Rejected) => continue,
                 // The packet is malformed, or addressed to the socket but cannot be accepted.
                 // The packet is malformed, or addressed to the socket but cannot be accepted.
@@ -321,7 +386,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     protocol:    IpProtocol::Icmp,
                     protocol:    IpProtocol::Icmp,
                     payload_len: icmpv4_reply_repr.buffer_len()
                     payload_len: icmpv4_reply_repr.buffer_len()
                 };
                 };
-                Ok(Response::Icmpv4(ipv4_reply_repr, icmpv4_reply_repr))
+                Ok(Packet::Icmpv4(ipv4_reply_repr, icmpv4_reply_repr))
             },
             },
             IpRepr::Unspecified { .. } |
             IpRepr::Unspecified { .. } |
             IpRepr::__Nonexhaustive =>
             IpRepr::__Nonexhaustive =>
@@ -331,7 +396,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
     fn process_tcp<'frame>(sockets: &mut SocketSet, timestamp: u64,
     fn process_tcp<'frame>(sockets: &mut SocketSet, timestamp: u64,
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
                            ip_repr: IpRepr, ip_payload: &'frame [u8]) ->
-                          Result<Response<'frame>> {
+                          Result<Packet<'frame>> {
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let (src_addr, dst_addr) = (ip_repr.src_addr(), ip_repr.dst_addr());
         let tcp_packet = TcpPacket::new_checked(ip_payload)?;
         let tcp_packet = TcpPacket::new_checked(ip_payload)?;
         let tcp_repr = TcpRepr::parse(&tcp_packet, &src_addr, &dst_addr)?;
         let tcp_repr = TcpRepr::parse(&tcp_packet, &src_addr, &dst_addr)?;
@@ -340,7 +405,7 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                 <Socket as AsSocket<TcpSocket>>::try_as_socket) {
                 <Socket as AsSocket<TcpSocket>>::try_as_socket) {
             match tcp_socket.process(timestamp, &ip_repr, &tcp_repr) {
             match tcp_socket.process(timestamp, &ip_repr, &tcp_repr) {
                 // The packet is valid and handled by socket.
                 // The packet is valid and handled by socket.
-                Ok(reply) => return Ok(reply.map_or(Response::Nop, Response::Tcp)),
+                Ok(reply) => return Ok(reply.map_or(Packet::None, Packet::Tcp)),
                 // The packet isn't addressed to the socket.
                 // The packet isn't addressed to the socket.
                 // Send RST only if no other socket accepts the packet.
                 // Send RST only if no other socket accepts the packet.
                 Err(Error::Rejected) => continue,
                 Err(Error::Rejected) => continue,
@@ -351,48 +416,16 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
 
 
         if tcp_repr.control == TcpControl::Rst {
         if tcp_repr.control == TcpControl::Rst {
             // Never reply to a TCP RST packet with another TCP RST packet.
             // Never reply to a TCP RST packet with another TCP RST packet.
-            Ok(Response::Nop)
+            Ok(Packet::None)
         } else {
         } else {
             // The packet wasn't handled by a socket, send a TCP RST packet.
             // The packet wasn't handled by a socket, send a TCP RST packet.
-            Ok(Response::Tcp(TcpSocket::rst_reply(&ip_repr, &tcp_repr)))
-        }
-    }
-
-    fn dispatch(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
-        let mut limits = self.device.limits();
-        limits.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
-
-        let mut nothing_to_transmit = true;
-        for socket in sockets.iter_mut() {
-            let result = match socket {
-                &mut Socket::Raw(ref mut socket) =>
-                    socket.dispatch(|response|
-                        self.dispatch_response(timestamp, Response::Raw(response))),
-                &mut Socket::Udp(ref mut socket) =>
-                    socket.dispatch(|response|
-                        self.dispatch_response(timestamp, Response::Udp(response))),
-                &mut Socket::Tcp(ref mut socket) =>
-                    socket.dispatch(timestamp, &limits, |response|
-                        self.dispatch_response(timestamp, Response::Tcp(response))),
-                &mut Socket::__Nonexhaustive => unreachable!()
-            };
-
-            match result {
-                Ok(()) => {
-                    nothing_to_transmit = false;
-                    break
-                }
-                Err(Error::Exhausted) => continue,
-                Err(e) => return Err(e)
-            }
+            Ok(Packet::Tcp(TcpSocket::rst_reply(&ip_repr, &tcp_repr)))
         }
         }
-
-        Ok(nothing_to_transmit)
     }
     }
 
 
-    fn dispatch_response(&mut self, timestamp: u64, response: Response) -> Result<()> {
-        match response {
-            Response::Arp(arp_repr) => {
+    fn dispatch(&mut self, timestamp: u64, packet: Packet) -> Result<()> {
+        match packet {
+            Packet::Arp(arp_repr) => {
                 let dst_hardware_addr =
                 let dst_hardware_addr =
                     match arp_repr {
                     match arp_repr {
                         ArpRepr::EthernetIpv4 { target_hardware_addr, .. } => target_hardware_addr,
                         ArpRepr::EthernetIpv4 { target_hardware_addr, .. } => target_hardware_addr,
@@ -407,29 +440,29 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
                     arp_repr.emit(&mut packet);
                     arp_repr.emit(&mut packet);
                 })
                 })
             },
             },
-            Response::Icmpv4(ipv4_repr, icmpv4_repr) => {
+            Packet::Icmpv4(ipv4_repr, icmpv4_repr) => {
                 self.dispatch_ip(timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
                 self.dispatch_ip(timestamp, IpRepr::Ipv4(ipv4_repr), |_ip_repr, payload| {
                     icmpv4_repr.emit(&mut Icmpv4Packet::new(payload));
                     icmpv4_repr.emit(&mut Icmpv4Packet::new(payload));
                 })
                 })
             }
             }
-            Response::Raw((ip_repr, raw_packet)) => {
+            Packet::Raw((ip_repr, raw_packet)) => {
                 self.dispatch_ip(timestamp, ip_repr, |_ip_repr, payload| {
                 self.dispatch_ip(timestamp, ip_repr, |_ip_repr, payload| {
                     payload.copy_from_slice(raw_packet);
                     payload.copy_from_slice(raw_packet);
                 })
                 })
             }
             }
-            Response::Udp((ip_repr, udp_repr)) => {
+            Packet::Udp((ip_repr, udp_repr)) => {
                 self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
                 self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
                     udp_repr.emit(&mut UdpPacket::new(payload),
                     udp_repr.emit(&mut UdpPacket::new(payload),
                                   &ip_repr.src_addr(), &ip_repr.dst_addr());
                                   &ip_repr.src_addr(), &ip_repr.dst_addr());
                 })
                 })
             }
             }
-            Response::Tcp((ip_repr, tcp_repr)) => {
+            Packet::Tcp((ip_repr, tcp_repr)) => {
                 self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
                 self.dispatch_ip(timestamp, ip_repr, |ip_repr, payload| {
                     tcp_repr.emit(&mut TcpPacket::new(payload),
                     tcp_repr.emit(&mut TcpPacket::new(payload),
                                   &ip_repr.src_addr(), &ip_repr.dst_addr());
                                   &ip_repr.src_addr(), &ip_repr.dst_addr());
                 })
                 })
             }
             }
-            Response::Nop => Ok(())
+            Packet::None => Ok(())
         }
         }
     }
     }
 
 
@@ -488,13 +521,8 @@ impl<'a, 'b, 'c, DeviceT: Device + 'a> Interface<'a, 'b, 'c, DeviceT> {
             where F: FnOnce(IpRepr, &mut [u8]) {
             where F: FnOnce(IpRepr, &mut [u8]) {
         let ip_repr = ip_repr.lower(&self.protocol_addrs)?;
         let ip_repr = ip_repr.lower(&self.protocol_addrs)?;
 
 
-        // FIXME: use plain try! here once we don't have the horrible nothing_to_transmit hack.
         let dst_hardware_addr =
         let dst_hardware_addr =
-            self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr());
-        if let Err(Error::Unaddressable) = dst_hardware_addr {
-            return Ok(())
-        }
-        let dst_hardware_addr = dst_hardware_addr?;
+            self.lookup_hardware_addr(timestamp, &ip_repr.src_addr(), &ip_repr.dst_addr())?;
 
 
         self.dispatch_ethernet(timestamp, ip_repr.total_len(), |mut frame| {
         self.dispatch_ethernet(timestamp, ip_repr.total_len(), |mut frame| {
             frame.set_dst_addr(dst_hardware_addr);
             frame.set_dst_addr(dst_hardware_addr);

+ 4 - 4
src/macros.rs

@@ -11,21 +11,21 @@ macro_rules! net_log_enabled {
 }
 }
 
 
 macro_rules! net_trace {
 macro_rules! net_trace {
-    ($($arg:expr),*) => {
+    ($($arg:expr),*) => {{
         #[cfg(feature = "log")]
         #[cfg(feature = "log")]
         trace!($($arg),*);
         trace!($($arg),*);
         #[cfg(not(feature = "log"))]
         #[cfg(not(feature = "log"))]
         $( let _ = $arg );*; // suppress unused variable warnings
         $( let _ = $arg );*; // suppress unused variable warnings
-    }
+    }}
 }
 }
 
 
 macro_rules! net_debug {
 macro_rules! net_debug {
-    ($($arg:expr),*) => {
+    ($($arg:expr),*) => {{
         #[cfg(feature = "log")]
         #[cfg(feature = "log")]
         debug!($($arg),*);
         debug!($($arg),*);
         #[cfg(not(feature = "log"))]
         #[cfg(not(feature = "log"))]
         $( let _ = $arg );*; // suppress unused variable warnings
         $( let _ = $arg );*; // suppress unused variable warnings
-    }
+    }}
 }
 }
 
 
 macro_rules! enum_with_unknown {
 macro_rules! enum_with_unknown {

+ 3 - 0
src/phy/mod.rs

@@ -119,6 +119,9 @@ mod raw_socket;
 #[cfg(all(feature = "tap_interface", target_os = "linux"))]
 #[cfg(all(feature = "tap_interface", target_os = "linux"))]
 mod tap_interface;
 mod tap_interface;
 
 
+#[cfg(any(feature = "raw_socket", feature = "tap_interface"))]
+pub use self::sys::wait;
+
 pub use self::tracer::Tracer;
 pub use self::tracer::Tracer;
 pub use self::fault_injector::FaultInjector;
 pub use self::fault_injector::FaultInjector;
 pub use self::pcap_writer::{PcapLinkType, PcapMode, PcapSink, PcapWriter};
 pub use self::pcap_writer::{PcapLinkType, PcapMode, PcapSink, PcapWriter};

+ 7 - 0
src/phy/raw_socket.rs

@@ -2,6 +2,7 @@ use std::cell::RefCell;
 use std::vec::Vec;
 use std::vec::Vec;
 use std::rc::Rc;
 use std::rc::Rc;
 use std::io;
 use std::io;
+use std::os::unix::io::{RawFd, AsRawFd};
 
 
 use Result;
 use Result;
 use super::{sys, DeviceLimits, Device};
 use super::{sys, DeviceLimits, Device};
@@ -13,6 +14,12 @@ pub struct RawSocket {
     mtu:    usize
     mtu:    usize
 }
 }
 
 
+impl AsRawFd for RawSocket {
+    fn as_raw_fd(&self) -> RawFd {
+        self.lower.borrow().as_raw_fd()
+    }
+}
+
 impl RawSocket {
 impl RawSocket {
     /// Creates a raw socket, bound to the interface called `name`.
     /// Creates a raw socket, bound to the interface called `name`.
     ///
     ///

+ 30 - 1
src/phy/sys/mod.rs

@@ -1,5 +1,6 @@
 use libc;
 use libc;
-use std::io;
+use std::{mem, ptr, io};
+use std::os::unix::io::RawFd;
 
 
 #[cfg(target_os = "linux")]
 #[cfg(target_os = "linux")]
 #[path = "linux.rs"]
 #[path = "linux.rs"]
@@ -15,6 +16,34 @@ pub use self::raw_socket::RawSocketDesc;
 #[cfg(all(feature = "tap_interface", target_os = "linux"))]
 #[cfg(all(feature = "tap_interface", target_os = "linux"))]
 pub use self::tap_interface::TapInterfaceDesc;
 pub use self::tap_interface::TapInterfaceDesc;
 
 
+/// Wait until given file descriptor becomes readable, but no longer than given timeout.
+pub fn wait(fd: RawFd, millis: Option<u64>) -> io::Result<()> {
+    unsafe {
+        let mut readfds = mem::uninitialized::<libc::fd_set>();
+        libc::FD_ZERO(&mut readfds);
+        libc::FD_SET(fd, &mut readfds);
+
+        let mut writefds = mem::uninitialized::<libc::fd_set>();
+        libc::FD_ZERO(&mut writefds);
+
+        let mut exceptfds = mem::uninitialized::<libc::fd_set>();
+        libc::FD_ZERO(&mut exceptfds);
+
+        let mut timeout = libc::timeval { tv_sec: 0, tv_usec: 0 };
+        let timeout_ptr =
+            if let Some(millis) = millis {
+                timeout.tv_usec = (millis * 1_000) as libc::suseconds_t;
+                &mut timeout as *mut _
+            } else {
+                ptr::null_mut()
+            };
+
+        let res = libc::select(fd + 1, &mut readfds, &mut writefds, &mut exceptfds, timeout_ptr);
+        if res == -1 { return Err(io::Error::last_os_error()) }
+        Ok(())
+    }
+}
+
 #[repr(C)]
 #[repr(C)]
 #[derive(Debug)]
 #[derive(Debug)]
 struct ifreq {
 struct ifreq {

+ 9 - 2
src/phy/sys/raw_socket.rs

@@ -1,5 +1,6 @@
-use libc;
 use std::{mem, io};
 use std::{mem, io};
+use std::os::unix::io::{RawFd, AsRawFd};
+use libc;
 use super::*;
 use super::*;
 
 
 #[derive(Debug)]
 #[derive(Debug)]
@@ -8,10 +9,16 @@ pub struct RawSocketDesc {
     ifreq: ifreq
     ifreq: ifreq
 }
 }
 
 
+impl AsRawFd for RawSocketDesc {
+    fn as_raw_fd(&self) -> RawFd {
+        self.lower
+    }
+}
+
 impl RawSocketDesc {
 impl RawSocketDesc {
     pub fn new(name: &str) -> io::Result<RawSocketDesc> {
     pub fn new(name: &str) -> io::Result<RawSocketDesc> {
         let lower = unsafe {
         let lower = unsafe {
-            let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW,
+            let lower = libc::socket(libc::AF_PACKET, libc::SOCK_RAW | libc::SOCK_NONBLOCK,
                                      imp::ETH_P_ALL.to_be() as i32);
                                      imp::ETH_P_ALL.to_be() as i32);
             if lower == -1 { return Err(io::Error::last_os_error()) }
             if lower == -1 { return Err(io::Error::last_os_error()) }
             lower
             lower

+ 8 - 26
src/phy/sys/tap_interface.rs

@@ -1,20 +1,25 @@
-use std::mem;
 use std::io;
 use std::io;
+use std::os::unix::io::{RawFd, AsRawFd};
 use libc;
 use libc;
 use super::*;
 use super::*;
 
 
-#[cfg(target_os = "linux")]
 #[derive(Debug)]
 #[derive(Debug)]
 pub struct TapInterfaceDesc {
 pub struct TapInterfaceDesc {
     lower: libc::c_int,
     lower: libc::c_int,
     ifreq: ifreq
     ifreq: ifreq
 }
 }
 
 
+impl AsRawFd for TapInterfaceDesc {
+    fn as_raw_fd(&self) -> RawFd {
+        self.lower
+    }
+}
+
 impl TapInterfaceDesc {
 impl TapInterfaceDesc {
     pub fn new(name: &str) -> io::Result<TapInterfaceDesc> {
     pub fn new(name: &str) -> io::Result<TapInterfaceDesc> {
         let lower = unsafe {
         let lower = unsafe {
             let lower = libc::open("/dev/net/tun\0".as_ptr() as *const libc::c_char,
             let lower = libc::open("/dev/net/tun\0".as_ptr() as *const libc::c_char,
-                                   libc::O_RDWR);
+                                   libc::O_RDWR | libc::O_NONBLOCK);
             if lower == -1 { return Err(io::Error::last_os_error()) }
             if lower == -1 { return Err(io::Error::last_os_error()) }
             lower
             lower
         };
         };
@@ -44,28 +49,7 @@ impl TapInterfaceDesc {
         mtu
         mtu
     }
     }
 
 
-    fn wait(&mut self, ms: u32) -> io::Result<bool> {
-        unsafe {
-            let mut readfds = mem::uninitialized::<libc::fd_set>();
-            libc::FD_ZERO(&mut readfds);
-            libc::FD_SET(self.lower, &mut readfds);
-            let mut writefds = mem::uninitialized::<libc::fd_set>();
-            libc::FD_ZERO(&mut writefds);
-            let mut exceptfds = mem::uninitialized::<libc::fd_set>();
-            libc::FD_ZERO(&mut exceptfds);
-            let mut timeout = libc::timeval { tv_sec: 0, tv_usec: (ms * 1_000) as libc::suseconds_t };
-            let res = libc::select(self.lower + 1, &mut readfds, &mut writefds, &mut exceptfds,
-                                   &mut timeout);
-            if res == -1 { return Err(io::Error::last_os_error()) }
-            Ok(res == 0)
-        }
-    }
-
     pub fn recv(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
     pub fn recv(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
-        // FIXME: here we don't wait forever, in case we need to send several packets in a row
-        // ideally this would be implemented by going full nonblocking
-        if self.wait(100)? { return Err(io::ErrorKind::TimedOut)? }
-
         unsafe {
         unsafe {
             let len = libc::read(self.lower, buffer.as_mut_ptr() as *mut libc::c_void,
             let len = libc::read(self.lower, buffer.as_mut_ptr() as *mut libc::c_void,
                                  buffer.len());
                                  buffer.len());
@@ -75,8 +59,6 @@ impl TapInterfaceDesc {
     }
     }
 
 
     pub fn send(&mut self, buffer: &[u8]) -> io::Result<usize> {
     pub fn send(&mut self, buffer: &[u8]) -> io::Result<usize> {
-        self.wait(100)?;
-
         unsafe {
         unsafe {
             let len = libc::write(self.lower, buffer.as_ptr() as *const libc::c_void,
             let len = libc::write(self.lower, buffer.as_ptr() as *const libc::c_void,
                                   buffer.len());
                                   buffer.len());

+ 9 - 2
src/phy/tap_interface.rs

@@ -2,6 +2,7 @@ use std::cell::RefCell;
 use std::vec::Vec;
 use std::vec::Vec;
 use std::rc::Rc;
 use std::rc::Rc;
 use std::io;
 use std::io;
+use std::os::unix::io::{RawFd, AsRawFd};
 
 
 use {Error, Result};
 use {Error, Result};
 use super::{sys, DeviceLimits, Device};
 use super::{sys, DeviceLimits, Device};
@@ -13,6 +14,12 @@ pub struct TapInterface {
     mtu:    usize
     mtu:    usize
 }
 }
 
 
+impl AsRawFd for TapInterface {
+    fn as_raw_fd(&self) -> RawFd {
+        self.lower.borrow().as_raw_fd()
+    }
+}
+
 impl TapInterface {
 impl TapInterface {
     /// Attaches to a TAP interface called `name`, or creates it if it does not exist.
     /// Attaches to a TAP interface called `name`, or creates it if it does not exist.
     ///
     ///
@@ -49,10 +56,10 @@ impl Device for TapInterface {
                 buffer.resize(size, 0);
                 buffer.resize(size, 0);
                 Ok(buffer)
                 Ok(buffer)
             }
             }
-            Err(ref err) if err.kind() == io::ErrorKind::TimedOut => {
+            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                 Err(Error::Exhausted)
                 Err(Error::Exhausted)
             }
             }
-            Err(err) => panic!(err)
+            Err(err) => panic!("{}", err)
         }
         }
     }
     }
 
 

+ 4 - 0
src/socket/mod.rs

@@ -78,6 +78,10 @@ impl<'a, 'b> Socket<'a, 'b> {
     pub fn set_debug_id(&mut self, id: usize) {
     pub fn set_debug_id(&mut self, id: usize) {
         dispatch_socket!(self, |socket [mut]| socket.set_debug_id(id))
         dispatch_socket!(self, |socket [mut]| socket.set_debug_id(id))
     }
     }
+
+    pub(crate) fn poll_at(&self) -> Option<u64> {
+        dispatch_socket!(self, |socket []| socket.poll_at())
+    }
 }
 }
 
 
 /// A conversion trait for network sockets.
 /// A conversion trait for network sockets.

+ 30 - 21
src/socket/raw.rs

@@ -199,24 +199,33 @@ impl<'a, 'b> RawSocket<'a, 'b> {
             }
             }
         }
         }
 
 
-        let mut packet_buf = self.tx_buffer.dequeue()?;
-        match prepare(self.ip_protocol, packet_buf.as_mut()) {
-            Ok((ip_repr, raw_packet)) => {
-                net_trace!("[{}]:{}:{}: sending {} octets",
-                           self.debug_id, self.ip_version, self.ip_protocol,
-                           ip_repr.buffer_len() + raw_packet.len());
-                emit((ip_repr, raw_packet))
-            }
-            Err(error) => {
-                net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
-                           self.debug_id, self.ip_version, self.ip_protocol,
-                           error);
-                // This case is a bit special because in every other socket, no matter what data
-                // is put into the socket, it can be sent, but it's possible to put data into
-                // a raw socket that may not be, and we're generic over the result type, so
-                // we can't possibly return Ok(()) here.
-                Err(Error::Rejected)
+        let debug_id    = self.debug_id;
+        let ip_protocol = self.ip_protocol;
+        let ip_version  = self.ip_version;
+        self.tx_buffer.try_dequeue(|packet_buf| {
+            match prepare(ip_protocol, packet_buf.as_mut()) {
+                Ok((ip_repr, raw_packet)) => {
+                    net_trace!("[{}]:{}:{}: sending {} octets",
+                               debug_id, ip_version, ip_protocol,
+                               ip_repr.buffer_len() + raw_packet.len());
+                    emit((ip_repr, raw_packet))
+                }
+                Err(error) => {
+                    net_debug!("[{}]:{}:{}: dropping outgoing packet ({})",
+                               debug_id, ip_version, ip_protocol,
+                               error);
+                    // Return Ok(()) so the packet is dequeued.
+                    Ok(())
+                }
             }
             }
+        })
+    }
+
+    pub(crate) fn poll_at(&self) -> Option<u64> {
+        if self.tx_buffer.empty() {
+            None
+        } else {
+            Some(0)
         }
         }
     }
     }
 }
 }
@@ -285,13 +294,13 @@ mod test {
             assert_eq!(ip_payload, &PACKET_PAYLOAD);
             assert_eq!(ip_payload, &PACKET_PAYLOAD);
             Err(Error::Unaddressable)
             Err(Error::Unaddressable)
         }), Err(Error::Unaddressable));
         }), Err(Error::Unaddressable));
-        /*assert!(!socket.can_send());*/
+        assert!(!socket.can_send());
 
 
         assert_eq!(socket.dispatch(|(ip_repr, ip_payload)| {
         assert_eq!(socket.dispatch(|(ip_repr, ip_payload)| {
             assert_eq!(ip_repr, HEADER_REPR);
             assert_eq!(ip_repr, HEADER_REPR);
             assert_eq!(ip_payload, &PACKET_PAYLOAD);
             assert_eq!(ip_payload, &PACKET_PAYLOAD);
             Ok(())
             Ok(())
-        }), /*Ok(())*/ Err(Error::Exhausted));
+        }), Ok(()));
         assert!(socket.can_send());
         assert!(socket.can_send());
     }
     }
 
 
@@ -304,14 +313,14 @@ mod test {
 
 
         assert_eq!(socket.send_slice(&wrong_version[..]), Ok(()));
         assert_eq!(socket.send_slice(&wrong_version[..]), Ok(()));
         assert_eq!(socket.dispatch(|_| unreachable!()),
         assert_eq!(socket.dispatch(|_| unreachable!()),
-                   Err(Error::Rejected));
+                   Ok(()));
 
 
         let mut wrong_protocol = PACKET_BYTES.clone();
         let mut wrong_protocol = PACKET_BYTES.clone();
         Ipv4Packet::new(&mut wrong_protocol).set_protocol(IpProtocol::Tcp);
         Ipv4Packet::new(&mut wrong_protocol).set_protocol(IpProtocol::Tcp);
 
 
         assert_eq!(socket.send_slice(&wrong_protocol[..]), Ok(()));
         assert_eq!(socket.send_slice(&wrong_protocol[..]), Ok(()));
         assert_eq!(socket.dispatch(|_| unreachable!()),
         assert_eq!(socket.dispatch(|_| unreachable!()),
-                   Err(Error::Rejected));
+                   Ok(()));
     }
     }
 
 
     #[test]
     #[test]

+ 20 - 2
src/socket/tcp.rs

@@ -200,6 +200,14 @@ impl Timer {
         }
         }
     }
     }
 
 
+    fn poll_at(&self) -> Option<u64> {
+        match *self {
+            Timer::Idle => None,
+            Timer::Retransmit { expires_at, .. } => Some(expires_at),
+            Timer::Close { expires_at, .. } => Some(expires_at),
+        }
+    }
+
     fn reset(&mut self) {
     fn reset(&mut self) {
         *self = Timer::Idle
         *self = Timer::Idle
     }
     }
@@ -1256,6 +1264,16 @@ impl<'a> TcpSocket<'a> {
 
 
         Ok(())
         Ok(())
     }
     }
+
+    pub(crate) fn poll_at(&self) -> Option<u64> {
+        self.timer.poll_at().or_else(|| {
+            if self.tx_buffer.empty() {
+                None
+            } else {
+                Some(0)
+            }
+        })
+    }
 }
 }
 
 
 impl<'a> fmt::Write for TcpSocket<'a> {
 impl<'a> fmt::Write for TcpSocket<'a> {
@@ -2835,14 +2853,14 @@ mod test {
 
 
         limits.max_burst_size = None;
         limits.max_burst_size = None;
         s.send_slice(b"abcdef").unwrap();
         s.send_slice(b"abcdef").unwrap();
-        s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
+        s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
             assert_eq!(tcp_repr.window_len, 32767);
             assert_eq!(tcp_repr.window_len, 32767);
             Ok(())
             Ok(())
         }).unwrap();
         }).unwrap();
 
 
         limits.max_burst_size = Some(4);
         limits.max_burst_size = Some(4);
         s.send_slice(b"abcdef").unwrap();
         s.send_slice(b"abcdef").unwrap();
-        s.dispatch(0, &limits, |(ip_repr, tcp_repr)| {
+        s.dispatch(0, &limits, |(_ip_repr, tcp_repr)| {
             assert_eq!(tcp_repr.window_len, 5920);
             assert_eq!(tcp_repr.window_len, 5920);
             Ok(())
             Ok(())
         }).unwrap();
         }).unwrap();

+ 29 - 18
src/socket/udp.rs

@@ -196,23 +196,34 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
 
 
     pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
     pub(crate) fn dispatch<F>(&mut self, emit: F) -> Result<()>
             where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
             where F: FnOnce((IpRepr, UdpRepr)) -> Result<()> {
-        let packet_buf = self.tx_buffer.dequeue()?;
-        net_trace!("[{}]{}:{}: sending {} octets",
-                   self.debug_id, self.endpoint,
-                   packet_buf.endpoint, packet_buf.size);
+        let debug_id = self.debug_id;
+        let endpoint = self.endpoint;
+        self.tx_buffer.try_dequeue(|packet_buf| {
+            net_trace!("[{}]{}:{}: sending {} octets",
+                       debug_id, endpoint,
+                       packet_buf.endpoint, packet_buf.size);
+
+            let repr = UdpRepr {
+                src_port: endpoint.port,
+                dst_port: packet_buf.endpoint.port,
+                payload:  &packet_buf.as_ref()[..]
+            };
+            let ip_repr = IpRepr::Unspecified {
+                src_addr:    endpoint.addr,
+                dst_addr:    packet_buf.endpoint.addr,
+                protocol:    IpProtocol::Udp,
+                payload_len: repr.buffer_len()
+            };
+            emit((ip_repr, repr))
+        })
+    }
 
 
-        let repr = UdpRepr {
-            src_port: self.endpoint.port,
-            dst_port: packet_buf.endpoint.port,
-            payload:  &packet_buf.as_ref()[..]
-        };
-        let ip_repr = IpRepr::Unspecified {
-            src_addr:    self.endpoint.addr,
-            dst_addr:    packet_buf.endpoint.addr,
-            protocol:    IpProtocol::Udp,
-            payload_len: repr.buffer_len()
-        };
-        emit((ip_repr, repr))
+    pub(crate) fn poll_at(&self) -> Option<u64> {
+        if self.tx_buffer.empty() {
+            None
+        } else {
+            Some(0)
+        }
     }
     }
 }
 }
 
 
@@ -310,13 +321,13 @@ mod test {
             assert_eq!(udp_repr, LOCAL_UDP_REPR);
             assert_eq!(udp_repr, LOCAL_UDP_REPR);
             Err(Error::Unaddressable)
             Err(Error::Unaddressable)
         }), Err(Error::Unaddressable));
         }), Err(Error::Unaddressable));
-        /*assert!(!socket.can_send());*/
+        assert!(!socket.can_send());
 
 
         assert_eq!(socket.dispatch(|(ip_repr, udp_repr)| {
         assert_eq!(socket.dispatch(|(ip_repr, udp_repr)| {
             assert_eq!(ip_repr, LOCAL_IP_REPR);
             assert_eq!(ip_repr, LOCAL_IP_REPR);
             assert_eq!(udp_repr, LOCAL_UDP_REPR);
             assert_eq!(udp_repr, LOCAL_UDP_REPR);
             Ok(())
             Ok(())
-        }), /*Ok(())*/ Err(Error::Exhausted));
+        }), Ok(()));
         assert!(socket.can_send());
         assert!(socket.can_send());
     }
     }
 
 

+ 2 - 2
src/storage/ring_buffer.rs

@@ -61,7 +61,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
     /// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
     /// return `Err(Error::Exhausted)` if the buffer is full.
     /// return `Err(Error::Exhausted)` if the buffer is full.
     pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
     pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
-            where F: Fn(&'b mut T) -> Result<R> {
+            where F: FnOnce(&'b mut T) -> Result<R> {
         if self.full() { return Err(Error::Exhausted) }
         if self.full() { return Err(Error::Exhausted) }
 
 
         let index = self.mask(self.read_at + self.length);
         let index = self.mask(self.read_at + self.length);
@@ -88,7 +88,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
     /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
     /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
     /// return `Err(Error::Exhausted)` if the buffer is empty.
     /// return `Err(Error::Exhausted)` if the buffer is empty.
     pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
     pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
-            where F: Fn(&'b mut T) -> Result<R> {
+            where F: FnOnce(&'b mut T) -> Result<R> {
         if self.empty() { return Err(Error::Exhausted) }
         if self.empty() { return Err(Error::Exhausted) }
 
 
         let next_at = self.incr(self.read_at);
         let next_at = self.incr(self.read_at);