Эх сурвалжийг харах

Fully implement TCP Window Scaling

Closes: #253
Approved by: whitequark
jhwgh1968 6 жил өмнө
parent
commit
92e970b379
3 өөрчлөгдсөн 189 нэмэгдсэн, 12 устгасан
  1. 1 1
      README.md
  2. 1 0
      benches/bench.rs
  3. 187 11
      src/socket/tcp.rs

+ 1 - 1
README.md

@@ -99,13 +99,13 @@ The TCP protocol is supported over IPv4, and server and client TCP sockets are a
 
   * Header checksum is generated and validated.
   * Maximum segment size is negotiated.
+  * Window scaling is negotiated.
   * Multiple packets are transmitted without waiting for an acknowledgement.
   * Reassembly of out-of-order segments is supported, with no more than 4 gaps in sequence space.
   * Keep-alive packets may be sent at a configurable interval.
   * Retransmission timeout starts at a fixed interval of 100 ms and doubles every time.
   * Time-wait timeout has a fixed interval of 10 s.
   * User timeout has a configurable interval.
-  * Window scaling is **not** supported, and the maximum buffer size is 65536.
   * Selective acknowledgements are **not** implemented.
   * Delayed acknowledgements are **not** implemented.
   * Silly window syndrome avoidance is **not** implemented.

+ 1 - 0
benches/bench.rs

@@ -39,6 +39,7 @@ mod wire {
             window_len:   0x0123,
             control:      TcpControl::Syn,
             max_seg_size: None,
+            window_scale: None,
             payload:      &PAYLOAD_BYTES
         };
         let mut bytes = vec![0xa5; repr.buffer_len()];

+ 187 - 11
src/socket/tcp.rs

@@ -2,7 +2,7 @@
 // the parts of RFC 1122 that discuss TCP. Consult RFC 7414 when implementing
 // a new feature.
 
-use core::{cmp, fmt};
+use core::{cmp, fmt, mem};
 
 use {Error, Result};
 use phy::DeviceCapabilities;
@@ -226,6 +226,9 @@ pub struct TcpSocket<'a> {
     remote_last_ack: Option<TcpSeqNumber>,
     /// The last window length sent.
     remote_last_win: u16,
+    /// The sending window scaling factor advertised to remotes which support RFC 1323.
+    /// It is zero if the receive buffer is < 64 KiB and/or the remote does not support it.
+    remote_win_shift: u8,
     /// The speculative remote window size.
     /// I.e. the actual remote window size minus the count of in-flight octets.
     remote_win_len:  usize,
@@ -245,14 +248,22 @@ pub struct TcpSocket<'a> {
 const DEFAULT_MSS: usize = 536;
 
 impl<'a> TcpSocket<'a> {
+    #[allow(unused_comparisons)] // small usize platforms always pass rx_capacity check
     /// Create a socket using the given buffers.
     pub fn new<T>(rx_buffer: T, tx_buffer: T) -> TcpSocket<'a>
             where T: Into<SocketBuffer<'a>> {
         let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into());
-        if rx_buffer.capacity() > <u16>::max_value() as usize {
-            panic!("buffers larger than {} require window scaling, which is not implemented",
-                   <u16>::max_value())
+        let rx_capacity = rx_buffer.capacity();
+
+        // From RFC 1323:
+        // [...] the above constraints imply that 2 * the max window size must be less
+        // than 2**31 [...] Thus, the shift count must be limited to 14 (which allows
+        // windows of 2**30 = 1 Gbyte).
+        if rx_capacity > (1 << 30) {
+            panic!("receiving buffer too large, cannot exceed 1 GiB")
         }
+        let rx_cap_log2 = mem::size_of::<usize>() * 8 -
+            rx_capacity.leading_zeros() as usize;
 
         TcpSocket {
             meta:            SocketMeta::default(),
@@ -273,6 +284,7 @@ impl<'a> TcpSocket<'a> {
             remote_last_ack: None,
             remote_last_win: 0,
             remote_win_len:  0,
+            remote_win_shift: rx_cap_log2.saturating_sub(16) as u8,
             remote_win_scale: None,
             remote_mss:      DEFAULT_MSS,
             remote_last_ts:  None,
@@ -294,6 +306,16 @@ impl<'a> TcpSocket<'a> {
         self.timeout
     }
 
+    /// Return the current window field value, including scaling according to RFC 1323.
+    ///
+    /// Used in internal calculations as well as packet generation.
+    /// The shifted value is clamped to 65535, the largest value the 16-bit field can hold.
+    #[inline]
+    fn scaled_window(&self) -> u16 {
+        cmp::min(self.rx_buffer.window() >> self.remote_win_shift as usize,
+                 (1 << 16) - 1) as u16
+    }
+
     /// Set the timeout duration.
     ///
     /// A socket with a timeout duration set will abort the connection if either of the following
@@ -383,6 +405,9 @@ impl<'a> TcpSocket<'a> {
     }
 
     fn reset(&mut self) {
+        let rx_cap_log2 = mem::size_of::<usize>() * 8 -
+            self.rx_buffer.capacity().leading_zeros() as usize;
+
         self.state           = State::Closed;
         self.timer           = Timer::default();
         self.assembler       = Assembler::new(self.rx_buffer.capacity());
@@ -401,6 +426,7 @@ impl<'a> TcpSocket<'a> {
         self.remote_last_win = 0;
         self.remote_win_len  = 0;
         self.remote_win_scale = None;
+        self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
         self.remote_mss      = DEFAULT_MSS;
         self.remote_last_ts  = None;
     }
@@ -811,7 +837,11 @@ impl<'a> TcpSocket<'a> {
         // to be received.
         reply_repr.seq_number = self.remote_last_seq;
         reply_repr.ack_number = self.remote_last_ack;
-        reply_repr.window_len = self.rx_buffer.window() as u16;
+
+        // From RFC 1323:
+        // The window field [...] of every outgoing segment, with the exception of SYN
+        // segments, is right-shifted by [advertised scale value] bits[...]
+        reply_repr.window_len = self.scaled_window();
         self.remote_last_win = reply_repr.window_len;
 
         (ip_reply_repr, reply_repr)
@@ -1030,6 +1060,10 @@ impl<'a> TcpSocket<'a> {
                     self.remote_mss = max_seg_size as usize
                 }
                 self.remote_win_scale = repr.window_scale;
+                // No window scaling means don't do any window shifting
+                if self.remote_win_scale.is_none() {
+                    self.remote_win_shift = 0;
+                }
                 self.set_state(State::SynReceived);
                 self.timer.set_for_idle(timestamp, self.keep_alive);
             }
@@ -1296,7 +1330,8 @@ impl<'a> TcpSocket<'a> {
     }
 
     fn window_to_update(&self) -> bool {
-        self.rx_buffer.window() as u16 > self.remote_last_win
+        (self.rx_buffer.window() >> self.remote_win_shift) as u16 >
+            self.remote_last_win
     }
 
     pub(crate) fn dispatch<F>(&mut self, timestamp: Instant, caps: &DeviceCapabilities,
@@ -1384,7 +1419,7 @@ impl<'a> TcpSocket<'a> {
             control:      TcpControl::None,
             seq_number:   self.remote_last_seq,
             ack_number:   Some(self.remote_seq_no + self.rx_buffer.len()),
-            window_len:   self.rx_buffer.window() as u16,
+            window_len:   self.scaled_window(),
             window_scale: None,
             max_seg_size: None,
             payload:      &[]
@@ -1406,9 +1441,10 @@ impl<'a> TcpSocket<'a> {
                 repr.control = TcpControl::Syn;
                 if self.state == State::SynSent {
                     repr.ack_number = None;
-                    repr.window_scale = Some(0);
+                    repr.window_scale = Some(self.remote_win_shift);
                 } else {
-                    repr.window_scale = self.remote_win_scale.map(|_| 0);
+                    repr.window_scale = self.remote_win_scale.map(
+                        |_| self.remote_win_shift);
                 }
             }
 
@@ -1578,6 +1614,7 @@ impl<'a> fmt::Write for TcpSocket<'a> {
 #[cfg(test)]
 mod test {
     use core::i32;
+    use std::vec::Vec;
     use wire::{IpAddress, IpRepr, IpCidr};
     use wire::ip::test::{MOCK_IP_ADDR_1, MOCK_IP_ADDR_2, MOCK_IP_ADDR_3, MOCK_UNSPECIFIED};
     use super::*;
@@ -1746,11 +1783,15 @@ mod test {
     }
 
     fn socket() -> TcpSocket<'static> {
+        socket_with_buffer_sizes(64, 64)
+    }
+
+    fn socket_with_buffer_sizes(tx_len: usize, rx_len: usize) -> TcpSocket<'static> {
         #[cfg(feature = "log")]
         init_logger();
 
-        let rx_buffer = SocketBuffer::new(vec![0; 64]);
-        let tx_buffer = SocketBuffer::new(vec![0; 64]);
+        let rx_buffer = SocketBuffer::new(vec![0; rx_len]);
+        let tx_buffer = SocketBuffer::new(vec![0; tx_len]);
         TcpSocket::new(rx_buffer, tx_buffer)
     }
 
@@ -1895,6 +1936,46 @@ mod test {
         s
     }
 
+    #[test]
+    fn test_listen_syn_win_scale_buffers() {
+        for (buffer_size, shift_amt) in &[
+            (64, 0),
+            (128, 0),
+            (1024, 0),
+            (65535, 0),
+            (65536, 1),
+            (65537, 1),
+            (131071, 1),
+            (131072, 2),
+            (524287, 3),
+            (524288, 4),
+            (655350, 4),
+            (1048576, 5),
+        ] {
+            let mut s = socket_with_buffer_sizes(64, *buffer_size);
+            s.state = State::Listen;
+            s.local_endpoint  = IpEndpoint::new(IpAddress::default(), LOCAL_PORT);
+            assert_eq!(s.remote_win_shift, *shift_amt);
+            send!(s, TcpRepr {
+                control:    TcpControl::Syn,
+                seq_number: REMOTE_SEQ,
+                ack_number: None,
+                window_scale: Some(0),
+                ..SEND_TEMPL
+            });
+            assert_eq!(s.remote_win_shift, *shift_amt);
+            recv!(s, [TcpRepr {
+                control: TcpControl::Syn,
+                seq_number: LOCAL_SEQ,
+                ack_number: Some(REMOTE_SEQ + 1),
+                max_seg_size: Some(BASE_MSS),
+                window_scale: Some(*shift_amt),
+                window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16,
+                ..RECV_TEMPL
+            }]);
+        }
+    }
+
     #[test]
     fn test_listen_sanity() {
         let mut s = socket();
@@ -2064,6 +2145,38 @@ mod test {
         assert_eq!(s.remote_win_scale, None);
     }
 
+    #[test]
+    fn test_syn_received_window_scaling() {
+        for scale in 0..14 {
+            let mut s = socket_listen();
+            send!(s, TcpRepr {
+                control: TcpControl::Syn,
+                seq_number: REMOTE_SEQ,
+                ack_number: None,
+                window_scale: Some(scale),
+                ..SEND_TEMPL
+            });
+            assert_eq!(s.state(), State::SynReceived);
+            assert_eq!(s.local_endpoint(), LOCAL_END);
+            assert_eq!(s.remote_endpoint(), REMOTE_END);
+            recv!(s, [TcpRepr {
+                control: TcpControl::Syn,
+                seq_number: LOCAL_SEQ,
+                ack_number: Some(REMOTE_SEQ + 1),
+                max_seg_size: Some(BASE_MSS),
+                window_scale: Some(0),
+                ..RECV_TEMPL
+            }]);
+            send!(s, TcpRepr {
+                seq_number: REMOTE_SEQ + 1,
+                ack_number: Some(LOCAL_SEQ + 1),
+                window_scale: None,
+                ..SEND_TEMPL
+            });
+            assert_eq!(s.remote_win_scale, Some(scale));
+        }
+    }
+
     #[test]
     fn test_syn_received_close() {
         let mut s = socket_syn_received();
@@ -2220,6 +2333,36 @@ mod test {
         assert_eq!(s.state, State::Closed);
     }
 
+    #[test]
+    fn test_syn_sent_win_scale_buffers() {
+        for (buffer_size, shift_amt) in &[
+            (64, 0),
+            (128, 0),
+            (1024, 0),
+            (65535, 0),
+            (65536, 1),
+            (65537, 1),
+            (131071, 1),
+            (131072, 2),
+            (524287, 3),
+            (524288, 4),
+            (655350, 4),
+            (1048576, 5),
+        ] {
+            let mut s = socket_with_buffer_sizes(64, *buffer_size);
+            assert_eq!(s.remote_win_shift, *shift_amt);
+            s.connect(REMOTE_END, LOCAL_END).unwrap();
+            recv!(s, [TcpRepr {
+                control: TcpControl::Syn,
+                ack_number: None,
+                max_seg_size: Some(BASE_MSS),
+                window_scale: Some(*shift_amt),
+                window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16,
+                ..RECV_TEMPL
+            }]);
+        }
+    }
+
     // =========================================================================================//
     // Tests for the ESTABLISHED state.
     // =========================================================================================//
@@ -2242,6 +2385,39 @@ mod test {
         assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]);
     }
 
+    #[test]
+    fn test_established_sliding_window_recv() {
+        let mut s = socket_established();
+        // Update our scaling parameters for a TCP with a scaled buffer.
+        assert_eq!(s.rx_buffer.len(), 0);
+        s.rx_buffer = SocketBuffer::new(vec![0; 262143]);
+        s.assembler = Assembler::new(s.rx_buffer.capacity());
+        s.remote_win_scale = Some(0);
+        s.remote_last_win = 65535;
+        s.remote_win_shift = 2;
+
+        // Create a TCP segment that will mostly fill an IP frame.
+        let mut segment: Vec<u8> = Vec::with_capacity(1400);
+        for _ in 0..100 { segment.extend_from_slice(b"abcdefghijklmn") }
+        assert_eq!(segment.len(), 1400);
+
+        // Send the frame
+        send!(s, TcpRepr {
+            seq_number: REMOTE_SEQ + 1,
+            ack_number: Some(LOCAL_SEQ + 1),
+            payload: &segment,
+            ..SEND_TEMPL
+        });
+
+        // Ensure that the received window size is shifted right by 2.
+        recv!(s, [TcpRepr {
+            seq_number: LOCAL_SEQ + 1,
+            ack_number: Some(REMOTE_SEQ + 1 + 1400),
+            window_len: 65185,
+            ..RECV_TEMPL
+        }]);
+    }
+
     #[test]
     fn test_established_send() {
         let mut s = socket_established();