瀏覽代碼

Merge pull request #404 from smoltcp-rs/delayed-ack

tcp: add Delayed ACK
Dario Nieuwenhuis 4 年之前
父節點
當前提交
e8fe5cdd1b
共有 2 個文件被更改,包括 192 次插入16 次删除
  1. 0 10
      src/socket/mod.rs
  2. 192 6
      src/socket/tcp.rs

+ 0 - 10
src/socket/mod.rs

@@ -71,16 +71,6 @@ pub(crate) enum PollAt {
     Ingress,
 }
 
-impl PollAt {
-    #[cfg(feature = "socket-tcp")]
-    fn is_ingress(&self) -> bool {
-        match *self {
-            PollAt::Ingress => true,
-            _ => false,
-        }
-    }
-}
-
 /// A network socket.
 ///
 /// This enumeration abstracts the various types of sockets based on the IP protocol.

+ 192 - 6
src/socket/tcp.rs

@@ -162,6 +162,7 @@ enum Timer {
     }
 }
 
+const ACK_DELAY_DEFAULT: Duration = Duration { millis: 10 };
 const CLOSE_DELAY:      Duration = Duration { millis: 10_000 };
 
 impl Default for Timer {
@@ -341,6 +342,12 @@ pub struct TcpSocket<'a> {
     /// each other which have the same ACK number.
     local_rx_dup_acks: u8,
 
+    /// Duration for Delayed ACK. If None no ACKs will be delayed.
+    ack_delay:       Option<Duration>,
+    /// Delayed ack timer. If set, packets containing exclusively
+    /// ACK or window updates (ie, no data) won't be sent until expiry.
+    ack_delay_until: Option<Instant>,
+
     #[cfg(feature = "async")]
     rx_waker: WakerRegistration,
     #[cfg(feature = "async")]
@@ -397,6 +404,8 @@ impl<'a> TcpSocket<'a> {
             local_rx_last_ack: None,
             local_rx_last_seq: None,
             local_rx_dup_acks: 0,
+            ack_delay:       Some(ACK_DELAY_DEFAULT),
+            ack_delay_until: None,
 
             #[cfg(feature = "async")]
             rx_waker: WakerRegistration::new(),
@@ -453,6 +462,13 @@ impl<'a> TcpSocket<'a> {
         self.timeout
     }
 
+    /// Return the ACK delay duration.
+    ///
+    /// See also the [set_ack_delay](#method.set_ack_delay) method.
+    pub fn ack_delay(&self) -> Option<Duration> {
+        self.ack_delay
+    }
+
     /// Return the current window field value, including scaling according to RFC 1323.
     ///
     /// Used in internal calculations as well as packet generation.
@@ -478,6 +494,13 @@ impl<'a> TcpSocket<'a> {
         self.timeout = duration
     }
 
+    /// Set the ACK delay duration.
+    ///
+    /// By default, the ACK delay is set to 10ms.
+    pub fn set_ack_delay(&mut self, duration: Option<Duration>) {
+        self.ack_delay = duration
+    }
+
     /// Return the keep-alive interval.
     ///
     /// See also the [set_keep_alive](#method.set_keep_alive) method.
@@ -578,6 +601,8 @@ impl<'a> TcpSocket<'a> {
         self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
         self.remote_mss      = DEFAULT_MSS;
         self.remote_last_ts  = None;
+        self.ack_delay       = Some(ACK_DELAY_DEFAULT);
+        self.ack_delay_until = None;
 
         #[cfg(feature = "async")]
         {
@@ -1541,6 +1566,30 @@ impl<'a> TcpSocket<'a> {
                        self.assembler);
         }
 
+        // Handle delayed acks
+        if let Some(ack_delay) = self.ack_delay {
+            if self.ack_to_transmit() || self.window_to_update() {
+                self.ack_delay_until = match self.ack_delay_until {
+                    None => {
+                        net_trace!("{}:{}:{}: starting delayed ack timer",
+                            self.meta.handle, self.local_endpoint, self.remote_endpoint
+                        );
+
+                        Some(timestamp + ack_delay)
+                    }
+                    // RFC1122 says "in a stream of full-sized segments there SHOULD be an ACK
+                    // for at least every second segment".
+                    // For now, we send an ACK every second received packet, full-sized or not.
+                    Some(_) => {
+                        net_trace!("{}:{}:{}: delayed ack timer already started, forcing expiry",
+                            self.meta.handle, self.local_endpoint, self.remote_endpoint
+                        );
+                        None
+                    }
+                };
+            }
+        }
+
         // Per RFC 5681, we should send an immediate ACK when either:
         //  1) an out-of-order segment is received, or
         //  2) a segment arrives that fills in all or part of a gap in sequence space.
@@ -1590,6 +1639,13 @@ impl<'a> TcpSocket<'a> {
         can_data || can_fin
     }
 
+    fn delayed_ack_expired(&self, timestamp: Instant) -> bool {
+        match self.ack_delay_until {
+            None => true,
+            Some(t) => t <= timestamp,
+        }
+    }
+
     fn ack_to_transmit(&self) -> bool {
         if let Some(remote_last_ack) = self.remote_last_ack {
             remote_last_ack < self.remote_seq_no + self.rx_buffer.len()
@@ -1644,11 +1700,11 @@ impl<'a> TcpSocket<'a> {
             // If we have data to transmit and it fits into partner's window, do it.
             net_trace!("{}:{}:{}: outgoing segment will send data or flags",
                        self.meta.handle, self.local_endpoint, self.remote_endpoint);
-        } else if self.ack_to_transmit() {
+        } else if self.ack_to_transmit() && self.delayed_ack_expired(timestamp) {
             // If we have data to acknowledge, do it.
             net_trace!("{}:{}:{}: outgoing segment will acknowledge",
                        self.meta.handle, self.local_endpoint, self.remote_endpoint);
-        } else if self.window_to_update() {
+        } else if self.window_to_update() && self.delayed_ack_expired(timestamp) {
             // If we have window length increase to advertise, do it.
             net_trace!("{}:{}:{}: outgoing segment will update window",
                        self.meta.handle, self.local_endpoint, self.remote_endpoint);
@@ -1812,6 +1868,15 @@ impl<'a> TcpSocket<'a> {
         // the keep-alive timer.
         self.timer.rewind_keep_alive(timestamp, self.keep_alive);
 
+        // Reset delayed-ack timer
+        if self.ack_delay_until.is_some() {
+            net_trace!("{}:{}:{}: stop delayed ack timer",
+                self.meta.handle, self.local_endpoint, self.remote_endpoint
+            );
+
+            self.ack_delay_until = None;
+        }
+
         // Leave the rest of the state intact if sending a keep-alive packet, since those
         // carry a fake segment.
         if is_keep_alive { return Ok(()) }
@@ -1851,10 +1916,17 @@ impl<'a> TcpSocket<'a> {
         } else if self.state == State::Closed {
             // Socket was aborted, we have an RST packet to transmit.
             PollAt::Now
-        } else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() {
+        } else if self.seq_to_transmit() {
             // We have a data or flag packet to transmit.
             PollAt::Now
         } else {
+            let want_ack = self.ack_to_transmit() || self.window_to_update();
+            let delayed_ack_poll_at = match (want_ack, self.ack_delay_until) {
+                (false, _) => PollAt::Ingress,
+                (true, None) => PollAt::Now,
+                (true, Some(t)) => PollAt::Time(t),
+            };
+
             let timeout_poll_at = match (self.remote_last_ts, self.timeout) {
                 // If we're transmitting or retransmitting data, we need to poll at the moment
                 // when the timeout would expire.
@@ -1864,9 +1936,8 @@ impl<'a> TcpSocket<'a> {
             };
 
             // We wait for the earliest of our timers to fire.
-            *[self.timer.poll_at(), timeout_poll_at]
+            *[self.timer.poll_at(), timeout_poll_at, delayed_ack_poll_at]
                 .iter()
-                .filter(|x| !x.is_ingress())
                 .min().unwrap_or(&PollAt::Ingress)
         }
     }
@@ -2076,7 +2147,9 @@ mod test {
 
         let rx_buffer = SocketBuffer::new(vec![0; rx_len]);
         let tx_buffer = SocketBuffer::new(vec![0; tx_len]);
-        TcpSocket::new(rx_buffer, tx_buffer)
+        let mut socket = TcpSocket::new(rx_buffer, tx_buffer);
+        socket.set_ack_delay(None);
+        socket
     }
 
     fn socket_syn_received_with_buffer_sizes(
@@ -5084,6 +5157,119 @@ mod test {
         assert_eq!(s.recv(|_| (0, ())), Err(Error::Illegal));
     }
 
+    // =========================================================================================//
+    // Tests for delayed ACK
+    // =========================================================================================//
+
+    #[test]
+    fn test_delayed_ack() {
+        let mut s = socket_established();
+        s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload:    &b"abc"[..],
+            ..SEND_TEMPL
+        });
+
+        // No ACK is immediately sent.
+        recv!(s, Err(Error::Exhausted));
+
+        // After 10ms, it is sent.
+        recv!(s, time 11, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1 + 3),
+            window_len: 61,
+            ..RECV_TEMPL
+        }));
+    }
+
+    #[test]
+    fn test_delayed_ack_win() {
+        let mut s = socket_established();
+        s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload:    &b"abc"[..],
+            ..SEND_TEMPL
+        });
+
+        // Reading the data off the buffer should cause a window update.
+        s.recv(|data| {
+            assert_eq!(data, b"abc");
+            (3, ())
+        }).unwrap();
+
+        // However, no ACK or window update is immediately sent.
+        recv!(s, Err(Error::Exhausted));
+
+        // After 10ms, it is sent.
+        recv!(s, time 11, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1 + 3),
+            ..RECV_TEMPL
+        }));
+    }
+
+    #[test]
+    fn test_delayed_ack_reply() {
+        let mut s = socket_established();
+        s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload:    &b"abc"[..],
+            ..SEND_TEMPL
+        });
+
+        s.recv(|data| {
+            assert_eq!(data, b"abc");
+            (3, ())
+        }).unwrap();
+
+        s.send_slice(&b"xyz"[..]).unwrap();
+
+        // Writing data to the socket causes ACK to not be delayed,
+        // because it is immediately sent with the data.
+        recv!(s, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1 + 3),
+            payload:    &b"xyz"[..],
+            ..RECV_TEMPL
+        }));
+    }
+
+    #[test]
+    fn test_delayed_ack_every_second_packet() {
+        let mut s = socket_established();
+        s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload:    &b"abc"[..],
+            ..SEND_TEMPL
+        });
+
+        // No ACK is immediately sent.
+        recv!(s, Err(Error::Exhausted));
+
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1 + 3,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload:    &b"def"[..],
+            ..SEND_TEMPL
+        });
+
+        // Every 2nd packet, ACK is sent without delay.
+        recv!(s, Ok(TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1 + 6),
+            window_len: 58,
+            ..RECV_TEMPL
+        }));
+    }
+
     // =========================================================================================//
     // Tests for packet filtering.
     // =========================================================================================//