Просмотр исходного кода

Merge pull request #1026 from smoltcp-rs/netsim

tcp: add netsim test
Dario Nieuwenhuis 3 месяцев назад
Родитель
Сommit
df66f34f7c
6 измененных файлов с 1005 добавлено и 52 удалено
  1. 9 1
      .github/workflows/test.yml
  2. 8 0
      Cargo.toml
  3. 8 0
      ci.sh
  4. 601 51
      src/socket/tcp.rs
  5. 364 0
      tests/netsim.rs
  6. 15 0
      tests/snapshots/netsim__netsim.snap

+ 9 - 1
.github/workflows/test.yml

@@ -7,7 +7,7 @@ name: Test
 jobs:
   tests:
     runs-on: ubuntu-22.04
-    needs: [check-msrv, test-msrv, test-stable, clippy]
+    needs: [check-msrv, test-msrv, test-stable, clippy, test-netsim]
     steps:
       - name: Done
         run: exit 0
@@ -48,6 +48,14 @@ jobs:
       - name: Run Tests nightly
         run: ./ci.sh test nightly
 
+  test-netsim:
+    runs-on: ubuntu-22.04
+    continue-on-error: true
+    steps:
+      - uses: actions/checkout@v4
+      - name: Run network-simulation tests
+        run: ./ci.sh netsim
+
   test-build-16bit:
     runs-on: ubuntu-22.04
     continue-on-error: true

+ 8 - 0
Cargo.toml

@@ -35,6 +35,8 @@ getopts = "0.2"
 rand = "0.8"
 url = "2.0"
 rstest = "0.17"
+insta = "1.41.1"
+rand_chacha = "0.3.1"
 
 [features]
 std = ["managed/std", "alloc"]
@@ -109,6 +111,8 @@ default = [
 
 "_proto-fragmentation" = []
 
+"_netsim" = []
+
 # BEGIN AUTOGENERATED CONFIG FEATURES
 # Generated by gen_config.py. DO NOT EDIT.
 iface-max-addr-count-1 = []
@@ -267,6 +271,10 @@ rpl-parents-buffer-count-32 = []
 
 # END AUTOGENERATED CONFIG FEATURES
 
+[[test]]
+name = "netsim"
+required-features = ["_netsim"]
+
 [[example]]
 name = "packet2pcap"
 path = "utils/packet2pcap.rs"

+ 8 - 0
ci.sh

@@ -60,6 +60,10 @@ test() {
     fi
 }
 
+netsim() {
+    cargo test --release --features _netsim netsim
+}
+
 check() {
     local version=$1
     rustup toolchain install $version
@@ -139,3 +143,7 @@ fi
 if [[ $1 == "coverage" || $1 == "all" ]]; then
     coverage
 fi
+
+if [[ $1 == "netsim" || $1 == "all" ]]; then
+    netsim
+fi

+ 601 - 51
src/socket/tcp.rs

@@ -277,10 +277,20 @@ impl RttEstimator {
 #[derive(Debug, Clone, Copy, PartialEq)]
 #[cfg_attr(feature = "defmt", derive(defmt::Format))]
 enum Timer {
-    Idle { keep_alive_at: Option<Instant> },
-    Retransmit { expires_at: Instant },
+    Idle {
+        keep_alive_at: Option<Instant>,
+    },
+    Retransmit {
+        expires_at: Instant,
+    },
     FastRetransmit,
-    Close { expires_at: Instant },
+    ZeroWindowProbe {
+        expires_at: Instant,
+        delay: Duration,
+    },
+    Close {
+        expires_at: Instant,
+    },
 }
 
 const ACK_DELAY_DEFAULT: Duration = Duration::from_millis(10);
@@ -317,6 +327,13 @@ impl Timer {
         }
     }
 
+    fn should_zero_window_probe(&self, timestamp: Instant) -> bool {
+        match *self {
+            Timer::ZeroWindowProbe { expires_at, .. } if timestamp >= expires_at => true,
+            _ => false,
+        }
+    }
+
     fn poll_at(&self) -> PollAt {
         match *self {
             Timer::Idle {
@@ -325,6 +342,7 @@ impl Timer {
             Timer::Idle {
                 keep_alive_at: None,
             } => PollAt::Ingress,
+            Timer::ZeroWindowProbe { expires_at, .. } => PollAt::Time(expires_at),
             Timer::Retransmit { expires_at, .. } => PollAt::Time(expires_at),
             Timer::FastRetransmit => PollAt::Now,
             Timer::Close { expires_at } => PollAt::Time(expires_at),
@@ -353,7 +371,10 @@ impl Timer {
 
     fn set_for_retransmit(&mut self, timestamp: Instant, delay: Duration) {
         match *self {
-            Timer::Idle { .. } | Timer::FastRetransmit { .. } | Timer::Retransmit { .. } => {
+            Timer::Idle { .. }
+            | Timer::FastRetransmit { .. }
+            | Timer::Retransmit { .. }
+            | Timer::ZeroWindowProbe { .. } => {
                 *self = Timer::Retransmit {
                     expires_at: timestamp + delay,
                 }
@@ -372,12 +393,34 @@ impl Timer {
         }
     }
 
-    fn is_retransmit(&self) -> bool {
-        match *self {
-            Timer::Retransmit { .. } | Timer::FastRetransmit => true,
-            _ => false,
+    fn set_for_zero_window_probe(&mut self, timestamp: Instant, delay: Duration) {
+        *self = Timer::ZeroWindowProbe {
+            expires_at: timestamp + delay,
+            delay,
         }
     }
+
+    fn rewind_zero_window_probe(&mut self, timestamp: Instant) {
+        if let Timer::ZeroWindowProbe { mut delay, .. } = *self {
+            delay = (delay * 2).min(Duration::from_millis(RTTE_MAX_RTO as _));
+            *self = Timer::ZeroWindowProbe {
+                expires_at: timestamp + delay,
+                delay,
+            }
+        }
+    }
+
+    fn is_idle(&self) -> bool {
+        matches!(self, Timer::Idle { .. })
+    }
+
+    fn is_zero_window_probe(&self) -> bool {
+        matches!(self, Timer::ZeroWindowProbe { .. })
+    }
+
+    fn is_retransmit(&self) -> bool {
+        matches!(self, Timer::Retransmit { .. } | Timer::FastRetransmit)
+    }
 }
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -1182,6 +1225,17 @@ impl<'a> Socket<'a> {
                 self.remote_last_ts = None
             }
 
+            // if remote win is zero and we go from having no data to some data pending to
+            // send, start the zero window probe timer.
+            if self.remote_win_len == 0 && self.timer.is_idle() {
+                let delay = self.rtte.retransmission_timeout();
+                tcp_trace!("starting zero-window-probe timer for t+{}", delay);
+
+                // We don't have access to the current time here, so use Instant::ZERO instead.
+                // this will cause the first ZWP to be sent immediately, but that's okay.
+                self.timer.set_for_zero_window_probe(Instant::ZERO, delay);
+            }
+
             #[cfg(any(test, feature = "verbose"))]
             tcp_trace!(
                 "tx buffer: enqueueing {} octets (now {})",
@@ -1713,7 +1767,7 @@ impl<'a> Socket<'a> {
                         ack_of_fin = true;
                     }
 
-                    ack_all = self.remote_last_seq == ack_number
+                    ack_all = self.remote_last_seq <= ack_number;
                 }
 
                 self.rtte.on_ack(cx.now(), ack_number);
@@ -1796,7 +1850,6 @@ impl<'a> Socket<'a> {
             // ACK packets in the SYN-RECEIVED state change it to ESTABLISHED.
             (State::SynReceived, TcpControl::None) => {
                 self.set_state(State::Established);
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
             }
 
             // FIN packets in the SYN-RECEIVED state change it to CLOSE-WAIT.
@@ -1806,7 +1859,6 @@ impl<'a> Socket<'a> {
                 self.remote_seq_no += 1;
                 self.rx_fin_received = true;
                 self.set_state(State::CloseWait);
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
             }
 
             // SYN|ACK packets in the SYN-SENT state change it to ESTABLISHED.
@@ -1847,26 +1899,15 @@ impl<'a> Socket<'a> {
                 } else {
                     self.set_state(State::SynReceived);
                 }
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
             }
 
-            // RFC 6298: (5.2) ACK of all outstanding data turn off the retransmit timer.
-            // (5.3) ACK of new data in ESTABLISHED state restart the retransmit timer.
-            (State::Established, TcpControl::None) => {
-                if ack_all {
-                    self.timer.set_for_idle(cx.now(), self.keep_alive);
-                } else if ack_len > 0 {
-                    self.timer
-                        .set_for_retransmit(cx.now(), self.rtte.retransmission_timeout());
-                }
-            }
+            (State::Established, TcpControl::None) => {}
 
             // FIN packets in ESTABLISHED state indicate the remote side has closed.
             (State::Established, TcpControl::Fin) => {
                 self.remote_seq_no += 1;
                 self.rx_fin_received = true;
                 self.set_state(State::CloseWait);
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
             }
 
             // ACK packets in FIN-WAIT-1 state change it to FIN-WAIT-2, if we've already
@@ -1875,9 +1916,6 @@ impl<'a> Socket<'a> {
                 if ack_of_fin {
                     self.set_state(State::FinWait2);
                 }
-                if ack_all {
-                    self.timer.set_for_idle(cx.now(), self.keep_alive);
-                }
             }
 
             // FIN packets in FIN-WAIT-1 state change it to CLOSING, or to TIME-WAIT
@@ -1890,14 +1928,10 @@ impl<'a> Socket<'a> {
                     self.timer.set_for_close(cx.now());
                 } else {
                     self.set_state(State::Closing);
-                    self.timer.set_for_idle(cx.now(), self.keep_alive);
                 }
             }
 
-            // Data packets in FIN-WAIT-2 reset the idle timer.
-            (State::FinWait2, TcpControl::None) => {
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
-            }
+            (State::FinWait2, TcpControl::None) => {}
 
             // FIN packets in FIN-WAIT-2 state change it to TIME-WAIT.
             (State::FinWait2, TcpControl::Fin) => {
@@ -1912,15 +1946,10 @@ impl<'a> Socket<'a> {
                 if ack_of_fin {
                     self.set_state(State::TimeWait);
                     self.timer.set_for_close(cx.now());
-                } else {
-                    self.timer.set_for_idle(cx.now(), self.keep_alive);
                 }
             }
 
-            // ACK packets in CLOSE-WAIT state reset the retransmit timer.
-            (State::CloseWait, TcpControl::None) => {
-                self.timer.set_for_idle(cx.now(), self.keep_alive);
-            }
+            (State::CloseWait, TcpControl::None) => {}
 
             // ACK packets in LAST-ACK state change it to CLOSED.
             (State::LastAck, TcpControl::None) => {
@@ -1928,8 +1957,6 @@ impl<'a> Socket<'a> {
                     // Clear the remote endpoint, or we'll send an RST there.
                     self.set_state(State::Closed);
                     self.tuple = None;
-                } else {
-                    self.timer.set_for_idle(cx.now(), self.keep_alive);
                 }
             }
 
@@ -2040,6 +2067,39 @@ impl<'a> Socket<'a> {
             self.last_remote_tsval = timestamp.tsval;
         }
 
+        // update timers.
+        match self.timer {
+            Timer::Retransmit { .. } | Timer::FastRetransmit => {
+                if ack_all {
+                    // RFC 6298: (5.2) ACK of all outstanding data turn off the retransmit timer.
+                    self.timer.set_for_idle(cx.now(), self.keep_alive);
+                } else if ack_len > 0 {
+                    // (5.3) ACK of new data in ESTABLISHED state restart the retransmit timer.
+                    let rto = self.rtte.retransmission_timeout();
+                    self.timer.set_for_retransmit(cx.now(), rto);
+                }
+            }
+            Timer::Idle { .. } => {
+                // any packet on idle refresh the keepalive timer.
+                self.timer.set_for_idle(cx.now(), self.keep_alive);
+            }
+            _ => {}
+        }
+
+        // start/stop the Zero Window Probe timer.
+        if self.remote_win_len == 0
+            && !self.tx_buffer.is_empty()
+            && (self.timer.is_idle() || ack_len > 0)
+        {
+            let delay = self.rtte.retransmission_timeout();
+            tcp_trace!("starting zero-window-probe timer for t+{}", delay);
+            self.timer.set_for_zero_window_probe(cx.now(), delay);
+        }
+        if self.remote_win_len != 0 && self.timer.is_zero_window_probe() {
+            tcp_trace!("stopping zero-window-probe timer");
+            self.timer.set_for_idle(cx.now(), self.keep_alive);
+        }
+
         let payload_len = payload.len();
         if payload_len == 0 {
             return None;
@@ -2326,6 +2386,8 @@ impl<'a> Socket<'a> {
         } else if self.timer.should_keep_alive(cx.now()) {
             // If we need to transmit a keep-alive packet, do it.
             tcp_trace!("keep-alive timer expired");
+        } else if self.timer.should_zero_window_probe(cx.now()) {
+            tcp_trace!("sending zero-window probe");
         } else if self.timer.should_close(cx.now()) {
             // If we have spent enough time in the TIME-WAIT state, close the socket.
             tcp_trace!("TIME-WAIT timer expired");
@@ -2368,6 +2430,8 @@ impl<'a> Socket<'a> {
             payload: &[],
         };
 
+        let mut is_zero_window_probe = false;
+
         match self.state {
             // We transmit an RST in the CLOSED state. If we ended up in the CLOSED state
             // with a specified endpoint, it means that the socket was aborted.
@@ -2409,7 +2473,7 @@ impl<'a> Socket<'a> {
                 let win_right_edge = self.local_seq_no + self.remote_win_len;
 
                 // Max amount of octets we're allowed to send according to the remote window.
-                let win_limit = if win_right_edge >= self.remote_last_seq {
+                let mut win_limit = if win_right_edge >= self.remote_last_seq {
                     win_right_edge - self.remote_last_seq
                 } else {
                     // This can happen if we've sent some data and later the remote side
@@ -2420,6 +2484,12 @@ impl<'a> Socket<'a> {
                     0
                 };
 
+                // To send a zero-window-probe, force the window limit to at least 1 byte.
+                if win_limit == 0 && self.timer.should_zero_window_probe(cx.now()) {
+                    win_limit = 1;
+                    is_zero_window_probe = true;
+                }
+
                 // Maximum size we're allowed to send. This can be limited by 3 factors:
                 // 1. remote window
                 // 2. MSS the remote is willing to accept, probably determined by their MTU
@@ -2518,6 +2588,12 @@ impl<'a> Socket<'a> {
         }
         self.ack_delay_timer = AckDelayTimer::Idle;
 
+        // Leave the rest of the state intact if sending a zero-window probe.
+        if is_zero_window_probe {
+            self.timer.rewind_zero_window_probe(cx.now());
+            return Ok(());
+        }
+
         // Leave the rest of the state intact if sending a keep-alive packet, since those
         // carry a fake segment.
         if is_keep_alive {
@@ -2537,12 +2613,12 @@ impl<'a> Socket<'a> {
                 .post_transmit(cx.now(), repr.segment_len());
         }
 
-        if !self.seq_to_transmit(cx) && repr.segment_len() > 0 && !self.timer.is_retransmit() {
-            // RFC 6298: (5.1) If we've transmitted all data we could (and there was
-            // something at all, data or flag, to transmit, not just an ACK), start the
-            // retransmit timer if it is not already running.
-            self.timer
-                .set_for_retransmit(cx.now(), self.rtte.retransmission_timeout());
+        if repr.segment_len() > 0 && !self.timer.is_retransmit() {
+            // RFC 6298 (5.1) Every time a packet containing data is sent (including a
+            // retransmission), if the timer is not running, start it running
+            // so that it will expire after RTO seconds.
+            let rto = self.rtte.retransmission_timeout();
+            self.timer.set_for_retransmit(cx.now(), rto);
         }
 
         if self.state == State::Closed {
@@ -2806,11 +2882,14 @@ mod test {
     fn recv_nothing(socket: &mut TestSocket, timestamp: Instant) {
         socket.cx.set_now(timestamp);
 
-        let result: Result<(), ()> = socket
-            .socket
-            .dispatch(&mut socket.cx, |_, (_ip_repr, _tcp_repr)| {
-                panic!("Should not send a packet")
-            });
+        let mut fail = false;
+        let result: Result<(), ()> = socket.socket.dispatch(&mut socket.cx, |_, _| {
+            fail = true;
+            Ok(())
+        });
+        if fail {
+            panic!("Should not send a packet")
+        }
 
         assert_eq!(result, Ok(()))
     }
@@ -2833,6 +2912,10 @@ mod test {
             $( recv!($socket, Ok($repr)); )*
             recv_nothing!($socket)
         });
+        ($socket:ident, time $time:expr, [$( $repr:expr ),*]) => ({
+            $( recv!($socket, time $time, Ok($repr)); )*
+            recv_nothing!($socket, time $time)
+        });
         ($socket:ident, $result:expr) =>
             (recv!($socket, time 0, $result));
         ($socket:ident, time $time:expr, $result:expr) =>
@@ -2948,6 +3031,9 @@ mod test {
         s.state = State::Closing;
         s.remote_last_seq = LOCAL_SEQ + 1 + 1;
         s.remote_seq_no = REMOTE_SEQ + 1 + 1;
+        s.timer = Timer::Retransmit {
+            expires_at: Instant::from_millis_const(1000),
+        };
         s
     }
 
@@ -6443,6 +6529,164 @@ mod test {
         }));
     }
 
+    #[test]
+    fn test_data_retransmit_ack_more_than_expected() {
+        let mut s = socket_established();
+        s.remote_mss = 6;
+        s.send_slice(b"aaaaaabbbbbbcccccc").unwrap();
+
+        recv!(s, time 0, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1),
+            payload:    &b"aaaaaa"[..],
+            ..RECV_TEMPL
+        }));
+        recv!(s, time 0, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1 + 6,
+            ack_number: Some(REMOTE_SEQ + 1),
+            payload:    &b"bbbbbb"[..],
+            ..RECV_TEMPL
+        }));
+        recv!(s, time 0, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1 + 12,
+            ack_number: Some(REMOTE_SEQ + 1),
+            payload:    &b"cccccc"[..],
+            ..RECV_TEMPL
+        }));
+        recv_nothing!(s, time 0);
+
+        recv_nothing!(s, time 50);
+
+        // retransmit timer expires, we want to retransmit all 3 packets
+        // but we only manage to retransmit 2 (due to e.g. lack of device buffer space)
+        assert!(s.timer.is_retransmit());
+        recv!(s, time 1000, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1),
+            payload:    &b"aaaaaa"[..],
+            ..RECV_TEMPL
+        }));
+        recv!(s, time 1000, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1 + 6,
+            ack_number: Some(REMOTE_SEQ + 1),
+            payload:    &b"bbbbbb"[..],
+            ..RECV_TEMPL
+        }));
+
+        // ack first packet.
+        send!(
+            s,
+            time 3000,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1 + 6),
+                ..SEND_TEMPL
+            }
+        );
+
+        // this should keep retransmit timer on, because there's
+        // still unacked data.
+        assert!(s.timer.is_retransmit());
+
+        // ack all three packets.
+        // This might confuse the TCP stack because after the retransmit
+        // it "thinks" the 3rd packet hasn't been transmitted yet, but it is getting acked.
+        send!(
+            s,
+            time 3000,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1 + 18),
+                ..SEND_TEMPL
+            }
+        );
+
+        // this should exit retransmit mode.
+        assert!(!s.timer.is_retransmit());
+        // and consider all data ACKed.
+        assert!(s.tx_buffer.is_empty());
+        recv_nothing!(s, time 5000);
+    }
+
+    #[test]
+    fn test_retransmit_fin() {
+        let mut s = socket_established();
+        s.close();
+        recv!(s, time 0, Ok(TcpRepr {
+            control: TcpControl::Fin,
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1),
+            ..RECV_TEMPL
+        }));
+
+        recv_nothing!(s, time 999);
+        recv!(s, time 1000, Ok(TcpRepr {
+            control: TcpControl::Fin,
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1),
+            ..RECV_TEMPL
+        }));
+    }
+
+    #[test]
+    fn test_retransmit_fin_wait() {
+        let mut s = socket_fin_wait_1();
+        // we send FIN
+        recv!(
+            s,
+            [TcpRepr {
+                control: TcpControl::Fin,
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                ..RECV_TEMPL
+            }]
+        );
+        // remote also sends FIN, does NOT ack ours.
+        send!(
+            s,
+            TcpRepr {
+                control: TcpControl::Fin,
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                ..SEND_TEMPL
+            }
+        );
+        // we ack it
+        recv!(
+            s,
+            [TcpRepr {
+                control: TcpControl::None,
+                seq_number: LOCAL_SEQ + 2,
+                ack_number: Some(REMOTE_SEQ + 2),
+                ..RECV_TEMPL
+            }]
+        );
+
+        // we haven't got an ACK for our FIN, we should retransmit.
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                control: TcpControl::Fin,
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 2),
+                ..RECV_TEMPL
+            }]
+        );
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                control: TcpControl::Fin,
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 2),
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
     // =========================================================================================//
     // Tests for window management.
     // =========================================================================================//
@@ -6908,6 +7152,312 @@ mod test {
         assert!(s.window_to_update());
     }
 
+    // =========================================================================================//
+    // Tests for zero-window probes.
+    // =========================================================================================//
+
+    #[test]
+    fn test_zero_window_probe_enter_on_win_update() {
+        let mut s = socket_established();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_enter_on_send() {
+        let mut s = socket_established();
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_exit() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 6,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(!s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_exit_ack() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        send!(
+            s,
+            time 1010,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 2),
+                window_len: 6,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv!(
+            s,
+            time 1010,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 2,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"bcdef1"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_backoff_nack_reply() {
+        let mut s = socket_established();
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+        send!(
+            s,
+            time 1100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+        send!(
+            s,
+            time 3100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 6999);
+        recv!(
+            s,
+            time 7000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_backoff_no_reply() {
+        let mut s = socket_established();
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_shift() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        // ack the ZWP byte, but still advertise zero window.
+        // this should restart the ZWP timer.
+        send!(
+            s,
+            time 3100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 2),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        // ZWP should be sent at 3100+1000 = 4100
+        recv_nothing!(s, time 4099);
+        recv!(
+            s,
+            time 4100,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 2,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"b"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
     // =========================================================================================//
     // Tests for timeouts.
     // =========================================================================================//

+ 364 - 0
tests/netsim.rs

@@ -0,0 +1,364 @@
+use std::cell::RefCell;
+use std::collections::BinaryHeap;
+use std::fmt::Write as _;
+use std::io::Write as _;
+use std::sync::Mutex;
+
+use rand::{Rng, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use smoltcp::iface::{Config, Interface, SocketSet};
+use smoltcp::phy::Tracer;
+use smoltcp::phy::{self, ChecksumCapabilities, Device, DeviceCapabilities, Medium};
+use smoltcp::socket::tcp;
+use smoltcp::time::{Duration, Instant};
+use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr};
+
+const MAC_A: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 1]));
+const MAC_B: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 2]));
+const IP_A: IpAddress = IpAddress::v4(10, 0, 0, 1);
+const IP_B: IpAddress = IpAddress::v4(10, 0, 0, 2);
+
+const BYTES: usize = 10 * 1024 * 1024;
+
+static CLOCK: Mutex<(Instant, char)> = Mutex::new((Instant::ZERO, ' '));
+
+#[test]
+fn netsim() {
+    setup_logging();
+
+    let buffers = [128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768];
+    let losses = [0.0, 0.001, 0.01, 0.02, 0.05, 0.10, 0.20, 0.30];
+
+    let mut s = String::new();
+
+    write!(&mut s, "buf\\loss").unwrap();
+    for loss in losses {
+        write!(&mut s, "{loss:9.3} ").unwrap();
+    }
+    writeln!(&mut s).unwrap();
+
+    for buffer in buffers {
+        write!(&mut s, "{buffer:7}").unwrap();
+        for loss in losses {
+            let r = run_test(TestCase {
+                rtt: Duration::from_millis(100),
+                buffer,
+                loss,
+            });
+            write!(&mut s, " {r:9.2}").unwrap();
+        }
+        writeln!(&mut s).unwrap();
+    }
+
+    insta::assert_snapshot!(s);
+}
+
+struct TestCase {
+    rtt: Duration,
+    loss: f64,
+    buffer: usize,
+}
+
+fn run_test(case: TestCase) -> f64 {
+    let mut time = Instant::ZERO;
+
+    let params = QueueParams {
+        latency: case.rtt / 2,
+        loss: case.loss,
+    };
+    let queue_a_to_b = RefCell::new(PacketQueue::new(params.clone(), 0));
+    let queue_b_to_a = RefCell::new(PacketQueue::new(params.clone(), 1));
+    let device_a = QueueDevice::new(&queue_a_to_b, &queue_b_to_a, Medium::Ethernet);
+    let device_b = QueueDevice::new(&queue_b_to_a, &queue_a_to_b, Medium::Ethernet);
+
+    let mut device_a = Tracer::new(device_a, |_timestamp, _printer| log::trace!("{}", _printer));
+    let mut device_b = Tracer::new(device_b, |_timestamp, _printer| log::trace!("{}", _printer));
+
+    let mut iface_a = Interface::new(Config::new(MAC_A), &mut device_a, time);
+    iface_a.update_ip_addrs(|a| a.push(IpCidr::new(IP_A, 8)).unwrap());
+    let mut iface_b = Interface::new(Config::new(MAC_B), &mut device_b, time);
+    iface_b.update_ip_addrs(|a| a.push(IpCidr::new(IP_B, 8)).unwrap());
+
+    // Create sockets
+    let socket_a = {
+        let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
+        let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
+        tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
+    };
+
+    let socket_b = {
+        let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
+        let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
+        tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
+    };
+
+    let mut sockets_a: [_; 2] = Default::default();
+    let mut sockets_a = SocketSet::new(&mut sockets_a[..]);
+    let socket_a_handle = sockets_a.add(socket_a);
+
+    let mut sockets_b: [_; 2] = Default::default();
+    let mut sockets_b = SocketSet::new(&mut sockets_b[..]);
+    let socket_b_handle = sockets_b.add(socket_b);
+
+    let mut did_listen = false;
+    let mut did_connect = false;
+    let mut processed = 0;
+    while processed < BYTES {
+        *CLOCK.lock().unwrap() = (time, ' ');
+        log::info!("loop");
+        //println!("t = {}", time);
+
+        *CLOCK.lock().unwrap() = (time, 'A');
+
+        iface_a.poll(time, &mut device_a, &mut sockets_a);
+
+        let socket = sockets_a.get_mut::<tcp::Socket>(socket_a_handle);
+        if !socket.is_active() && !socket.is_listening() && !did_listen {
+            //println!("listening");
+            socket.listen(1234).unwrap();
+            did_listen = true;
+        }
+
+        while socket.can_recv() {
+            let received = socket.recv(|buffer| (buffer.len(), buffer.len())).unwrap();
+            //println!("got {:?}", received,);
+            processed += received;
+        }
+
+        *CLOCK.lock().unwrap() = (time, 'B');
+        iface_b.poll(time, &mut device_b, &mut sockets_b);
+        let socket = sockets_b.get_mut::<tcp::Socket>(socket_b_handle);
+        let cx = iface_b.context();
+        if !socket.is_open() && !did_connect {
+            //println!("connecting");
+            socket.connect(cx, (IP_A, 1234), 65000).unwrap();
+            did_connect = true;
+        }
+
+        while socket.can_send() {
+            //println!("sending");
+            socket.send(|buffer| (buffer.len(), ())).unwrap();
+        }
+
+        *CLOCK.lock().unwrap() = (time, ' ');
+
+        let mut next_time = queue_a_to_b.borrow_mut().next_expiration();
+        next_time = next_time.min(queue_b_to_a.borrow_mut().next_expiration());
+        if let Some(t) = iface_a.poll_at(time, &sockets_a) {
+            next_time = next_time.min(t);
+        }
+        if let Some(t) = iface_b.poll_at(time, &sockets_b) {
+            next_time = next_time.min(t);
+        }
+        assert!(next_time.total_micros() != i64::MAX);
+        time = time.max(next_time);
+    }
+
+    let duration = time - Instant::ZERO;
+    processed as f64 / duration.total_micros() as f64 * 1e6
+}
+
+struct Packet {
+    timestamp: Instant,
+    id: u64,
+    data: Vec<u8>,
+}
+
+impl PartialEq for Packet {
+    fn eq(&self, other: &Self) -> bool {
+        (other.timestamp, other.id) == (self.timestamp, self.id)
+    }
+}
+
+impl Eq for Packet {}
+
+impl PartialOrd for Packet {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Packet {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        (other.timestamp, other.id).cmp(&(self.timestamp, self.id))
+    }
+}
+
+#[derive(Clone)]
+struct QueueParams {
+    latency: Duration,
+    loss: f64,
+}
+
+struct PacketQueue {
+    queue: BinaryHeap<Packet>,
+    next_id: u64,
+    params: QueueParams,
+    rng: ChaCha20Rng,
+}
+
+impl PacketQueue {
+    pub fn new(params: QueueParams, seed: u64) -> Self {
+        Self {
+            queue: BinaryHeap::new(),
+            next_id: 0,
+            params,
+            rng: ChaCha20Rng::seed_from_u64(seed),
+        }
+    }
+
+    pub fn next_expiration(&self) -> Instant {
+        self.queue
+            .peek()
+            .map(|p| p.timestamp)
+            .unwrap_or(Instant::from_micros(i64::MAX))
+    }
+
+    pub fn push(&mut self, data: Vec<u8>, timestamp: Instant) {
+        if self.rng.gen::<f64>() < self.params.loss {
+            log::info!("PACKET LOST!");
+            return;
+        }
+
+        self.queue.push(Packet {
+            data,
+            id: self.next_id,
+            timestamp: timestamp + self.params.latency,
+        });
+        self.next_id += 1;
+    }
+
+    pub fn pop(&mut self, timestamp: Instant) -> Option<Vec<u8>> {
+        let p = self.queue.peek()?;
+        if p.timestamp > timestamp {
+            return None;
+        }
+        Some(self.queue.pop().unwrap().data)
+    }
+}
+
+pub struct QueueDevice<'a> {
+    tx_queue: &'a RefCell<PacketQueue>,
+    rx_queue: &'a RefCell<PacketQueue>,
+    medium: Medium,
+}
+
+impl<'a> QueueDevice<'a> {
+    fn new(
+        tx_queue: &'a RefCell<PacketQueue>,
+        rx_queue: &'a RefCell<PacketQueue>,
+        medium: Medium,
+    ) -> Self {
+        Self {
+            tx_queue,
+            rx_queue,
+            medium,
+        }
+    }
+}
+
+impl Device for QueueDevice<'_> {
+    type RxToken<'a>
+        = RxToken
+    where
+        Self: 'a;
+    type TxToken<'a>
+        = TxToken<'a>
+    where
+        Self: 'a;
+
+    fn capabilities(&self) -> DeviceCapabilities {
+        let mut caps = DeviceCapabilities::default();
+        caps.max_transmission_unit = 1514;
+        caps.medium = self.medium;
+        caps.checksum = ChecksumCapabilities::ignored();
+        caps
+    }
+
+    fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
+        self.rx_queue
+            .borrow_mut()
+            .pop(timestamp)
+            .map(move |buffer| {
+                let rx = RxToken { buffer };
+                let tx = TxToken {
+                    queue: self.tx_queue,
+                    timestamp,
+                };
+                (rx, tx)
+            })
+    }
+
+    fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
+        Some(TxToken {
+            queue: self.tx_queue,
+            timestamp,
+        })
+    }
+}
+
+pub struct RxToken {
+    buffer: Vec<u8>,
+}
+
+impl phy::RxToken for RxToken {
+    fn consume<R, F>(self, f: F) -> R
+    where
+        F: FnOnce(&[u8]) -> R,
+    {
+        f(&self.buffer)
+    }
+}
+
+pub struct TxToken<'a> {
+    queue: &'a RefCell<PacketQueue>,
+    timestamp: Instant,
+}
+
+impl<'a> phy::TxToken for TxToken<'a> {
+    fn consume<R, F>(self, len: usize, f: F) -> R
+    where
+        F: FnOnce(&mut [u8]) -> R,
+    {
+        let mut buffer = vec![0; len];
+        let result = f(&mut buffer);
+        self.queue.borrow_mut().push(buffer, self.timestamp);
+        result
+    }
+}
+
+pub fn setup_logging() {
+    env_logger::Builder::new()
+        .format(move |buf, record| {
+            let (elapsed, side) = *CLOCK.lock().unwrap();
+
+            let timestamp = format!("[{elapsed} {side}]");
+            if record.target().starts_with("smoltcp::") {
+                writeln!(
+                    buf,
+                    "{} ({}): {}",
+                    timestamp,
+                    record.target().replace("smoltcp::", ""),
+                    record.args()
+                )
+            } else if record.level() == log::Level::Trace {
+                let message = format!("{}", record.args());
+                writeln!(
+                    buf,
+                    "{} {}",
+                    timestamp,
+                    message.replace('\n', "\n             ")
+                )
+            } else {
+                writeln!(
+                    buf,
+                    "{} ({}): {}",
+                    timestamp,
+                    record.target(),
+                    record.args()
+                )
+            }
+        })
+        .parse_env("RUST_LOG")
+        .init();
+}

+ 15 - 0
tests/snapshots/netsim__netsim.snap

@@ -0,0 +1,15 @@
+---
+source: tests/netsim.rs
+expression: s
+snapshot_kind: text
+---
+buf\loss    0.000     0.001     0.010     0.020     0.050     0.100     0.200     0.300 
+    128   1279.98   1255.76   1054.15    886.36    538.66    227.84     33.99      7.18
+    256   2559.91   2507.27   2100.03   1770.30   1070.71    468.24     66.71     14.35
+    512   5119.63   5011.95   4172.36   3531.57   2098.73    942.38    144.73     29.45
+   1024  10238.50  10023.19   8340.90   7084.25   4003.34   1869.94    290.74     60.92
+   2048  17535.11  17171.82  14093.50  12063.90   7205.27   3379.12    824.76    131.54
+   4096  35062.41  33852.31  27011.08  22073.09  13680.70   7631.11   1617.81    302.65
+   8192  77374.28  72409.99  58428.68  48310.75  29123.30  14314.36   2880.39    551.60
+  16384 161842.28 159448.56 141467.31 127073.06  78239.08  38637.20   7565.64   1112.31
+  32768 322944.88 314313.90 266384.37 245985.29 138762.29  83162.99  10739.10   1951.95