Эх сурвалжийг харах

tcp: add zero window probe support.

Dario Nieuwenhuis 5 сар өмнө
parent
commit
ddc495ba7c
1 өөрчлөгдсөн 399 нэмэгдсэн , 9 устгасан
  1. 399 9
      src/socket/tcp.rs

+ 399 - 9
src/socket/tcp.rs

@@ -277,10 +277,20 @@ impl RttEstimator {
 #[derive(Debug, Clone, Copy, PartialEq)]
 #[cfg_attr(feature = "defmt", derive(defmt::Format))]
 enum Timer {
-    Idle { keep_alive_at: Option<Instant> },
-    Retransmit { expires_at: Instant },
+    Idle {
+        keep_alive_at: Option<Instant>,
+    },
+    Retransmit {
+        expires_at: Instant,
+    },
     FastRetransmit,
-    Close { expires_at: Instant },
+    ZeroWindowProbe {
+        expires_at: Instant,
+        delay: Duration,
+    },
+    Close {
+        expires_at: Instant,
+    },
 }
 
 const ACK_DELAY_DEFAULT: Duration = Duration::from_millis(10);
@@ -317,6 +327,13 @@ impl Timer {
         }
     }
 
+    fn should_zero_window_probe(&self, timestamp: Instant) -> bool {
+        match *self {
+            Timer::ZeroWindowProbe { expires_at, .. } if timestamp >= expires_at => true,
+            _ => false,
+        }
+    }
+
     fn poll_at(&self) -> PollAt {
         match *self {
             Timer::Idle {
@@ -325,6 +342,7 @@ impl Timer {
             Timer::Idle {
                 keep_alive_at: None,
             } => PollAt::Ingress,
+            Timer::ZeroWindowProbe { expires_at, .. } => PollAt::Time(expires_at),
             Timer::Retransmit { expires_at, .. } => PollAt::Time(expires_at),
             Timer::FastRetransmit => PollAt::Now,
             Timer::Close { expires_at } => PollAt::Time(expires_at),
@@ -353,7 +371,10 @@ impl Timer {
 
     fn set_for_retransmit(&mut self, timestamp: Instant, delay: Duration) {
         match *self {
-            Timer::Idle { .. } | Timer::FastRetransmit { .. } | Timer::Retransmit { .. } => {
+            Timer::Idle { .. }
+            | Timer::FastRetransmit { .. }
+            | Timer::Retransmit { .. }
+            | Timer::ZeroWindowProbe { .. } => {
                 *self = Timer::Retransmit {
                     expires_at: timestamp + delay,
                 }
@@ -372,12 +393,34 @@ impl Timer {
         }
     }
 
-    fn is_retransmit(&self) -> bool {
-        match *self {
-            Timer::Retransmit { .. } | Timer::FastRetransmit => true,
-            _ => false,
+    fn set_for_zero_window_probe(&mut self, timestamp: Instant, delay: Duration) {
+        *self = Timer::ZeroWindowProbe {
+            expires_at: timestamp + delay,
+            delay,
+        }
+    }
+
+    fn rewind_zero_window_probe(&mut self, timestamp: Instant) {
+        if let Timer::ZeroWindowProbe { mut delay, .. } = *self {
+            delay = (delay * 2).min(Duration::from_millis(RTTE_MAX_RTO as _));
+            *self = Timer::ZeroWindowProbe {
+                expires_at: timestamp + delay,
+                delay,
+            }
         }
     }
+
+    fn is_idle(&self) -> bool {
+        matches!(self, Timer::Idle { .. })
+    }
+
+    fn is_zero_window_probe(&self) -> bool {
+        matches!(self, Timer::ZeroWindowProbe { .. })
+    }
+
+    fn is_retransmit(&self) -> bool {
+        matches!(self, Timer::Retransmit { .. } | Timer::FastRetransmit)
+    }
 }
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -1182,6 +1225,17 @@ impl<'a> Socket<'a> {
                 self.remote_last_ts = None
             }
 
+            // if remote win is zero and we go from having no data to some data pending to
+            // send, start the zero window probe timer.
+            if self.remote_win_len == 0 && self.timer.is_idle() {
+                let delay = self.rtte.retransmission_timeout();
+                tcp_trace!("starting zero-window-probe timer for t+{}", delay);
+
+                // We don't have access to the current time here, so use Instant::ZERO instead.
+                // this will cause the first ZWP to be sent immediately, but that's okay.
+                self.timer.set_for_zero_window_probe(Instant::ZERO, delay);
+            }
+
             #[cfg(any(test, feature = "verbose"))]
             tcp_trace!(
                 "tx buffer: enqueueing {} octets (now {})",
@@ -2032,6 +2086,20 @@ impl<'a> Socket<'a> {
             _ => {}
         }
 
+        // start/stop the Zero Window Probe timer.
+        if self.remote_win_len == 0
+            && !self.tx_buffer.is_empty()
+            && (self.timer.is_idle() || ack_len > 0)
+        {
+            let delay = self.rtte.retransmission_timeout();
+            tcp_trace!("starting zero-window-probe timer for t+{}", delay);
+            self.timer.set_for_zero_window_probe(cx.now(), delay);
+        }
+        if self.remote_win_len != 0 && self.timer.is_zero_window_probe() {
+            tcp_trace!("stopping zero-window-probe timer");
+            self.timer.set_for_idle(cx.now(), self.keep_alive);
+        }
+
         let payload_len = payload.len();
         if payload_len == 0 {
             return None;
@@ -2318,6 +2386,8 @@ impl<'a> Socket<'a> {
         } else if self.timer.should_keep_alive(cx.now()) {
             // If we need to transmit a keep-alive packet, do it.
             tcp_trace!("keep-alive timer expired");
+        } else if self.timer.should_zero_window_probe(cx.now()) {
+            tcp_trace!("sending zero-window probe");
         } else if self.timer.should_close(cx.now()) {
             // If we have spent enough time in the TIME-WAIT state, close the socket.
             tcp_trace!("TIME-WAIT timer expired");
@@ -2360,6 +2430,8 @@ impl<'a> Socket<'a> {
             payload: &[],
         };
 
+        let mut is_zero_window_probe = false;
+
         match self.state {
             // We transmit an RST in the CLOSED state. If we ended up in the CLOSED state
             // with a specified endpoint, it means that the socket was aborted.
@@ -2401,7 +2473,7 @@ impl<'a> Socket<'a> {
                 let win_right_edge = self.local_seq_no + self.remote_win_len;
 
                 // Max amount of octets we're allowed to send according to the remote window.
-                let win_limit = if win_right_edge >= self.remote_last_seq {
+                let mut win_limit = if win_right_edge >= self.remote_last_seq {
                     win_right_edge - self.remote_last_seq
                 } else {
                     // This can happen if we've sent some data and later the remote side
@@ -2412,6 +2484,12 @@ impl<'a> Socket<'a> {
                     0
                 };
 
+                // To send a zero-window-probe, force the window limit to at least 1 byte.
+                if win_limit == 0 && self.timer.should_zero_window_probe(cx.now()) {
+                    win_limit = 1;
+                    is_zero_window_probe = true;
+                }
+
                 // Maximum size we're allowed to send. This can be limited by 3 factors:
                 // 1. remote window
                 // 2. MSS the remote is willing to accept, probably determined by their MTU
@@ -2510,6 +2588,12 @@ impl<'a> Socket<'a> {
         }
         self.ack_delay_timer = AckDelayTimer::Idle;
 
+        // Leave the rest of the state intact if sending a zero-window probe.
+        if is_zero_window_probe {
+            self.timer.rewind_zero_window_probe(cx.now());
+            return Ok(());
+        }
+
         // Leave the rest of the state intact if sending a keep-alive packet, since those
         // carry a fake segment.
         if is_keep_alive {
@@ -7068,6 +7152,312 @@ mod test {
         assert!(s.window_to_update());
     }
 
+    // =========================================================================================//
+    // Tests for zero-window probes.
+    // =========================================================================================//
+
+    #[test]
+    fn test_zero_window_probe_enter_on_win_update() {
+        let mut s = socket_established();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_enter_on_send() {
+        let mut s = socket_established();
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_exit() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+
+        assert!(!s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(s.timer.is_zero_window_probe());
+
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 6,
+                ..SEND_TEMPL
+            }
+        );
+
+        assert!(!s.timer.is_zero_window_probe());
+    }
+
+    #[test]
+    fn test_zero_window_probe_exit_ack() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        send!(
+            s,
+            time 1010,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 2),
+                window_len: 6,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv!(
+            s,
+            time 1010,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 2,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"bcdef1"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_backoff_nack_reply() {
+        let mut s = socket_established();
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+        send!(
+            s,
+            time 1100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+        send!(
+            s,
+            time 3100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 6999);
+        recv!(
+            s,
+            time 7000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_backoff_no_reply() {
+        let mut s = socket_established();
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
+    #[test]
+    fn test_zero_window_probe_shift() {
+        let mut s = socket_established();
+
+        s.send_slice(b"abcdef123456!@#$%^").unwrap();
+        send!(
+            s,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        recv_nothing!(s, time 999);
+        recv!(
+            s,
+            time 1000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        recv_nothing!(s, time 2999);
+        recv!(
+            s,
+            time 3000,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 1,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"a"[..],
+                ..RECV_TEMPL
+            }]
+        );
+
+        // ack the ZWP byte, but still advertise zero window.
+        // this should restart the ZWP timer.
+        send!(
+            s,
+            time 3100,
+            TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 2),
+                window_len: 0,
+                ..SEND_TEMPL
+            }
+        );
+
+        // ZWP should be sent at 3100+1000 = 4100
+        recv_nothing!(s, time 4099);
+        recv!(
+            s,
+            time 4100,
+            [TcpRepr {
+                seq_number: LOCAL_SEQ + 2,
+                ack_number: Some(REMOTE_SEQ + 1),
+                payload: &b"b"[..],
+                ..RECV_TEMPL
+            }]
+        );
+    }
+
     // =========================================================================================//
     // Tests for timeouts.
     // =========================================================================================//