瀏覽代碼

iface: make poll() process all packets, add fine-grained poll functions.

This makes `.poll()` behave the same as before #954. Users affected by DoS
concerns can use the finer-grained egress-only and single-packet-ingress-only fns.
Dario Nieuwenhuis 6 月之前
父節點
當前提交
dd43c8f189
共有 4 個文件被更改,包括 145 次插入74 次删除
  1. 2 7
      src/iface/interface/ipv4.rs
  2. 140 52
      src/iface/interface/mod.rs
  3. 1 4
      src/iface/interface/multicast.rs
  4. 2 11
      src/iface/interface/sixlowpan.rs

+ 2 - 7
src/iface/interface/ipv4.rs

@@ -7,17 +7,14 @@ impl Interface {
     /// processed or emitted, and thus, whether the readiness of any socket might
     /// have changed.
     #[cfg(feature = "proto-ipv4-fragmentation")]
-    pub(super) fn ipv4_egress<D>(&mut self, device: &mut D) -> bool
-    where
-        D: Device + ?Sized,
-    {
+    pub(super) fn ipv4_egress(&mut self, device: &mut (impl Device + ?Sized)) {
         // Reset the buffer when we transmitted everything.
         if self.fragmenter.finished() {
             self.fragmenter.reset();
         }
 
         if self.fragmenter.is_empty() {
-            return false;
+            return;
         }
 
         let pkt = &self.fragmenter;
@@ -25,10 +22,8 @@ impl Interface {
             if let Some(tx_token) = device.transmit(self.inner.now) {
                 self.inner
                     .dispatch_ipv4_frag(tx_token, &mut self.fragmenter);
-                return true;
             }
         }
-        false
     }
 }
 

+ 140 - 52
src/iface/interface/mod.rs

@@ -65,6 +65,44 @@ macro_rules! check {
 }
 use check;
 
+/// Result returned by [`Interface::poll`].
+///
+/// This contains information on whether socket states might have changed.
+#[derive(Copy, Clone, PartialEq, Eq, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum PollResult {
+    /// Socket state is guaranteed to not have changed.
+    None,
+    /// You should check the state of sockets again for received data or completion of operations.
+    SocketStateChanged,
+}
+
+/// Result returned by [`Interface::poll_ingress_single`].
+///
+/// This contains information on whether a packet was processed or not,
+/// and whether it might've affected socket states.
+#[derive(Copy, Clone, PartialEq, Eq, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum PollIngressSingleResult {
+    /// No packet was processed. You don't need to call [`Interface::poll_ingress_single`]
+    /// again, until more packets arrive.
+    ///
+    /// Socket state is guaranteed to not have changed.
+    None,
+    /// A packet was processed.
+    ///
+    /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
+    ///
+    /// Socket state is guaranteed to not have changed.
+    PacketProcessed,
+    /// A packet was processed, which might have caused socket state to change.
+    ///
+    /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
+    ///
+    /// You should check the state of sockets again for received data or completion of operations.
+    SocketStateChanged,
+}
+
 /// A  network interface.
 ///
 /// The network interface logically owns a number of other data structures; to avoid
@@ -150,10 +188,7 @@ impl Interface {
     /// # Panics
     /// This function panics if the [`Config::hardware_address`] does not match
     /// the medium of the device.
-    pub fn new<D>(config: Config, device: &mut D, now: Instant) -> Self
-    where
-        D: Device + ?Sized,
-    {
+    pub fn new(config: Config, device: &mut (impl Device + ?Sized), now: Instant) -> Self {
         let caps = device.capabilities();
         assert_eq!(
             config.hardware_addr.medium(),
@@ -375,59 +410,107 @@ impl Interface {
         self.fragments.reassembly_timeout = timeout;
     }
 
-    /// Transmit packets queued in the given sockets, and receive packets queued
+    /// Transmit packets queued in the sockets, and receive packets queued
     /// in the device.
     ///
-    /// This function returns a boolean value indicating whether any packets were
-    /// processed or emitted, and thus, whether the readiness of any socket might
-    /// have changed.
+    /// This function returns a value indicating whether the state of any socket
+    /// might have changed.
+    ///
+    /// ## DoS warning
     ///
-    /// # Note
-    /// This function performs a bounded amount of work per call to avoid
-    /// starving other tasks of CPU time. If it returns true, there may still be
-    /// packets to be received or transmitted. Depending on system design,
-    /// calling this function in a loop may cause a denial of service if
-    /// packets cannot be processed faster than they arrive.
-    pub fn poll<D>(
+    /// This function processes all packets in the device's queue. This can
+    /// be an unbounded amount of work if packets arrive faster than they're
+    /// processed.
+    ///
+    /// If this is a concern for your application (i.e. your environment doesn't
+    /// have preemptive scheduling, or `poll()` is called from a main loop where
+    /// other important things are processed), you may use the lower-level methods
+    /// [`poll_egress()`](Self::poll_egress) and [`poll_ingress_single()`](Self::poll_ingress_single).
+    /// This allows you to insert yields or process other events between processing
+    /// individual ingress packets.
+    pub fn poll(
         &mut self,
         timestamp: Instant,
-        device: &mut D,
+        device: &mut (impl Device + ?Sized),
         sockets: &mut SocketSet<'_>,
-    ) -> bool
-    where
-        D: Device + ?Sized,
-    {
+    ) -> PollResult {
         self.inner.now = timestamp;
 
+        let mut res = PollResult::None;
+
         #[cfg(feature = "_proto-fragmentation")]
         self.fragments.assembler.remove_expired(timestamp);
 
+        // Process ingress while there's packets available.
+        loop {
+            match self.socket_ingress(device, sockets) {
+                PollIngressSingleResult::None => break,
+                PollIngressSingleResult::PacketProcessed => {}
+                PollIngressSingleResult::SocketStateChanged => res = PollResult::SocketStateChanged,
+            }
+        }
+
+        // Process egress.
+        match self.poll_egress(timestamp, device, sockets) {
+            PollResult::None => {}
+            PollResult::SocketStateChanged => res = PollResult::SocketStateChanged,
+        }
+
+        res
+    }
+
+    /// Transmit packets queued in the sockets.
+    ///
+    /// This function returns a value indicating whether the state of any socket
+    /// might have changed.
+    ///
+    /// This is guaranteed to always perform a bounded amount of work.
+    pub fn poll_egress(
+        &mut self,
+        timestamp: Instant,
+        device: &mut (impl Device + ?Sized),
+        sockets: &mut SocketSet<'_>,
+    ) -> PollResult {
+        self.inner.now = timestamp;
+
         match self.inner.caps.medium {
             #[cfg(feature = "medium-ieee802154")]
-            Medium::Ieee802154 =>
-            {
+            Medium::Ieee802154 => {
                 #[cfg(feature = "proto-sixlowpan-fragmentation")]
-                if self.sixlowpan_egress(device) {
-                    return true;
-                }
+                self.sixlowpan_egress(device);
             }
             #[cfg(any(feature = "medium-ethernet", feature = "medium-ip"))]
-            _ =>
-            {
+            _ => {
                 #[cfg(feature = "proto-ipv4-fragmentation")]
-                if self.ipv4_egress(device) {
-                    return true;
-                }
+                self.ipv4_egress(device);
             }
         }
 
-        let mut readiness_may_have_changed = self.socket_ingress(device, sockets);
-        readiness_may_have_changed |= self.socket_egress(device, sockets);
-
         #[cfg(feature = "multicast")]
         self.multicast_egress(device);
 
-        readiness_may_have_changed
+        self.socket_egress(device, sockets)
+    }
+
+    /// Process one incoming packet queued in the device.
+    ///
+    /// Returns a value indicating:
+    /// - whether a packet was processed, in which case you have to call this method again in case there's more packets queued.
+    /// - whether the state of any socket might have changed.
+    ///
+    /// Since it processes at most one packet, this is guaranteed to always perform a bounded amount of work.
+    pub fn poll_ingress_single(
+        &mut self,
+        timestamp: Instant,
+        device: &mut (impl Device + ?Sized),
+        sockets: &mut SocketSet<'_>,
+    ) -> PollIngressSingleResult {
+        self.inner.now = timestamp;
+
+        #[cfg(feature = "_proto-fragmentation")]
+        self.fragments.assembler.remove_expired(timestamp);
+
+        self.socket_ingress(device, sockets)
     }
 
     /// Return a _soft deadline_ for calling [poll] the next time.
@@ -480,20 +563,19 @@ impl Interface {
         }
     }
 
-    fn socket_ingress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
-    where
-        D: Device + ?Sized,
-    {
-        let mut processed_any = false;
-
+    fn socket_ingress(
+        &mut self,
+        device: &mut (impl Device + ?Sized),
+        sockets: &mut SocketSet<'_>,
+    ) -> PollIngressSingleResult {
         let Some((rx_token, tx_token)) = device.receive(self.inner.now) else {
-            return processed_any;
+            return PollIngressSingleResult::None;
         };
 
         let rx_meta = rx_token.meta();
         rx_token.consume(|frame| {
             if frame.is_empty() {
-                return;
+                return PollIngressSingleResult::PacketProcessed;
             }
 
             match self.inner.caps.medium {
@@ -543,16 +625,22 @@ impl Interface {
                     }
                 }
             }
-            processed_any = true;
-        });
 
-        processed_any
+            // TODO: Propagate the PollIngressSingleResult from deeper.
+            // There's many received packets that we process but can't cause sockets
+            // to change state. For example IP fragments, multicast stuff, ICMP pings
+            // if they dont't match any raw socket...
+            // We should return `PacketProcessed` for these to save the user from
+            // doing useless socket polls.
+            PollIngressSingleResult::SocketStateChanged
+        })
     }
 
-    fn socket_egress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
-    where
-        D: Device + ?Sized,
-    {
+    fn socket_egress(
+        &mut self,
+        device: &mut (impl Device + ?Sized),
+        sockets: &mut SocketSet<'_>,
+    ) -> PollResult {
         let _caps = device.capabilities();
 
         enum EgressError {
@@ -560,7 +648,7 @@ impl Interface {
             Dispatch,
         }
 
-        let mut emitted_any = false;
+        let mut result = PollResult::None;
         for item in sockets.items_mut() {
             if !item
                 .meta
@@ -581,7 +669,7 @@ impl Interface {
                     .dispatch_ip(t, meta, response, &mut self.fragmenter)
                     .map_err(|_| EgressError::Dispatch)?;
 
-                emitted_any = true;
+                result = PollResult::SocketStateChanged;
 
                 Ok(())
             };
@@ -663,7 +751,7 @@ impl Interface {
                 Ok(()) => {}
             }
         }
-        emitted_any
+        result
     }
 }
 

+ 1 - 4
src/iface/interface/multicast.rs

@@ -145,10 +145,7 @@ impl Interface {
     /// - Send join/leave packets according to the multicast group state.
     /// - Depending on `igmp_report_state` and the therein contained
     ///   timeouts, send IGMP membership reports.
-    pub(crate) fn multicast_egress<D>(&mut self, device: &mut D)
-    where
-        D: Device + ?Sized,
-    {
+    pub(crate) fn multicast_egress(&mut self, device: &mut (impl Device + ?Sized)) {
         // Process multicast joins.
         while let Some((&addr, _)) = self
             .inner

+ 2 - 11
src/iface/interface/sixlowpan.rs

@@ -7,22 +7,15 @@ pub(crate) const MAX_DECOMPRESSED_LEN: usize = 1500;
 
 impl Interface {
     /// Process fragments that still need to be sent for 6LoWPAN packets.
-    ///
-    /// This function returns a boolean value indicating whether any packets were
-    /// processed or emitted, and thus, whether the readiness of any socket might
-    /// have changed.
     #[cfg(feature = "proto-sixlowpan-fragmentation")]
-    pub(super) fn sixlowpan_egress<D>(&mut self, device: &mut D) -> bool
-    where
-        D: Device + ?Sized,
-    {
+    pub(super) fn sixlowpan_egress(&mut self, device: &mut (impl Device + ?Sized)) {
         // Reset the buffer when we transmitted everything.
         if self.fragmenter.finished() {
             self.fragmenter.reset();
         }
 
         if self.fragmenter.is_empty() {
-            return false;
+            return;
         }
 
         let pkt = &self.fragmenter;
@@ -30,10 +23,8 @@ impl Interface {
             if let Some(tx_token) = device.transmit(self.inner.now) {
                 self.inner
                     .dispatch_ieee802154_frag(tx_token, &mut self.fragmenter);
-                return true;
             }
         }
-        false
     }
 
     /// Get the 6LoWPAN address contexts.