فهرست منبع

add fragmentation mechanism

Thibaut Vandervelden 3 سال پیش
والد
کامیت
53c46d78a9
7فایلهای تغییر یافته به همراه644 افزوده شده و 31 حذف شده
  1. 553 0
      src/iface/fragmentation.rs
  2. 2 0
      src/iface/interface.rs
  3. 7 0
      src/iface/mod.rs
  4. 36 0
      src/lib.rs
  5. 4 4
      src/socket/tcp.rs
  6. 2 2
      src/socket/udp.rs
  7. 40 25
      src/storage/assembler.rs

+ 553 - 0
src/iface/fragmentation.rs

@@ -0,0 +1,553 @@
+use managed::{ManagedMap, ManagedSlice};
+
+use crate::storage::Assembler;
+use crate::time::Instant;
+use crate::Error;
+use crate::Result;
+
+pub trait PacketAssemblerInfo: PartialEq {
+    /// Calculate a new offset based on some other information.
+    fn calc_offset(&self, offset: usize) -> usize;
+}
+
+#[derive(Debug, PartialEq)]
+pub struct NoInfo;
+
+impl PacketAssemblerInfo for NoInfo {
+    #[inline]
+    fn calc_offset(&self, offset: usize) -> usize {
+        offset
+    }
+}
+
+/// Holds different fragments of one packet, used for assembling fragmented packets.
+#[derive(Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub struct PacketAssembler<'a, Info: PacketAssemblerInfo = NoInfo> {
+    buffer: ManagedSlice<'a, u8>,
+    assembler: AssemblerState<Info>,
+}
+
+/// Holds the state of the assembling of one packet.
+#[derive(Debug, PartialEq)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+enum AssemblerState<Info: PacketAssemblerInfo = NoInfo> {
+    NotInit,
+    Assembling {
+        assembler: Assembler,
+        total_size: usize,
+        info: Info,
+        last_updated: Instant,
+        started_on: Instant,
+    },
+}
+
+impl<'a, Info: PacketAssemblerInfo> PacketAssembler<'a, Info> {
+    /// Create a new empty buffer for fragments.
+    pub fn new<S>(storage: S) -> Self
+    where
+        S: Into<ManagedSlice<'a, u8>>,
+    {
+        let s = storage.into();
+        PacketAssembler {
+            buffer: s,
+            assembler: AssemblerState::NotInit,
+        }
+    }
+
+    /// Start with saving fragments.
+    /// We initialize the assembler with the total size of the final packet.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerBufferTooSmall`] when the buffer is too smal for holding all the
+    /// fragments of a packet.
+    pub(crate) fn start(
+        &mut self,
+        total_size: usize,
+        info: Info,
+        start_time: Instant,
+    ) -> Result<()> {
+        match &mut self.buffer {
+            ManagedSlice::Borrowed(b) if b.len() < total_size => {
+                return Err(Error::PacketAssemblerBufferTooSmall);
+            }
+            ManagedSlice::Borrowed(_) => (),
+            #[cfg(any(feature = "std", feature = "alloc"))]
+            ManagedSlice::Owned(b) => {
+                b.resize(total_size, 0);
+            }
+        }
+
+        self.assembler = AssemblerState::Assembling {
+            assembler: Assembler::new(total_size),
+            total_size,
+            info,
+            last_updated: start_time,
+            started_on: start_time,
+        };
+
+        Ok(())
+    }
+
+    /// Add a fragment into the packet that is being reassembled.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
+    /// assembler with [Self::start]).
+    /// - Returns [`Error::PacketAssemblerBufferTooSmall`] when trying to add data into the buffer at a non-existing
+    /// place.
+    /// - Returns [`Error::PacketAssemblerOverlap`] when there was an overlap when adding data.
+    pub(crate) fn add(&mut self, data: &[u8], offset: usize, now: Instant) -> Result<bool> {
+        match self.assembler {
+            AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
+            AssemblerState::Assembling {
+                ref mut assembler,
+                total_size,
+                ref info,
+                ref mut last_updated,
+                ..
+            } => {
+                let offset = info.calc_offset(offset);
+
+                if offset + data.len() > total_size {
+                    return Err(Error::PacketAssemblerBufferTooSmall);
+                }
+
+                let len = data.len();
+                self.buffer[offset..][..len].copy_from_slice(data);
+
+                match assembler.add(offset, data.len()) {
+                    Ok(false) => {
+                        *last_updated = now;
+                        self.is_complete()
+                    }
+                    Ok(true) => Err(Error::PacketAssemblerOverlap),
+                    // NOTE(thvdveld): hopefully we wont get too many holes errors I guess?
+                    Err(_) => Err(Error::PacketAssemblerTooManyHoles),
+                }
+            }
+        }
+    }
+
+    /// Get an immutable slice of the underlying packet data.
+    /// This will mark the assembler state as [`AssemblerState::NotInit`] such that it can be reused.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
+    /// assembler with [`Self::start`]).
+    /// - Returns [`Error::PacketAssemblerIncomplete`] when not all the fragments have been collected.
+    pub(crate) fn assemble(&mut self) -> Result<&'_ [u8]> {
+        let b = match self.assembler {
+            AssemblerState::NotInit => return Err(Error::PacketAssemblerNotInit),
+            AssemblerState::Assembling { total_size, .. } => {
+                if self.is_complete()? {
+                    let a = &self.buffer[..total_size];
+                    self.assembler = AssemblerState::NotInit;
+                    a
+                } else {
+                    return Err(Error::PacketAssemblerIncomplete);
+                }
+            }
+        };
+        self.assembler = AssemblerState::NotInit;
+        Ok(b)
+    }
+
+    /// Returns `true` when all fragments have been received, otherwise `false`.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerNotInit`] when the assembler was not initialized (try initializing the
+    /// assembler with [`Self::start`]).
+    pub(crate) fn is_complete(&self) -> Result<bool> {
+        match &self.assembler {
+            AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
+            AssemblerState::Assembling {
+                assembler,
+                total_size,
+                ..
+            } => {
+                if let Some(front) = assembler.peek_front() {
+                    Ok(front == *total_size)
+                } else {
+                    Ok(false)
+                }
+            }
+        }
+    }
+
+    /// Returns `true` when the packet assembler is free to use.
+    fn is_free(&self) -> bool {
+        self.assembler == AssemblerState::NotInit
+    }
+
+    /// Returns the [`Instant`] when the packet assembler was started.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerNotInit`] when the packet assembler was not initialized.
+    pub fn start_time(&self) -> Result<Instant> {
+        match self.assembler {
+            AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
+            AssemblerState::Assembling { started_on, .. } => Ok(started_on),
+        }
+    }
+
+    /// Returns the [`Instant`] when the packet assembler was last updated.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerNotInit`] when the packet assembler was not initialized.
+    pub fn last_update_time(&self) -> Result<Instant> {
+        match self.assembler {
+            AssemblerState::NotInit => Err(Error::PacketAssemblerNotInit),
+            AssemblerState::Assembling { last_updated, .. } => Ok(last_updated),
+        }
+    }
+
+    /// Mark this assembler as [`AssemblerState::NotInit`].
+    /// This is then cleaned up by the [`PacketAssemblerSet`].
+    pub fn mark_discarded(&mut self) {
+        self.assembler = AssemblerState::NotInit;
+    }
+}
+
+/// Set holding multiple [`PacketAssembler`].
+#[derive(Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub struct PacketAssemblerSet<'a, Key: Eq + Ord + Clone + Copy, Info: PacketAssemblerInfo> {
+    packet_buffer: ManagedSlice<'a, PacketAssembler<'a, Info>>,
+    index_buffer: ManagedMap<'a, Key, u8>,
+}
+
+impl<'a, K: Eq + Ord + Clone + Copy, Info: PacketAssemblerInfo> PacketAssemblerSet<'a, K, Info> {
+    /// Create a new set of packet assemblers.
+    ///
+    /// # Panics
+    ///
+    /// This will panic when:
+    ///   - The packet buffer and index buffer don't have the same size or are empty (when they are
+    ///   both borrowed).
+    ///   - The packet buffer is empty (when only the packet buffer is borrowed).
+    ///   - The index buffer is empty (when only the index buffer is borrowed).
+    pub fn new<FB, IB>(packet_buffer: FB, index_buffer: IB) -> Self
+    where
+        FB: Into<ManagedSlice<'a, PacketAssembler<'a, Info>>>,
+        IB: Into<ManagedMap<'a, K, u8>>,
+    {
+        let packet_buffer = packet_buffer.into();
+        let index_buffer = index_buffer.into();
+
+        match (&packet_buffer, &index_buffer) {
+            (ManagedSlice::Borrowed(f), ManagedMap::Borrowed(i)) => {
+                if f.len() != i.len() {
+                    panic!("The amount of places in the index buffer must be the same as the amount of possible fragments assemblers.");
+                }
+            }
+            #[cfg(any(feature = "std", feature = "alloc"))]
+            (ManagedSlice::Borrowed(f), ManagedMap::Owned(_)) => {
+                if f.is_empty() {
+                    panic!("The packet buffer cannot be empty.");
+                }
+            }
+            #[cfg(any(feature = "std", feature = "alloc"))]
+            (ManagedSlice::Owned(_), ManagedMap::Borrowed(i)) => {
+                if i.is_empty() {
+                    panic!("The index buffer cannot be empty.");
+                }
+            }
+            #[cfg(any(feature = "std", feature = "alloc"))]
+            (ManagedSlice::Owned(_), ManagedMap::Owned(_)) => (),
+        }
+
+        Self {
+            packet_buffer,
+            index_buffer,
+        }
+    }
+
+    /// Reserve a [`PacketAssembler`], which is linked to a specific key.
+    /// Returns the reserved fragments assembler.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerSetFull`] when every [`PacketAssembler`] in the buffer is used (only
+    /// when the non allocating version of is used).
+    pub(crate) fn reserve_with_key(&mut self, key: &K) -> Result<&mut PacketAssembler<'a, Info>> {
+        if self.packet_buffer.len() == self.index_buffer.len() {
+            match &mut self.packet_buffer {
+                ManagedSlice::Borrowed(_) => return Err(Error::PacketAssemblerSetFull),
+                #[cfg(any(feature = "std", feature = "alloc"))]
+                ManagedSlice::Owned(b) => {
+                    b.resize_with(self.index_buffer.len() + 1, || {
+                        PacketAssembler::new(Vec::new())
+                    });
+                }
+            }
+        }
+
+        let i = self
+            .get_free_packet_assembler()
+            .ok_or(Error::PacketAssemblerSetFull)?;
+
+        // NOTE(thvdveld): this should not fail because we already checked the available space.
+        match self.index_buffer.insert(*key, i as u8) {
+            Ok(_) => Ok(&mut self.packet_buffer[i]),
+            Err(_) => unreachable!(),
+        }
+    }
+
+    /// Return the first free packet assembler available from the cache.
+    fn get_free_packet_assembler(&self) -> Option<usize> {
+        self.packet_buffer
+            .iter()
+            .enumerate()
+            .find(|(_, b)| b.is_free())
+            .map(|(i, _)| i)
+    }
+
+    /// Return a mutable slice to a packet assembler.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerSetKeyNotFound`] when the key was not found in the set.
+    pub(crate) fn get_packet_assembler_mut(
+        &mut self,
+        key: &K,
+    ) -> Result<&mut PacketAssembler<'a, Info>> {
+        if let Some(i) = self.index_buffer.get(key) {
+            Ok(&mut self.packet_buffer[*i as usize])
+        } else {
+            Err(Error::PacketAssemblerSetKeyNotFound)
+        }
+    }
+
+    /// Return the assembled packet from a packet assembler.
+    /// This also removes it from the set.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`Error::PacketAssemblerSetKeyNotFound`] when the `key` was not found.
+    /// - Returns [`Error::PacketAssemblerIncomplete`] when the fragments assembler was empty or not fully assembled.
+    pub(crate) fn get_assembled_packet(&mut self, key: &K) -> Result<&[u8]> {
+        if let Some(i) = self.index_buffer.get(key) {
+            let p = self.packet_buffer[*i as usize].assemble()?;
+            self.index_buffer.remove(key);
+            Ok(p)
+        } else {
+            Err(Error::PacketAssemblerSetKeyNotFound)
+        }
+    }
+
+    /// Remove all [`PacketAssembler`]s that are marked as discared.
+    pub fn remove_discarded(&mut self) {
+        loop {
+            let mut key = None;
+            for (k, i) in self.index_buffer.iter() {
+                if self.packet_buffer[*i as usize].assembler == AssemblerState::NotInit {
+                    key = Some(*k);
+                    break;
+                }
+            }
+
+            if let Some(k) = key {
+                self.index_buffer.remove(&k);
+            } else {
+                break;
+            }
+        }
+    }
+
+    /// Remove all [`PacketAssembler`]s for which `f` returns `Ok(true)`.
+    pub fn remove_when(
+        &mut self,
+        f: impl Fn(&mut PacketAssembler<'_, Info>) -> Result<bool>,
+    ) -> Result<()> {
+        for (_, i) in &mut self.index_buffer.iter() {
+            let frag = &mut self.packet_buffer[*i as usize];
+            if f(frag)? {
+                frag.mark_discarded();
+            }
+        }
+        self.remove_discarded();
+
+        Ok(())
+    }
+}
+
+#[cfg(feature = "proto-sixlowpan")]
+pub mod sixlowpan {
+    #[derive(Debug, PartialEq)]
+    pub struct SixlowpanAssemblerInfo {
+        header_size: usize,
+    }
+
+    impl SixlowpanAssemblerInfo {
+        pub fn new(header_size: usize) -> Self {
+            SixlowpanAssemblerInfo { header_size }
+        }
+    }
+
+    impl super::PacketAssemblerInfo for SixlowpanAssemblerInfo {
+        #[inline]
+        fn calc_offset(&self, offset: usize) -> usize {
+            match offset {
+                0 => 0,
+                offset => offset - self.header_size,
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
+    struct Key {
+        id: usize,
+    }
+
+    #[test]
+    fn packet_assembler_not_init() {
+        let mut p_assembler = PacketAssembler::<NoInfo>::new(vec![]);
+        let data = b"Hello World!";
+        assert_eq!(
+            p_assembler.add(&data[..], data.len(), Instant::now()),
+            Err(Error::PacketAssemblerNotInit)
+        );
+
+        assert_eq!(
+            p_assembler.is_complete(),
+            Err(Error::PacketAssemblerNotInit)
+        );
+        assert_eq!(p_assembler.assemble(), Err(Error::PacketAssemblerNotInit));
+    }
+
+    #[test]
+    fn packet_assembler_buffer_too_small() {
+        let mut storage = [0u8; 1];
+        let mut p_assembler = PacketAssembler::<NoInfo>::new(&mut storage[..]);
+
+        assert_eq!(
+            p_assembler.start(2, NoInfo, Instant::now()),
+            Err(Error::PacketAssemblerBufferTooSmall)
+        );
+        assert_eq!(p_assembler.start(1, NoInfo, Instant::now()), Ok(()));
+
+        let data = b"Hello World!";
+        assert_eq!(
+            p_assembler.add(&data[..], data.len(), Instant::now()),
+            Err(Error::PacketAssemblerBufferTooSmall)
+        );
+    }
+
+    #[test]
+    fn packet_assembler_overlap() {
+        let mut storage = [0u8; 5];
+        let mut p_assembler = PacketAssembler::new(&mut storage[..]);
+
+        p_assembler.start(5, NoInfo, Instant::now()).unwrap();
+        let data = b"Rust";
+
+        p_assembler.add(&data[..], 0, Instant::now()).unwrap();
+
+        assert_eq!(
+            p_assembler.add(&data[..], 1, Instant::now()),
+            Err(Error::PacketAssemblerOverlap),
+        );
+    }
+
+    #[test]
+    fn packet_assembler_assemble() {
+        let mut storage = [0u8; 12];
+        let mut p_assembler = PacketAssembler::new(&mut storage[..]);
+
+        let data = b"Hello World!";
+
+        p_assembler
+            .start(data.len(), NoInfo, Instant::now())
+            .unwrap();
+
+        p_assembler.add(b"Hello ", 0, Instant::now()).unwrap();
+        assert_eq!(
+            p_assembler.assemble(),
+            Err(Error::PacketAssemblerIncomplete)
+        );
+
+        p_assembler
+            .add(b"World!", b"Hello ".len(), Instant::now())
+            .unwrap();
+
+        assert_eq!(p_assembler.assemble(), Ok(&b"Hello World!"[..]));
+    }
+
+    #[test]
+    fn packet_assembler_set() {
+        let key = Key { id: 1 };
+
+        let mut set =
+            PacketAssemblerSet::<'_, _, NoInfo>::new(vec![], std::collections::BTreeMap::new());
+
+        if let Err(e) = set.get_packet_assembler_mut(&key) {
+            assert_eq!(e, Error::PacketAssemblerSetKeyNotFound);
+        }
+
+        assert!(set.reserve_with_key(&key).is_ok());
+    }
+
+    #[test]
+    fn packet_assembler_set_borrowed() {
+        let mut buf = [0u8, 127];
+        let mut packet_assembler_cache = [PacketAssembler::<'_, NoInfo>::new(&mut buf[..])];
+        let mut packet_index_cache = [None];
+
+        let key = Key { id: 1 };
+
+        let mut set =
+            PacketAssemblerSet::new(&mut packet_assembler_cache[..], &mut packet_index_cache[..]);
+
+        if let Err(e) = set.get_packet_assembler_mut(&key) {
+            assert_eq!(e, Error::PacketAssemblerSetKeyNotFound);
+        }
+
+        assert!(set.reserve_with_key(&key).is_ok());
+    }
+
+    #[test]
+    fn packet_assembler_set_assembling_many() {
+        let mut buf = [0u8, 127];
+        let mut packet_assembler_cache = [PacketAssembler::new(&mut buf[..])];
+        let mut packet_index_cache = [None];
+
+        let mut set =
+            PacketAssemblerSet::new(&mut packet_assembler_cache[..], &mut packet_index_cache[..]);
+
+        let key = Key { id: 0 };
+        set.reserve_with_key(&key).unwrap();
+        set.get_packet_assembler_mut(&key)
+            .unwrap()
+            .start(0, NoInfo, Instant::now())
+            .unwrap();
+        set.get_assembled_packet(&key).unwrap();
+
+        let key = Key { id: 1 };
+        set.reserve_with_key(&key).unwrap();
+        set.get_packet_assembler_mut(&key)
+            .unwrap()
+            .start(0, NoInfo, Instant::now())
+            .unwrap();
+        set.get_assembled_packet(&key).unwrap();
+
+        let key = Key { id: 2 };
+        set.reserve_with_key(&key).unwrap();
+        set.get_packet_assembler_mut(&key)
+            .unwrap()
+            .start(0, NoInfo, Instant::now())
+            .unwrap();
+        set.get_assembled_packet(&key).unwrap();
+    }
+}

+ 2 - 0
src/iface/interface.rs

@@ -5,6 +5,8 @@
 use core::cmp;
 use managed::{ManagedMap, ManagedSlice};
 
+#[cfg(feature = "proto-sixlowpan")]
+use super::fragmentation::{sixlowpan::SixlowpanAssemblerInfo, PacketAssemblerSet};
 use super::socket_set::SocketSet;
 use super::{SocketHandle, SocketStorage};
 use crate::iface::Routes;

+ 7 - 0
src/iface/mod.rs

@@ -4,6 +4,8 @@ The `iface` module deals with the *network interfaces*. It filters incoming fram
 provides lookup and caching of hardware addresses, and handles management packets.
 */
 
+#[cfg(feature = "proto-sixlowpan")]
+mod fragmentation;
 mod interface;
 #[cfg(any(feature = "medium-ethernet", feature = "medium-ieee802154"))]
 mod neighbor;
@@ -20,4 +22,9 @@ pub use self::neighbor::Neighbor;
 pub use self::route::{Route, Routes};
 pub use socket_set::{SocketHandle, SocketStorage};
 
+#[cfg(feature = "proto-sixlowpan")]
+pub use self::fragmentation::{
+    sixlowpan::SixlowpanAssemblerInfo, PacketAssembler, PacketAssemblerSet as FragmentsCache,
+};
+
 pub use self::interface::{Interface, InterfaceBuilder, InterfaceInner as Context};

+ 36 - 0
src/lib.rs

@@ -186,6 +186,25 @@ pub enum Error {
     /// An incoming packet was recognized but contradicted internal state.
     /// E.g. a TCP packet addressed to a socket that doesn't exist.
     Dropped,
+    /// An incoming fragment arrived too late.
+    ReassemblyTimeout,
+
+    /// The packet assembler is not initialized, thus it cannot know what the final size of the
+    /// packet would be.
+    PacketAssemblerNotInit,
+    /// The buffer of the assembler is to small and thus the final packet wont fit into it.
+    PacketAssemblerBufferTooSmall,
+    /// The packet assembler did not receive all the fragments for assembling the final packet.
+    PacketAssemblerIncomplete,
+    /// There are too many holes in the packet assembler (should be fixed in the future?).
+    PacketAssemblerTooManyHoles,
+    /// There was an overlap when adding data to the packet assembler.
+    PacketAssemblerOverlap,
+
+    /// The packet assembler set has no place for assembling a new stream of fragments.
+    PacketAssemblerSetFull,
+    /// The key was not found in the packet assembler set.
+    PacketAssemblerSetKeyNotFound,
 
     /// An incoming packet was recognized but some parts are not supported by smoltcp.
     /// E.g. some bit configuration in a packet header is not supported, but is defined in an RFC.
@@ -211,6 +230,23 @@ impl fmt::Display for Error {
             Error::Fragmented => write!(f, "fragmented packet"),
             Error::Malformed => write!(f, "malformed packet"),
             Error::Dropped => write!(f, "dropped by socket"),
+            Error::ReassemblyTimeout => write!(f, "incoming fragment arrived too late"),
+            Error::PacketAssemblerNotInit => write!(f, "packet assembler was not initialized"),
+            Error::PacketAssemblerBufferTooSmall => {
+                write!(f, "packet assembler buffer too small for final packet")
+            }
+            Error::PacketAssemblerIncomplete => write!(f, "packet assembler incomplete"),
+            Error::PacketAssemblerTooManyHoles => write!(
+                f,
+                "packet assembler has too many holes (internal smoltcp error)"
+            ),
+            Error::PacketAssemblerOverlap => {
+                write!(f, "overlap when adding data to packet assembler")
+            }
+            Error::PacketAssemblerSetFull => write!(f, "packet assembler set is full"),
+            Error::PacketAssemblerSetKeyNotFound => {
+                write!(f, "packet assembler set does not find key")
+            }
             Error::NotSupported => write!(f, "not supported by smoltcp"),
         }
     }

+ 4 - 4
src/socket/tcp.rs

@@ -261,7 +261,7 @@ impl Timer {
             Timer::Idle { .. } | Timer::FastRetransmit { .. } => {
                 *self = Timer::Retransmit {
                     expires_at: timestamp + delay,
-                    delay: delay,
+                    delay,
                 }
             }
             Timer::Retransmit { expires_at, delay } if timestamp >= expires_at => {
@@ -414,8 +414,8 @@ impl<'a> TcpSocket<'a> {
             timer: Timer::new(),
             rtte: RttEstimator::default(),
             assembler: Assembler::new(rx_buffer.capacity()),
-            tx_buffer: tx_buffer,
-            rx_buffer: rx_buffer,
+            tx_buffer,
+            rx_buffer,
             rx_fin_received: false,
             timeout: None,
             keep_alive: None,
@@ -1829,7 +1829,7 @@ impl<'a> TcpSocket<'a> {
 
         // Try adding payload octets to the assembler.
         match self.assembler.add(payload_offset, payload_len) {
-            Ok(()) => {
+            Ok(_) => {
                 debug_assert!(self.assembler.total_size() == self.rx_buffer.capacity());
                 // Place payload octets into the buffer.
                 net_trace!(

+ 2 - 2
src/socket/udp.rs

@@ -38,8 +38,8 @@ impl<'a> UdpSocket<'a> {
     pub fn new(rx_buffer: UdpSocketBuffer<'a>, tx_buffer: UdpSocketBuffer<'a>) -> UdpSocket<'a> {
         UdpSocket {
             endpoint: IpEndpoint::default(),
-            rx_buffer: rx_buffer,
-            tx_buffer: tx_buffer,
+            rx_buffer,
+            tx_buffer,
             hop_limit: None,
             #[cfg(feature = "async")]
             rx_waker: WakerRegistration::new(),

+ 40 - 25
src/storage/assembler.rs

@@ -94,8 +94,7 @@ const CONTIG_COUNT: usize = 4;
 /// A buffer (re)assembler.
 ///
 /// Currently, up to a hardcoded limit of 4 or 32 holes can be tracked in the buffer.
-#[derive(Debug)]
-#[cfg_attr(test, derive(PartialEq, Eq, Clone))]
+#[derive(Debug, PartialEq, Eq, Clone)]
 #[cfg_attr(feature = "defmt", derive(defmt::Format))]
 pub struct Assembler {
     #[cfg(not(any(feature = "std", feature = "alloc")))]
@@ -139,6 +138,16 @@ impl Assembler {
         self.contigs[0]
     }
 
+    /// Return length of the front contiguous range without removing it from the assembler
+    pub fn peek_front(&self) -> Option<usize> {
+        let front = self.front();
+        if front.has_hole() {
+            None
+        } else {
+            Some(front.data_size)
+        }
+    }
+
     fn back(&self) -> Contig {
         self.contigs[self.contigs.len() - 1]
     }
@@ -182,10 +191,12 @@ impl Assembler {
         Ok(&mut self.contigs[at])
     }
 
-    /// Add a new contiguous range to the assembler, and return `Ok(())`,
+    /// Add a new contiguous range to the assembler, and return `Ok(bool)`,
     /// or return `Err(())` if too many discontiguities are already recorded.
-    pub fn add(&mut self, mut offset: usize, mut size: usize) -> Result<(), TooManyHolesError> {
+    /// Returns `Ok(true)` when there was an overlap.
+    pub fn add(&mut self, mut offset: usize, mut size: usize) -> Result<bool, TooManyHolesError> {
         let mut index = 0;
+        let mut overlap = size;
         while index != self.contigs.len() && size != 0 {
             let contig = self.contigs[index];
 
@@ -196,6 +207,7 @@ impl Assembler {
                 // The range being added covers the entire hole in this contig, merge it
                 // into the previous config.
                 self.contigs[index - 1].expand_data_by(contig.total_size());
+                overlap -= contig.total_size();
                 self.remove_contig_at(index);
                 index += 0;
             } else if offset == 0 && size < contig.hole_size && index > 0 {
@@ -204,10 +216,12 @@ impl Assembler {
                 // the previous contig.
                 self.contigs[index - 1].expand_data_by(size);
                 self.contigs[index].shrink_hole_by(size);
+                overlap -= size;
                 index += 1;
             } else if offset <= contig.hole_size && offset + size >= contig.hole_size {
                 // The range being added covers both a part of the hole and a part of the data
                 // in this contig, shrink the hole in this contig.
+                overlap -= contig.hole_size - offset;
                 self.contigs[index].shrink_hole_to(offset);
                 index += 1;
             } else if offset + size >= contig.hole_size {
@@ -219,6 +233,7 @@ impl Assembler {
                 {
                     let inserted = self.add_contig_at(index)?;
                     *inserted = Contig::hole_and_data(offset, size);
+                    overlap -= size;
                 }
                 // Previous contigs[index] got moved to contigs[index+1]
                 self.contigs[index + 1].shrink_hole_by(offset + size);
@@ -237,7 +252,7 @@ impl Assembler {
         }
 
         debug_assert!(size == 0);
-        Ok(())
+        Ok(overlap != 0)
     }
 
     /// Remove a contiguous range from the front of the assembler and `Some(data_size)`,
@@ -280,8 +295,8 @@ pub struct AssemblerIter<'a> {
 impl<'a> AssemblerIter<'a> {
     fn new(assembler: &'a Assembler, offset: usize) -> AssemblerIter<'a> {
         AssemblerIter {
-            assembler: assembler,
-            offset: offset,
+            assembler,
+            offset,
             index: 0,
             left: 0,
             right: 0,
@@ -348,84 +363,84 @@ mod test {
     #[test]
     fn test_empty_add_full() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(0, 16), Ok(()));
+        assert_eq!(assr.add(0, 16), Ok(false));
         assert_eq!(assr, contigs![(0, 16)]);
     }
 
     #[test]
     fn test_empty_add_front() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(0, 4), Ok(()));
+        assert_eq!(assr.add(0, 4), Ok(false));
         assert_eq!(assr, contigs![(0, 4), (12, 0)]);
     }
 
     #[test]
     fn test_empty_add_back() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(12, 4), Ok(()));
+        assert_eq!(assr.add(12, 4), Ok(false));
         assert_eq!(assr, contigs![(12, 4)]);
     }
 
     #[test]
     fn test_empty_add_mid() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(4, 8), Ok(()));
+        assert_eq!(assr.add(4, 8), Ok(false));
         assert_eq!(assr, contigs![(4, 8), (4, 0)]);
     }
 
     #[test]
     fn test_partial_add_front() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(0, 4), Ok(()));
+        assert_eq!(assr.add(0, 4), Ok(false));
         assert_eq!(assr, contigs![(0, 12), (4, 0)]);
     }
 
     #[test]
     fn test_partial_add_back() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(12, 4), Ok(()));
+        assert_eq!(assr.add(12, 4), Ok(false));
         assert_eq!(assr, contigs![(4, 12)]);
     }
 
     #[test]
     fn test_partial_add_front_overlap() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(0, 8), Ok(()));
+        assert_eq!(assr.add(0, 8), Ok(true));
         assert_eq!(assr, contigs![(0, 12), (4, 0)]);
     }
 
     #[test]
     fn test_partial_add_front_overlap_split() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(2, 6), Ok(()));
+        assert_eq!(assr.add(2, 6), Ok(true));
         assert_eq!(assr, contigs![(2, 10), (4, 0)]);
     }
 
     #[test]
     fn test_partial_add_back_overlap() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(8, 8), Ok(()));
+        assert_eq!(assr.add(8, 8), Ok(true));
         assert_eq!(assr, contigs![(4, 12)]);
     }
 
     #[test]
     fn test_partial_add_back_overlap_split() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(10, 4), Ok(()));
+        assert_eq!(assr.add(10, 4), Ok(true));
         assert_eq!(assr, contigs![(4, 10), (2, 0)]);
     }
 
     #[test]
     fn test_partial_add_both_overlap() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(0, 16), Ok(()));
+        assert_eq!(assr.add(0, 16), Ok(true));
         assert_eq!(assr, contigs![(0, 16)]);
     }
 
     #[test]
     fn test_partial_add_both_overlap_split() {
         let mut assr = contigs![(4, 8), (4, 0)];
-        assert_eq!(assr.add(2, 12), Ok(()));
+        assert_eq!(assr.add(2, 12), Ok(true));
         assert_eq!(assr, contigs![(2, 12), (2, 0)]);
     }
 
@@ -433,7 +448,7 @@ mod test {
     fn test_rejected_add_keeps_state() {
         let mut assr = Assembler::new(CONTIG_COUNT * 20);
         for c in 1..=CONTIG_COUNT - 1 {
-            assert_eq!(assr.add(c * 10, 3), Ok(()));
+            assert_eq!(assr.add(c * 10, 3), Ok(false));
         }
         // Maximum of allowed holes is reached
         let assr_before = assr.clone();
@@ -471,7 +486,7 @@ mod test {
     #[test]
     fn test_iter_full() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(0, 16), Ok(()));
+        assert_eq!(assr.add(0, 16), Ok(false));
         let segments: Vec<_> = assr.iter_data(10).collect();
         assert_eq!(segments, vec![(10, 26)]);
     }
@@ -479,7 +494,7 @@ mod test {
     #[test]
     fn test_iter_offset() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(0, 16), Ok(()));
+        assert_eq!(assr.add(0, 16), Ok(false));
         let segments: Vec<_> = assr.iter_data(100).collect();
         assert_eq!(segments, vec![(100, 116)]);
     }
@@ -487,7 +502,7 @@ mod test {
     #[test]
     fn test_iter_one_front() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(0, 4), Ok(()));
+        assert_eq!(assr.add(0, 4), Ok(false));
         let segments: Vec<_> = assr.iter_data(10).collect();
         assert_eq!(segments, vec![(10, 14)]);
     }
@@ -495,7 +510,7 @@ mod test {
     #[test]
     fn test_iter_one_back() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(12, 4), Ok(()));
+        assert_eq!(assr.add(12, 4), Ok(false));
         let segments: Vec<_> = assr.iter_data(10).collect();
         assert_eq!(segments, vec![(22, 26)]);
     }
@@ -503,7 +518,7 @@ mod test {
     #[test]
     fn test_iter_one_mid() {
         let mut assr = Assembler::new(16);
-        assert_eq!(assr.add(4, 8), Ok(()));
+        assert_eq!(assr.add(4, 8), Ok(false));
         let segments: Vec<_> = assr.iter_data(10).collect();
         assert_eq!(segments, vec![(14, 22)]);
     }