netsim.rs 10 KB


  1. use std::cell::RefCell;
  2. use std::collections::BinaryHeap;
  3. use std::fmt::Write as _;
  4. use std::io::Write as _;
  5. use std::sync::Mutex;
  6. use rand::{Rng, SeedableRng};
  7. use rand_chacha::ChaCha20Rng;
  8. use smoltcp::iface::{Config, Interface, SocketSet};
  9. use smoltcp::phy::Tracer;
  10. use smoltcp::phy::{self, ChecksumCapabilities, Device, DeviceCapabilities, Medium};
  11. use smoltcp::socket::tcp;
  12. use smoltcp::time::{Duration, Instant};
  13. use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr};
  14. const MAC_A: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 1]));
  15. const MAC_B: HardwareAddress = HardwareAddress::Ethernet(EthernetAddress([2, 0, 0, 0, 0, 2]));
  16. const IP_A: IpAddress = IpAddress::v4(10, 0, 0, 1);
  17. const IP_B: IpAddress = IpAddress::v4(10, 0, 0, 2);
  18. const BYTES: usize = 10 * 1024 * 1024;
  19. static CLOCK: Mutex<(Instant, char)> = Mutex::new((Instant::ZERO, ' '));
  20. #[test]
  21. fn netsim() {
  22. setup_logging();
  23. let buffers = [128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768];
  24. let losses = [0.0, 0.001, 0.01, 0.02, 0.05, 0.10, 0.20, 0.30];
  25. let mut s = String::new();
  26. write!(&mut s, "buf\\loss").unwrap();
  27. for loss in losses {
  28. write!(&mut s, "{loss:9.3} ").unwrap();
  29. }
  30. writeln!(&mut s).unwrap();
  31. for buffer in buffers {
  32. write!(&mut s, "{buffer:7}").unwrap();
  33. for loss in losses {
  34. let r = run_test(TestCase {
  35. rtt: Duration::from_millis(100),
  36. buffer,
  37. loss,
  38. });
  39. write!(&mut s, " {r:9.2}").unwrap();
  40. }
  41. writeln!(&mut s).unwrap();
  42. }
  43. insta::assert_snapshot!(s);
  44. }
  45. struct TestCase {
  46. rtt: Duration,
  47. loss: f64,
  48. buffer: usize,
  49. }
  50. fn run_test(case: TestCase) -> f64 {
  51. let mut time = Instant::ZERO;
  52. let params = QueueParams {
  53. latency: case.rtt / 2,
  54. loss: case.loss,
  55. };
  56. let queue_a_to_b = RefCell::new(PacketQueue::new(params.clone(), 0));
  57. let queue_b_to_a = RefCell::new(PacketQueue::new(params.clone(), 1));
  58. let device_a = QueueDevice::new(&queue_a_to_b, &queue_b_to_a, Medium::Ethernet);
  59. let device_b = QueueDevice::new(&queue_b_to_a, &queue_a_to_b, Medium::Ethernet);
  60. let mut device_a = Tracer::new(device_a, |_timestamp, _printer| log::trace!("{}", _printer));
  61. let mut device_b = Tracer::new(device_b, |_timestamp, _printer| log::trace!("{}", _printer));
  62. let mut iface_a = Interface::new(Config::new(MAC_A), &mut device_a, time);
  63. iface_a.update_ip_addrs(|a| a.push(IpCidr::new(IP_A, 8)).unwrap());
  64. let mut iface_b = Interface::new(Config::new(MAC_B), &mut device_b, time);
  65. iface_b.update_ip_addrs(|a| a.push(IpCidr::new(IP_B, 8)).unwrap());
  66. // Create sockets
  67. let socket_a = {
  68. let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
  69. let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
  70. tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
  71. };
  72. let socket_b = {
  73. let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
  74. let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; case.buffer]);
  75. tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer)
  76. };
  77. let mut sockets_a: [_; 2] = Default::default();
  78. let mut sockets_a = SocketSet::new(&mut sockets_a[..]);
  79. let socket_a_handle = sockets_a.add(socket_a);
  80. let mut sockets_b: [_; 2] = Default::default();
  81. let mut sockets_b = SocketSet::new(&mut sockets_b[..]);
  82. let socket_b_handle = sockets_b.add(socket_b);
  83. let mut did_listen = false;
  84. let mut did_connect = false;
  85. let mut processed = 0;
  86. while processed < BYTES {
  87. *CLOCK.lock().unwrap() = (time, ' ');
  88. log::info!("loop");
  89. //println!("t = {}", time);
  90. *CLOCK.lock().unwrap() = (time, 'A');
  91. iface_a.poll(time, &mut device_a, &mut sockets_a);
  92. let socket = sockets_a.get_mut::<tcp::Socket>(socket_a_handle);
  93. if !socket.is_active() && !socket.is_listening() && !did_listen {
  94. //println!("listening");
  95. socket.listen(1234).unwrap();
  96. did_listen = true;
  97. }
  98. while socket.can_recv() {
  99. let received = socket.recv(|buffer| (buffer.len(), buffer.len())).unwrap();
  100. //println!("got {:?}", received,);
  101. processed += received;
  102. }
  103. *CLOCK.lock().unwrap() = (time, 'B');
  104. iface_b.poll(time, &mut device_b, &mut sockets_b);
  105. let socket = sockets_b.get_mut::<tcp::Socket>(socket_b_handle);
  106. let cx = iface_b.context();
  107. if !socket.is_open() && !did_connect {
  108. //println!("connecting");
  109. socket.connect(cx, (IP_A, 1234), 65000).unwrap();
  110. did_connect = true;
  111. }
  112. while socket.can_send() {
  113. //println!("sending");
  114. socket.send(|buffer| (buffer.len(), ())).unwrap();
  115. }
  116. *CLOCK.lock().unwrap() = (time, ' ');
  117. let mut next_time = queue_a_to_b.borrow_mut().next_expiration();
  118. next_time = next_time.min(queue_b_to_a.borrow_mut().next_expiration());
  119. if let Some(t) = iface_a.poll_at(time, &sockets_a) {
  120. next_time = next_time.min(t);
  121. }
  122. if let Some(t) = iface_b.poll_at(time, &sockets_b) {
  123. next_time = next_time.min(t);
  124. }
  125. assert!(next_time.total_micros() != i64::MAX);
  126. time = time.max(next_time);
  127. }
  128. let duration = time - Instant::ZERO;
  129. processed as f64 / duration.total_micros() as f64 * 1e6
  130. }
  131. struct Packet {
  132. timestamp: Instant,
  133. id: u64,
  134. data: Vec<u8>,
  135. }
  136. impl PartialEq for Packet {
  137. fn eq(&self, other: &Self) -> bool {
  138. (other.timestamp, other.id) == (self.timestamp, self.id)
  139. }
  140. }
  141. impl Eq for Packet {}
  142. impl PartialOrd for Packet {
  143. fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  144. Some(self.cmp(other))
  145. }
  146. }
  147. impl Ord for Packet {
  148. fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  149. (other.timestamp, other.id).cmp(&(self.timestamp, self.id))
  150. }
  151. }
  152. #[derive(Clone)]
  153. struct QueueParams {
  154. latency: Duration,
  155. loss: f64,
  156. }
  157. struct PacketQueue {
  158. queue: BinaryHeap<Packet>,
  159. next_id: u64,
  160. params: QueueParams,
  161. rng: ChaCha20Rng,
  162. }
  163. impl PacketQueue {
  164. pub fn new(params: QueueParams, seed: u64) -> Self {
  165. Self {
  166. queue: BinaryHeap::new(),
  167. next_id: 0,
  168. params,
  169. rng: ChaCha20Rng::seed_from_u64(seed),
  170. }
  171. }
  172. pub fn next_expiration(&self) -> Instant {
  173. self.queue
  174. .peek()
  175. .map(|p| p.timestamp)
  176. .unwrap_or(Instant::from_micros(i64::MAX))
  177. }
  178. pub fn push(&mut self, data: Vec<u8>, timestamp: Instant) {
  179. if self.rng.gen::<f64>() < self.params.loss {
  180. log::info!("PACKET LOST!");
  181. return;
  182. }
  183. self.queue.push(Packet {
  184. data,
  185. id: self.next_id,
  186. timestamp: timestamp + self.params.latency,
  187. });
  188. self.next_id += 1;
  189. }
  190. pub fn pop(&mut self, timestamp: Instant) -> Option<Vec<u8>> {
  191. let p = self.queue.peek()?;
  192. if p.timestamp > timestamp {
  193. return None;
  194. }
  195. Some(self.queue.pop().unwrap().data)
  196. }
  197. }
  198. pub struct QueueDevice<'a> {
  199. tx_queue: &'a RefCell<PacketQueue>,
  200. rx_queue: &'a RefCell<PacketQueue>,
  201. medium: Medium,
  202. }
  203. impl<'a> QueueDevice<'a> {
  204. fn new(
  205. tx_queue: &'a RefCell<PacketQueue>,
  206. rx_queue: &'a RefCell<PacketQueue>,
  207. medium: Medium,
  208. ) -> Self {
  209. Self {
  210. tx_queue,
  211. rx_queue,
  212. medium,
  213. }
  214. }
  215. }
  216. impl Device for QueueDevice<'_> {
  217. type RxToken<'a>
  218. = RxToken
  219. where
  220. Self: 'a;
  221. type TxToken<'a>
  222. = TxToken<'a>
  223. where
  224. Self: 'a;
  225. fn capabilities(&self) -> DeviceCapabilities {
  226. let mut caps = DeviceCapabilities::default();
  227. caps.max_transmission_unit = 1514;
  228. caps.medium = self.medium;
  229. caps.checksum = ChecksumCapabilities::ignored();
  230. caps
  231. }
  232. fn receive(&mut self, timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
  233. self.rx_queue
  234. .borrow_mut()
  235. .pop(timestamp)
  236. .map(move |buffer| {
  237. let rx = RxToken { buffer };
  238. let tx = TxToken {
  239. queue: self.tx_queue,
  240. timestamp,
  241. };
  242. (rx, tx)
  243. })
  244. }
  245. fn transmit(&mut self, timestamp: Instant) -> Option<Self::TxToken<'_>> {
  246. Some(TxToken {
  247. queue: self.tx_queue,
  248. timestamp,
  249. })
  250. }
  251. }
  252. pub struct RxToken {
  253. buffer: Vec<u8>,
  254. }
  255. impl phy::RxToken for RxToken {
  256. fn consume<R, F>(self, f: F) -> R
  257. where
  258. F: FnOnce(&[u8]) -> R,
  259. {
  260. f(&self.buffer)
  261. }
  262. }
  263. pub struct TxToken<'a> {
  264. queue: &'a RefCell<PacketQueue>,
  265. timestamp: Instant,
  266. }
  267. impl<'a> phy::TxToken for TxToken<'a> {
  268. fn consume<R, F>(self, len: usize, f: F) -> R
  269. where
  270. F: FnOnce(&mut [u8]) -> R,
  271. {
  272. let mut buffer = vec![0; len];
  273. let result = f(&mut buffer);
  274. self.queue.borrow_mut().push(buffer, self.timestamp);
  275. result
  276. }
  277. }
  278. pub fn setup_logging() {
  279. env_logger::Builder::new()
  280. .format(move |buf, record| {
  281. let (elapsed, side) = *CLOCK.lock().unwrap();
  282. let timestamp = format!("[{elapsed} {side}]");
  283. if record.target().starts_with("smoltcp::") {
  284. writeln!(
  285. buf,
  286. "{} ({}): {}",
  287. timestamp,
  288. record.target().replace("smoltcp::", ""),
  289. record.args()
  290. )
  291. } else if record.level() == log::Level::Trace {
  292. let message = format!("{}", record.args());
  293. writeln!(
  294. buf,
  295. "{} {}",
  296. timestamp,
  297. message.replace('\n', "\n ")
  298. )
  299. } else {
  300. writeln!(
  301. buf,
  302. "{} ({}): {}",
  303. timestamp,
  304. record.target(),
  305. record.args()
  306. )
  307. }
  308. })
  309. .parse_env("RUST_LOG")
  310. .init();
  311. }