浏览代码

Split `poll_at`/`poll_delay` out of `poll`.

The previous model was flawed. Consider the following case:
  * The main loop looks as follows (pseudocode):
      loop {
        let _ = (tcp:1234).read_all()
        wait(iface.poll())
      }
  * The remote end is continuously transmitting data and at some
    point fills the window of (tcp:1234), stopping the transmission
    afterwards.
  * The local end processes the packets and, as a part of egress
    routine, emits an ACK. That also updates the window, and
    the socket's poll_at() routine returns None, since there is
    nothing to transmit or retransmit.
  * The local end now waits indefinitely even though it can start
    processing the data in the socket buffers right now.
whitequark 7 年之前
父节点
当前提交
3868dcdb14
共有 9 个文件被更改,包括 93 次插入50 次删除
  1. 1 1
      Cargo.toml
  2. 4 3
      examples/client.rs
  3. 4 3
      examples/httpclient.rs
  4. 6 6
      examples/loopback.rs
  5. 4 1
      examples/ping.rs
  6. 4 3
      examples/server.rs
  7. 8 4
      examples/stress.rs
  8. 3 1
      examples/utils.rs
  9. 59 28
      src/iface/ethernet.rs

+ 1 - 1
Cargo.toml

@@ -67,7 +67,7 @@ required-features = ["std", "phy-tap_interface", "socket-tcp", "socket-udp"]
 
 [[example]]
 name = "loopback"
-required-features = ["alloc", "proto-ipv4"]
+required-features = ["log", "socket-tcp"]
 
 # This is really a test, but it requires root privileges for setup (the tap interface)
 # so it is built as an example.

+ 4 - 3
examples/client.rs

@@ -59,6 +59,9 @@ fn main() {
 
     let mut tcp_active = false;
     loop {
+        let timestamp = utils::millis_since(startup_time);
+        iface.poll(&mut sockets, timestamp).expect("poll error");
+
         {
             let mut socket = sockets.get::<TcpSocket>(tcp_handle);
             if socket.is_active() && !tcp_active {
@@ -92,8 +95,6 @@ fn main() {
             }
         }
 
-        let timestamp = utils::millis_since(startup_time);
-        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
-        phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
+        phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
     }
 }

+ 4 - 3
examples/httpclient.rs

@@ -59,6 +59,9 @@ fn main() {
     let mut state = State::Connect;
 
     loop {
+        let timestamp = utils::millis_since(startup_time);
+        iface.poll(&mut sockets, timestamp).expect("poll error");
+
         {
             let mut socket = sockets.get::<TcpSocket>(tcp_handle);
 
@@ -94,8 +97,6 @@ fn main() {
             }
         }
 
-        let timestamp = utils::millis_since(startup_time);
-        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
-        phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
+        phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
     }
 }

+ 6 - 6
examples/loopback.rs

@@ -124,6 +124,8 @@ fn main() {
     let mut did_connect = false;
     let mut done = false;
     while !done && clock.elapsed() < 10_000 {
+        iface.poll(&mut socket_set, clock.elapsed()).expect("poll error");
+
         {
             let mut socket = socket_set.get::<TcpSocket>(server_handle);
             if !socket.is_active() && !socket.is_listening() {
@@ -161,16 +163,14 @@ fn main() {
             }
         }
 
-        match iface.poll(&mut socket_set, clock.elapsed()) {
-            Ok(Some(poll_at)) if poll_at < clock.elapsed() =>
+        match iface.poll_delay(&socket_set, clock.elapsed()) {
+            Some(0) =>
                 debug!("resuming"),
-            Ok(Some(poll_at)) => {
-                let delay = poll_at - clock.elapsed();
+            Some(delay) => {
                 debug!("sleeping for {} ms", delay);
                 clock.advance(delay)
             }
-            Ok(None) => clock.advance(1),
-            Err(e) => debug!("poll error: {}", e)
+            None => clock.advance(1)
         }
     }
 

+ 4 - 1
examples/ping.rs

@@ -77,6 +77,9 @@ fn main() {
     let endpoint = IpAddress::Ipv4(remote_addr);
 
     loop {
+        let timestamp = utils::millis_since(startup_time);
+        iface.poll(&mut sockets, timestamp).expect("poll error");
+
         {
             let mut socket = sockets.get::<IcmpSocket>(icmp_handle);
             if !socket.is_open() {
@@ -141,7 +144,7 @@ fn main() {
 
         let timestamp = utils::millis_since(startup_time);
 
-        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
+        let poll_at = iface.poll_at(&sockets, timestamp);
         let resume_at = [poll_at, Some(send_at)].iter().flat_map(|x| *x).min();
         phy_wait(fd, resume_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
     }

+ 4 - 3
examples/server.rs

@@ -71,6 +71,9 @@ fn main() {
 
     let mut tcp_6970_active = false;
     loop {
+        let timestamp = utils::millis_since(startup_time);
+        iface.poll(&mut sockets, timestamp).expect("poll error");
+
         // udp:6969: respond "hello"
         {
             let mut socket = sockets.get::<UdpSocket>(udp_handle);
@@ -187,8 +190,6 @@ fn main() {
             }
         }
 
-        let timestamp = utils::millis_since(startup_time);
-        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
-        phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
+        phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
     }
 }

+ 8 - 4
examples/stress.rs

@@ -41,7 +41,7 @@ fn client(kind: Client) {
         match result {
             Ok(0) => break,
             Ok(result) => {
-                print!("(P:{})", result);
+                // print!("(P:{})", result);
                 processed += result
             }
             Err(err) => panic!("cannot process: {}", err)
@@ -54,6 +54,9 @@ fn client(kind: Client) {
 static CLIENT_DONE: AtomicBool = ATOMIC_BOOL_INIT;
 
 fn main() {
+    #[cfg(feature = "log")]
+    utils::setup_logging("info");
+
     let (mut opts, mut free) = utils::create_options();
     utils::add_tap_options(&mut opts, &mut free);
     utils::add_middleware_options(&mut opts, &mut free);
@@ -97,6 +100,9 @@ fn main() {
 
     let mut processed = 0;
     while !CLIENT_DONE.load(Ordering::SeqCst) {
+        let timestamp = utils::millis_since(startup_time);
+        iface.poll(&mut sockets, timestamp).expect("poll error");
+
         // tcp:1234: emit data
         {
             let mut socket = sockets.get::<TcpSocket>(tcp1_handle);
@@ -133,8 +139,6 @@ fn main() {
             }
         }
 
-        let timestamp = utils::millis_since(startup_time);
-        let poll_at = iface.poll(&mut sockets, timestamp).expect("poll error");
-        phy_wait(fd, poll_at.map(|at| at.saturating_sub(timestamp))).expect("wait error");
+        phy_wait(fd, iface.poll_delay(&sockets, timestamp)).expect("wait error");
     }
 }

+ 3 - 1
examples/utils.rs

@@ -14,7 +14,9 @@ use log::{LogLevel, LogLevelFilter, LogRecord};
 use env_logger::LogBuilder;
 use getopts::{Options, Matches};
 
-use smoltcp::phy::{Device, EthernetTracer, FaultInjector, TapInterface};
+use smoltcp::phy::{Device, EthernetTracer, FaultInjector};
+#[cfg(feature = "phy-tap_interface")]
+use smoltcp::phy::TapInterface;
 use smoltcp::phy::{PcapWriter, PcapSink, PcapMode, PcapLinkType};
 
 #[cfg(feature = "log")]

+ 59 - 28
src/iface/ethernet.rs

@@ -261,10 +261,9 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
     /// The timestamp must be a number of milliseconds, monotonically increasing
     /// since an arbitrary moment in time, such as system startup.
     ///
-    /// This function returns a _soft deadline_ for calling it the next time.
-    /// That is, if `iface.poll(&mut sockets, 1000)` returns `Ok(Some(2000))`,
-    /// it harmless (but wastes energy) to call it 500 ms later, and potentially
-    /// harmful (impacting quality of service) to call it 1500 ms later.
+    /// This function returns a boolean value indicating whether any packets were
+    /// processed or emitted, and thus, whether the readiness of any socket might
+    /// have changed.
     ///
     /// # Errors
     /// This method will routinely return errors in response to normal network
@@ -276,18 +275,50 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
     /// packets containing any unsupported protocol, option, or form, which is
     /// a very common occurrence and on a production system it should not even
     /// be logged.
-    pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<Option<u64>> {
-        self.socket_egress(sockets, timestamp)?;
-
-        if self.socket_ingress(sockets, timestamp)? {
-            Ok(Some(0))
-        } else {
-            Ok(sockets.iter().filter_map(|socket| {
-                let socket_poll_at = socket.poll_at();
-                socket.meta().poll_at(socket_poll_at, |ip_addr|
-                    self.inner.has_neighbor(&ip_addr, timestamp))
-            }).min())
+    pub fn poll(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
+        let mut readiness_may_have_changed = false;
+        loop {
+            let processed_any = self.socket_ingress(sockets, timestamp)?;
+            let emitted_any   = self.socket_egress(sockets, timestamp)?;
+            if processed_any || emitted_any {
+                readiness_may_have_changed = true;
+            } else {
+                break
+            }
         }
+        Ok(readiness_may_have_changed)
+    }
+
+    /// Return a _soft deadline_ for calling [poll] the next time.
+    /// That is, if `iface.poll_at(&sockets, 1000)` returns `Ok(Some(2000))`,
+    /// you should call call [poll] in 1000 ms; it is harmless (but wastes energy)
+    /// to call it 500 ms later, and potentially harmful (impacting quality of service)
+    /// to call it 1500 ms later.
+    ///
+    /// The timestamp argument is the same as for [poll].
+    ///
+    /// [poll]: #method.poll
+    pub fn poll_at(&self, sockets: &SocketSet, timestamp: u64) -> Option<u64> {
+        sockets.iter().filter_map(|socket| {
+            let socket_poll_at = socket.poll_at();
+            socket.meta().poll_at(socket_poll_at, |ip_addr|
+                self.inner.has_neighbor(&ip_addr, timestamp))
+        }).min()
+    }
+
+    /// Return an _advisory wait time_ for calling [poll] the next time.
+    /// That is, if `iface.poll_at(&sockets, 1000)` returns `Ok(Some(1000))`,
+    /// you should call call [poll] in 1000 ms; it is harmless (but wastes energy)
+    /// to call it 500 ms later, and potentially harmful (impacting quality of service)
+    /// to call it 1500 ms later.
+    ///
+    /// This is a shortcut for `poll_at(..., timestamp).map(|at| at.saturating_sub(timestamp))`.
+    ///
+    /// The timestamp argument is the same as for [poll].
+    ///
+    /// [poll]: #method.poll
+    pub fn poll_delay(&self, sockets: &SocketSet, timestamp: u64) -> Option<u64> {
+        self.poll_at(sockets, timestamp).map(|at| at.saturating_sub(timestamp))
     }
 
     fn socket_ingress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
@@ -298,29 +329,29 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
                 None => break,
                 Some(tokens) => tokens,
             };
-            let dispatch_result = rx_token.consume(timestamp, |frame| {
-                let response = inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
+            rx_token.consume(timestamp, |frame| {
+                inner.process_ethernet(sockets, timestamp, &frame).map_err(|err| {
                     net_debug!("cannot process ingress packet: {}", err);
                     net_debug!("packet dump follows:\n{}",
                                PrettyPrinter::<EthernetFrame<&[u8]>>::new("", &frame));
                     err
-                })?;
-                processed_any = true;
-
-                inner.dispatch(tx_token, timestamp, response)
-            });
-            dispatch_result.map_err(|err| {
-                net_debug!("cannot dispatch response packet: {}", err);
-                err
+                }).and_then(|response| {
+                    processed_any = true;
+                    inner.dispatch(tx_token, timestamp, response).map_err(|err| {
+                        net_debug!("cannot dispatch response packet: {}", err);
+                        err
+                    })
+                })
             })?;
         }
         Ok(processed_any)
     }
 
-    fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<()> {
+    fn socket_egress(&mut self, sockets: &mut SocketSet, timestamp: u64) -> Result<bool> {
         let mut caps = self.device.capabilities();
         caps.max_transmission_unit -= EthernetFrame::<&[u8]>::header_len();
 
+        let mut emitted_any = false;
         for mut socket in sockets.iter_mut() {
             if !socket.meta_mut().egress_permitted(|ip_addr|
                     self.inner.has_neighbor(&ip_addr, timestamp)) {
@@ -392,10 +423,10 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
                                socket.meta().handle, err);
                     return Err(err)
                 }
-                (Ok(()), Ok(())) => ()
+                (Ok(()), Ok(())) => emitted_any = true
             }
         }
-        Ok(())
+        Ok(emitted_any)
     }
 }