瀏覽代碼

Merge pull request #935 from lrh2000/window-update

Don't delay ACKs for significant window updates
Catherine 8 月之前
父節點
當前提交
0847643198
共有 4 個文件被更改,包括 225 次插入5 次删除
  1. 4 0
      Cargo.toml
  2. 24 0
      README.md
  3. 101 0
      examples/loopback_benchmark.rs
  4. 96 5
      src/socket/tcp.rs

+ 4 - 0
Cargo.toml

@@ -292,6 +292,10 @@ required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interfac
 name = "loopback"
 required-features = ["log", "medium-ethernet", "proto-ipv4", "socket-tcp"]
 
+[[example]]
+name = "loopback_benchmark"
+required-features = ["std", "log", "medium-ethernet", "proto-ipv4", "socket-tcp"]
+
 [[example]]
 name = "multicast"
 required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "proto-igmp", "socket-udp"]

+ 24 - 0
README.md

@@ -553,6 +553,30 @@ is possible; otherwise, nothing at all will be displayed and no options are acce
 
 [wireshark]: https://wireshark.org
 
+### examples/loopback\_benchmark.rs
+
+_examples/loopback_benchmark.rs_ is another simple throughput benchmark.
+
+Read its [source code](/examples/loopback_benchmark.rs), then run it as:
+
+```sh
+cargo run --release --example loopback_benchmark
+```
+
+It establishes a connection to itself via a loopback interface and transfers a large amount
+of data in one direction.
+
+A typical result (achieved on a Intel Core i5-13500H CPU and a Linux 6.9.9 x86_64 kernel running
+on a LENOVO XiaoXinPro 14 IRH8 laptop) is as follows:
+
+```
+$ cargo run --release --example loopback_benchmark
+done in 0.558 s, bandwidth is 15.395083 Gbps
+```
+
+Note: Although the loopback interface can be used in bare-metal environments,
+this benchmark _does_ rely on `std` to be able to measure the time cost.
+
 ## License
 
 _smoltcp_ is distributed under the terms of 0-clause BSD license.

+ 101 - 0
examples/loopback_benchmark.rs

@@ -0,0 +1,101 @@
+mod utils;
+
+use log::debug;
+
+use smoltcp::iface::{Config, Interface, SocketSet};
+use smoltcp::phy::{Device, Loopback, Medium};
+use smoltcp::socket::tcp;
+use smoltcp::time::Instant;
+use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr};
+
+fn main() {
+    let device = Loopback::new(Medium::Ethernet);
+
+    let mut device = {
+        utils::setup_logging("info");
+
+        let (mut opts, mut free) = utils::create_options();
+        utils::add_middleware_options(&mut opts, &mut free);
+
+        let mut matches = utils::parse_options(&opts, free);
+        utils::parse_middleware_options(&mut matches, device, /*loopback=*/ true)
+    };
+
+    // Create interface
+    let config = match device.capabilities().medium {
+        Medium::Ethernet => {
+            Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into())
+        }
+        Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip),
+        Medium::Ieee802154 => todo!(),
+    };
+
+    let mut iface = Interface::new(config, &mut device, Instant::now());
+    iface.update_ip_addrs(|ip_addrs| {
+        ip_addrs
+            .push(IpCidr::new(IpAddress::v4(127, 0, 0, 1), 8))
+            .unwrap();
+    });
+
+    // Create sockets
+    let server_socket = {
+        let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
+        let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
+        tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
+    };
+
+    let client_socket = {
+        let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
+        let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65536]);
+        tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
+    };
+
+    let mut sockets: [_; 2] = Default::default();
+    let mut sockets = SocketSet::new(&mut sockets[..]);
+    let server_handle = sockets.add(server_socket);
+    let client_handle = sockets.add(client_socket);
+
+    let start_time = Instant::now();
+
+    let mut did_listen = false;
+    let mut did_connect = false;
+    let mut processed = 0;
+    while processed < 1024 * 1024 * 1024 {
+        iface.poll(Instant::now(), &mut device, &mut sockets);
+
+        let socket = sockets.get_mut::<tcp::Socket>(server_handle);
+        if !socket.is_active() && !socket.is_listening() && !did_listen {
+            debug!("listening");
+            socket.listen(1234).unwrap();
+            did_listen = true;
+        }
+
+        while socket.can_recv() {
+            let received = socket.recv(|buffer| (buffer.len(), buffer.len())).unwrap();
+            debug!("got {:?}", received,);
+            processed += received;
+        }
+
+        let socket = sockets.get_mut::<tcp::Socket>(client_handle);
+        let cx = iface.context();
+        if !socket.is_open() && !did_connect {
+            debug!("connecting");
+            socket
+                .connect(cx, (IpAddress::v4(127, 0, 0, 1), 1234), 65000)
+                .unwrap();
+            did_connect = true;
+        }
+
+        while socket.can_send() {
+            debug!("sending");
+            socket.send(|buffer| (buffer.len(), ())).unwrap();
+        }
+    }
+
+    let duration = Instant::now() - start_time;
+    println!(
+        "done in {} s, bandwidth is {} Gbps",
+        duration.total_millis() as f64 / 1000.0,
+        (processed as u64 * 8 / duration.total_millis()) as f64 / 1000000.0
+    );
+}

+ 96 - 5
src/socket/tcp.rs

@@ -674,7 +674,6 @@ impl<'a> Socket<'a> {
     /// Return the current window field value, including scaling according to RFC 1323.
     ///
     /// Used in internal calculations as well as packet generation.
-    ///
     #[inline]
     fn scaled_window(&self) -> u16 {
         cmp::min(
@@ -683,6 +682,25 @@ impl<'a> Socket<'a> {
         ) as u16
     }
 
+    /// Return the last window field value, including scaling according to RFC 1323.
+    ///
+    /// Used in internal calculations as well as packet generation.
+    ///
+    /// Unlike `remote_last_win`, we take into account new packets received (but not acknowledged)
+    /// since the last window update and adjust the window length accordingly. This ensures a fair
+    /// comparison between the last window length and the new window length we're going to
+    /// advertise.
+    #[inline]
+    fn last_scaled_window(&self) -> Option<u16> {
+        let last_ack = self.remote_last_ack?;
+        let next_ack = self.remote_seq_no + self.rx_buffer.len();
+
+        let last_win = (self.remote_last_win as usize) << self.remote_win_shift;
+        let last_win_adjusted = last_ack + last_win - next_ack;
+
+        Some(cmp::min(last_win_adjusted >> self.remote_win_shift, (1 << 16) - 1) as u16)
+    }
+
     /// Set the timeout duration.
     ///
     /// A socket with a timeout duration set will abort the connection if either of the following
@@ -2165,13 +2183,26 @@ impl<'a> Socket<'a> {
         }
     }
 
+    /// Return whether we should send ACK immediately due to significant window updates.
+    ///
+    /// ACKs with significant window updates should be sent immediately to let the sender know that
+    /// more data can be sent. According to the Linux kernel implementation, "significant" means
+    /// doubling the receive window. The Linux kernel implementation can be found at
+    /// <https://elixir.bootlin.com/linux/v6.9.9/source/net/ipv4/tcp.c#L1472>.
     fn window_to_update(&self) -> bool {
         match self.state {
             State::SynSent
             | State::SynReceived
             | State::Established
             | State::FinWait1
-            | State::FinWait2 => self.scaled_window() > self.remote_last_win,
+            | State::FinWait2 => {
+                let new_win = self.scaled_window();
+                if let Some(last_win) = self.last_scaled_window() {
+                    new_win > 0 && new_win / 2 >= last_win
+                } else {
+                    false
+                }
+            }
             _ => false,
         }
     }
@@ -2237,7 +2268,7 @@ impl<'a> Socket<'a> {
         } else if self.ack_to_transmit() && self.delayed_ack_expired(cx.now()) {
             // If we have data to acknowledge, do it.
             tcp_trace!("outgoing segment will acknowledge");
-        } else if self.window_to_update() && self.delayed_ack_expired(cx.now()) {
+        } else if self.window_to_update() {
             // If we have window length increase to advertise, do it.
             tcp_trace!("outgoing segment will update window");
         } else if self.state == State::Closed {
@@ -2491,8 +2522,11 @@ impl<'a> Socket<'a> {
         } else if self.seq_to_transmit(cx) {
             // We have a data or flag packet to transmit.
             PollAt::Now
+        } else if self.window_to_update() {
+            // The receive window has been raised significantly.
+            PollAt::Now
         } else {
-            let want_ack = self.ack_to_transmit() || self.window_to_update();
+            let want_ack = self.ack_to_transmit();
 
             let delayed_ack_poll_at = match (want_ack, self.ack_delay_timer) {
                 (false, _) => PollAt::Ingress,
@@ -2826,7 +2860,7 @@ mod test {
         s.local_seq_no = LOCAL_SEQ + 1;
         s.remote_last_seq = LOCAL_SEQ + 1;
         s.remote_last_ack = Some(REMOTE_SEQ + 1);
-        s.remote_last_win = 64;
+        s.remote_last_win = s.scaled_window();
         s
     }
 
@@ -6366,6 +6400,63 @@ mod test {
         }));
     }
 
+    #[test]
+    fn test_window_update_with_delay_ack() {
+        let mut s = socket_established_with_buffer_sizes(6, 6);
+        s.ack_delay = Some(Duration::from_millis(10));
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                payload: &b"abcdef"[..],
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 5);
+
+        s.recv(|buffer| {
+            assert_eq!(&buffer[..2], b"ab");
+            (2, ())
+        })
+        .unwrap();
+        recv!(
+            s,
+            time 5,
+            Ok(TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1 + 6),
+                window_len: 2,
+                ..RECV_TEMPL
+            })
+        );
+
+        s.recv(|buffer| {
+            assert_eq!(&buffer[..1], b"c");
+            (1, ())
+        })
+        .unwrap();
+        recv_nothing!(s, time 5);
+
+        s.recv(|buffer| {
+            assert_eq!(&buffer[..1], b"d");
+            (1, ())
+        })
+        .unwrap();
+        recv!(
+            s,
+            time 5,
+            Ok(TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1 + 6),
+                window_len: 4,
+                ..RECV_TEMPL
+            })
+        );
+    }
+
     #[test]
     fn test_fill_peer_window() {
         let mut s = socket_established();