Browse Source

Add packet shaping to the fault injector.

whitequark 7 years ago
parent
commit
21900288cc
4 changed files with 178 additions and 42 deletions
  1. 17 5
      README.md
  2. 19 1
      examples/utils.rs
  3. 141 36
      src/phy/fault_injector.rs
  4. 1 0
      src/socket/tcp.rs

+ 17 - 5
README.md

@@ -137,6 +137,23 @@ sudo ip link set tap0 up
 sudo ip addr add 192.168.69.100/24 dev tap0
 ```
 
+### Fault injection
+
+In order to demonstrate the response of _smoltcp_ to adverse network conditions, all examples
+implement fault injection, available through command-line options:
+
+  * The `--drop-chance` option randomly drops packets, with given probability in percents.
+  * The `--corrupt-chance` option randomly mutates one octet in a packet, with given
+    probability in percents.
+  * The `--size-limit` option drops packets larger than specified size.
+  * The `--tx-rate-limit` and `--rx-rate-limit` options set the amount of tokens for
+    a token bucket rate limiter, in packets per bucket.
+  * The `--shaping-interval` option sets the refill interval of a token bucket rate limiter,
+    in milliseconds.
+
+A good starting value for `--drop-chance` and `--corrupt-chance` is 15%. A good starting
+value for `--?x-rate-limit` is 4 and `--shaping-interval` is 50 ms.
+
 ### examples/tcpdump.rs
 
 _examples/tcpdump.rs_ is a tiny clone of the _tcpdump_ utility.
@@ -175,9 +192,6 @@ It responds to:
 
 The buffers are only 64 bytes long, for convenience of testing resource exhaustion conditions.
 
-Fault injection is available through the `--drop-chance` and `--corrupt-chance` options,
-with values in percents. A good starting value is 15%.
-
 ### examples/client.rs
 
 _examples/client.rs_ emulates a network host that can initiate requests.
@@ -193,8 +207,6 @@ cargo run --example client -- tap0 ADDRESS PORT
 It connects to the given address (not a hostname) and port (e.g. `socat stdio tcp4-listen 1234`),
 and will respond with reversed chunks of the input indefinitely.
 
-Fault injection is available, as described above.
-
 License
 -------
 

+ 19 - 1
examples/utils.rs

@@ -1,6 +1,6 @@
 use std::str::{self, FromStr};
 use std::env;
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
 use std::process;
 use log::{LogLevelFilter, LogRecord};
 use env_logger::{LogBuilder};
@@ -36,6 +36,12 @@ pub fn setup_device(more_args: &[&str])
     let mut opts = getopts::Options::new();
     opts.optopt("", "drop-chance", "Chance of dropping a packet (%)", "CHANCE");
     opts.optopt("", "corrupt-chance", "Chance of corrupting a packet (%)", "CHANCE");
+    opts.optopt("", "size-limit", "Drop packets larger than given size (octets)", "SIZE");
+    opts.optopt("", "tx-rate-limit", "Drop packets after transmit rate exceeds given limit \
+                                      (packets per interval)", "RATE");
+    opts.optopt("", "rx-rate-limit", "Drop packets after transmit rate exceeds given limit \
+                                      (packets per interval)", "RATE");
+    opts.optopt("", "shaping-interval", "Sets the interval for rate limiting (ms)", "RATE");
     opts.optflag("h", "help", "print this help menu");
 
     let matches = opts.parse(env::args().skip(1)).unwrap();
@@ -50,6 +56,14 @@ pub fn setup_device(more_args: &[&str])
                                              .unwrap_or("0".to_string())).unwrap();
     let corrupt_chance = u8::from_str(&matches.opt_str("corrupt-chance")
                                              .unwrap_or("0".to_string())).unwrap();
+    let size_limit = usize::from_str(&matches.opt_str("size-limit")
+                                             .unwrap_or("0".to_string())).unwrap();
+    let tx_rate_limit = u64::from_str(&matches.opt_str("tx-rate-limit")
+                                              .unwrap_or("0".to_string())).unwrap();
+    let rx_rate_limit = u64::from_str(&matches.opt_str("rx-rate-limit")
+                                              .unwrap_or("0".to_string())).unwrap();
+    let shaping_interval = u32::from_str(&matches.opt_str("shaping-interval")
+                                                 .unwrap_or("0".to_string())).unwrap();
 
     let seed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().subsec_nanos();
 
@@ -61,6 +75,10 @@ pub fn setup_device(more_args: &[&str])
     let mut device = FaultInjector::new(device, seed);
     device.set_drop_chance(drop_chance);
     device.set_corrupt_chance(corrupt_chance);
+    device.set_max_packet_size(size_limit);
+    device.set_max_tx_rate(tx_rate_limit);
+    device.set_max_rx_rate(rx_rate_limit);
+    device.set_bucket_interval(Duration::from_millis(shaping_interval as u64));
     let device = Tracer::<_, EthernetFrame<&'static [u8]>>::new(device, trace_writer);
 
     (device, matches.free[1..].to_owned())

+ 141 - 36
src/phy/fault_injector.rs

@@ -1,3 +1,5 @@
+use std::time::{Instant, Duration};
+
 use Error;
 use super::{DeviceLimits, Device};
 
@@ -12,54 +14,102 @@ fn xorshift32(state: &mut u32) -> u32 {
     x
 }
 
-fn check_rng(state: &mut u32, pct: u8) -> bool {
-    xorshift32(state) % 100 < pct as u32
-}
-
-fn corrupt<T: AsMut<[u8]>>(state: &mut u32, mut buffer: T) {
-    let mut buffer = buffer.as_mut();
-    // We introduce a single bitflip, as the most likely, and the hardest to detect, error.
-    let index = (xorshift32(state) as usize) % buffer.len();
-    let bit   = 1 << (xorshift32(state) % 8) as u8;
-    buffer[index] ^= bit;
-}
-
 // This could be fixed once associated consts are stable.
 const MTU: usize = 1536;
 
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
 struct Config {
     corrupt_pct: u8,
     drop_pct:    u8,
-    reorder_pct: u8
+    reorder_pct: u8,
+    max_size:    usize,
+    max_tx_rate: u64,
+    max_rx_rate: u64,
+    interval:    Duration,
+}
+
+#[derive(Debug, Clone, Copy)]
+struct State {
+    rng_seed:    u32,
+    refilled_at: Instant,
+    tx_bucket:   u64,
+    rx_bucket:   u64,
+}
+
+impl State {
+    fn maybe(&mut self, pct: u8) -> bool {
+        xorshift32(&mut self.rng_seed) % 100 < pct as u32
+    }
+
+    fn corrupt<T: AsMut<[u8]>>(&mut self, mut buffer: T) {
+        let mut buffer = buffer.as_mut();
+        // We introduce a single bitflip, as the most likely, and the hardest to detect, error.
+        let index = (xorshift32(&mut self.rng_seed) as usize) % buffer.len();
+        let bit   = 1 << (xorshift32(&mut self.rng_seed) % 8) as u8;
+        buffer[index] ^= bit;
+    }
+
+    fn refill(&mut self, config: &Config) {
+        if self.refilled_at.elapsed() > config.interval {
+            self.tx_bucket = config.max_tx_rate;
+            self.rx_bucket = config.max_rx_rate;
+            self.refilled_at = Instant::now();
+        }
+    }
+
+    fn maybe_transmit(&mut self, config: &Config) -> bool {
+        if config.max_tx_rate == 0 { return true }
+
+        self.refill(config);
+        if self.tx_bucket > 0 {
+            self.tx_bucket -= 1;
+            true
+        } else {
+            false
+        }
+    }
+
+    fn maybe_receive(&mut self, config: &Config) -> bool {
+        if config.max_rx_rate == 0 { return true }
+
+        self.refill(config);
+        if self.rx_bucket > 0 {
+            self.rx_bucket -= 1;
+            true
+        } else {
+            false
+        }
+    }
 }
 
 /// A fault injector device.
 ///
-/// A fault injector is a device that randomly drops or corrupts packets traversing it,
-/// according to preset probabilities.
+/// A fault injector is a device that alters packets traversing through it to simulate
+/// adverse network conditions (such as random packet loss or corruption), or software
+/// or hardware limitations (such as a limited number or size of usable network buffers).
 #[derive(Debug)]
 pub struct FaultInjector<T: Device> {
     lower:  T,
-    state:  u32,
+    state:  State,
     config: Config
 }
 
 impl<T: Device> FaultInjector<T> {
-    /// Create a tracer device, using the given random number generator seed.
+    /// Create a fault injector device, using the given random number generator seed.
     pub fn new(lower: T, seed: u32) -> FaultInjector<T> {
         FaultInjector {
             lower: lower,
-            state: seed,
-            config: Config {
-                corrupt_pct: 0,
-                drop_pct:    0,
-                reorder_pct: 0
-            }
+            state: State {
+                rng_seed:    seed,
+                refilled_at: Instant::now(),
+                tx_bucket:   0,
+                rx_bucket:   0,
+            },
+            config: Config::default()
         }
     }
 
-    /// Return the underlying device, consuming the tracer.
+    /// Return the underlying device, consuming the fault injector.
     pub fn into_lower(self) -> T {
         self.lower
     }
@@ -74,6 +124,26 @@ impl<T: Device> FaultInjector<T> {
         self.config.drop_pct
     }
 
+    /// Return the maximum packet size, in octets.
+    pub fn max_packet_size(&self) -> usize {
+        self.config.max_size
+    }
+
+    /// Return the maximum packet transmission rate, in packets per second.
+    pub fn max_tx_rate(&self) -> u64 {
+        self.config.max_rx_rate
+    }
+
+    /// Return the maximum packet reception rate, in packets per second.
+    pub fn max_rx_rate(&self) -> u64 {
+        self.config.max_tx_rate
+    }
+
+    /// Return the interval for packet rate limiting, in milliseconds.
+    pub fn bucket_interval(&self) -> Duration {
+        self.config.interval
+    }
+
     /// Set the probability of corrupting a packet, in percents.
     ///
     /// # Panics
@@ -91,6 +161,27 @@ impl<T: Device> FaultInjector<T> {
         if pct > 100 { panic!("percentage out of range") }
         self.config.drop_pct = pct
     }
+
+    /// Set the maximum packet size, in octets.
+    pub fn set_max_packet_size(&mut self, size: usize) {
+        self.config.max_size = size
+    }
+
+    /// Set the maximum packet transmission rate, in packets per interval.
+    pub fn set_max_tx_rate(&mut self, rate: u64) {
+        self.config.max_tx_rate = rate
+    }
+
+    /// Set the maximum packet reception rate, in packets per interval.
+    pub fn set_max_rx_rate(&mut self, rate: u64) {
+        self.config.max_rx_rate = rate
+    }
+
+    /// Set the interval for packet rate limiting, in milliseconds.
+    pub fn set_bucket_interval(&mut self, interval: Duration) {
+        self.state.refilled_at = Instant::now() - self.config.interval;
+        self.config.interval = interval
+    }
 }
 
 impl<T: Device> Device for FaultInjector<T>
@@ -108,28 +199,42 @@ impl<T: Device> Device for FaultInjector<T>
 
     fn receive(&mut self) -> Result<Self::RxBuffer, Error> {
         let mut buffer = try!(self.lower.receive());
-        if check_rng(&mut self.state, self.config.drop_pct) {
-            net_trace!("rx: dropping a packet");
+        if self.state.maybe(self.config.drop_pct) {
+            net_trace!("rx: randomly dropping a packet");
             return Err(Error::Exhausted)
         }
-        if check_rng(&mut self.state, self.config.corrupt_pct) {
-            net_trace!("rx: corrupting a packet");
-            corrupt(&mut self.state, &mut buffer)
+        if self.state.maybe(self.config.corrupt_pct) {
+            net_trace!("rx: randomly corrupting a packet");
+            self.state.corrupt(&mut buffer)
+        }
+        if self.config.max_size > 0 && buffer.as_ref().len() > self.config.max_size {
+            net_trace!("rx: dropping a packet that is too large");
+            return Err(Error::Exhausted)
+        }
+        if !self.state.maybe_receive(&self.config) {
+            net_trace!("rx: dropping a packet because of rate limiting");
+            return Err(Error::Exhausted)
         }
         Ok(buffer)
     }
 
     fn transmit(&mut self, length: usize) -> Result<Self::TxBuffer, Error> {
         let buffer;
-        if check_rng(&mut self.state, self.config.drop_pct) {
-            net_trace!("tx: dropping a packet");
+        if self.state.maybe(self.config.drop_pct) {
+            net_trace!("tx: randomly dropping a packet");
+            buffer = None;
+        } else if self.config.max_size > 0 && length > self.config.max_size {
+            net_trace!("tx: dropping a packet that is too large");
+            buffer = None;
+        } else if !self.state.maybe_transmit(&self.config) {
+            net_trace!("tx: dropping a packet because of rate limiting");
             buffer = None;
         } else {
             buffer = Some(try!(self.lower.transmit(length)));
         }
         Ok(TxBuffer {
             buffer: buffer,
-            state:  xorshift32(&mut self.state),
+            state:  self.state.clone(),
             config: self.config,
             junk:   [0; MTU],
             length: length
@@ -139,7 +244,7 @@ impl<T: Device> Device for FaultInjector<T>
 
 #[doc(hidden)]
 pub struct TxBuffer<T: AsRef<[u8]> + AsMut<[u8]>> {
-    state:  u32,
+    state:  State,
     config: Config,
     buffer: Option<T>,
     junk:   [u8; MTU],
@@ -170,9 +275,9 @@ impl<T: AsRef<[u8]> + AsMut<[u8]>> Drop for TxBuffer<T> {
     fn drop(&mut self) {
         match self.buffer {
             Some(ref mut buf) => {
-                if check_rng(&mut self.state, self.config.corrupt_pct) {
+                if self.state.maybe(self.config.corrupt_pct) {
                     net_trace!("tx: corrupting a packet");
-                    corrupt(&mut self.state, buf)
+                    self.state.corrupt(buf)
                 }
             },
             None => ()

+ 1 - 0
src/socket/tcp.rs

@@ -717,6 +717,7 @@ impl<'a> TcpSocket<'a> {
             (_, TcpRepr { seq_number, .. }) => {
                 let next_remote_seq = self.remote_seq_no + self.rx_buffer.len();
                 if seq_number > next_remote_seq {
+                    self.retransmit.reset();
                     net_trace!("[{}]{}:{}: unacceptable SEQ ({} not in {}..)",
                                self.debug_id, self.local_endpoint, self.remote_endpoint,
                                seq_number, next_remote_seq);