Browse Source

Merge pull request #394 from smoltcp-rs/async

Async/await waker support.
Dario Nieuwenhuis 4 năm trước cách đây
mục cha
commit
73bb62a2b0
8 tập tin đã thay đổi với 307 bổ sung11 xóa
  1. 2 2
      .github/workflows/test.yml
  2. 3 1
      Cargo.toml
  3. 66 3
      src/socket/icmp.rs
  4. 5 0
      src/socket/mod.rs
  5. 57 1
      src/socket/raw.rs
  6. 75 1
      src/socket/tcp.rs
  7. 66 3
      src/socket/udp.rs
  8. 33 0
      src/socket/waker.rs

+ 2 - 2
.github/workflows/test.yml

@@ -34,7 +34,7 @@ jobs:
           - std ethernet proto-ipv6 socket-icmp socket-tcp
 
           # Test features chosen to be as aggressive as possible.
-          - std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp
+          - std ethernet proto-ipv4 proto-ipv6 socket-raw socket-udp socket-tcp socket-icmp async
 
         include:
           # Test alloc feature which requires nightly.
@@ -66,7 +66,7 @@ jobs:
 
         features:
           # These feature sets cannot run tests, so we only check they build.
-          - ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp
+          - ethernet proto-ipv6 proto-ipv6 proto-igmp proto-dhcpv4 socket-raw socket-udp socket-tcp socket-icmp async
 
     steps:
       - uses: actions/checkout@v2

+ 3 - 1
Cargo.toml

@@ -42,12 +42,14 @@ ethernet = []
 "socket-udp" = []
 "socket-tcp" = []
 "socket-icmp" = []
+"async" = []
 default = [
   "std", "log", # needed for `cargo test --no-default-features --features default` :/
   "ethernet",
   "phy-raw_socket", "phy-tap_interface",
   "proto-ipv4", "proto-igmp", "proto-dhcpv4", "proto-ipv6",
-  "socket-raw", "socket-icmp", "socket-udp", "socket-tcp"
+  "socket-raw", "socket-icmp", "socket-udp", "socket-tcp",
+  "async"
 ]
 
 # experimental; do not use; no guarantees provided that this feature will be kept

+ 66 - 3
src/socket/icmp.rs

@@ -5,6 +5,10 @@ use phy::{ChecksumCapabilities, DeviceCapabilities};
 use socket::{Socket, SocketMeta, SocketHandle, PollAt};
 use storage::{PacketBuffer, PacketMetadata};
 use wire::{IpAddress, IpEndpoint, IpProtocol, IpRepr};
+#[cfg(feature = "async")]
+use socket::WakerRegistration;
+#[cfg(feature = "async")]
+use core::task::Waker;
 
 #[cfg(feature = "proto-ipv4")]
 use wire::{Ipv4Address, Ipv4Repr, Icmpv4Packet, Icmpv4Repr};
@@ -61,7 +65,11 @@ pub struct IcmpSocket<'a, 'b: 'a> {
     /// The endpoint this socket is communicating with
     endpoint:  Endpoint,
     /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
-    hop_limit: Option<u8>
+    hop_limit: Option<u8>,
+    #[cfg(feature = "async")]
+    rx_waker: WakerRegistration,
+    #[cfg(feature = "async")]
+    tx_waker: WakerRegistration,
 }
 
 impl<'a, 'b> IcmpSocket<'a, 'b> {
@@ -73,10 +81,49 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
             rx_buffer: rx_buffer,
             tx_buffer: tx_buffer,
             endpoint:  Endpoint::default(),
-            hop_limit: None
+            hop_limit: None,
+            #[cfg(feature = "async")]
+            rx_waker: WakerRegistration::new(),
+            #[cfg(feature = "async")]
+            tx_waker: WakerRegistration::new(),
         }
     }
 
+    /// Register a waker for receive operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `recv` method calls, such as receiving data, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_recv_waker(&mut self, waker: &Waker) {
+        self.rx_waker.register(waker)
+    }
+
+    /// Register a waker for send operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `send` method calls, such as space becoming available in the transmit
+    /// buffer, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_send_waker(&mut self, waker: &Waker) {
+        self.tx_waker.register(waker)
+    }
+
     /// Return the socket handle.
     #[inline]
     pub fn handle(&self) -> SocketHandle {
@@ -174,6 +221,13 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
         if self.is_open() { return Err(Error::Illegal) }
 
         self.endpoint = endpoint;
+
+        #[cfg(feature = "async")]
+        {
+            self.rx_waker.wake();
+            self.tx_waker.wake();
+        }
+
         Ok(())
     }
 
@@ -339,6 +393,10 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
                            self.meta.handle, icmp_repr.buffer_len(), packet_buf.len());
             },
         }
+
+        #[cfg(feature = "async")]
+        self.rx_waker.wake();
+
         Ok(())
     }
 
@@ -380,7 +438,12 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
                 },
                 _ => Err(Error::Unaddressable)
             }
-        })
+        })?;
+        
+        #[cfg(feature = "async")]
+        self.tx_waker.wake();
+
+        Ok(())
     }
 
     pub(crate) fn poll_at(&self) -> PollAt {

+ 5 - 0
src/socket/mod.rs

@@ -26,7 +26,12 @@ mod tcp;
 mod set;
 mod ref_;
 
+#[cfg(feature = "async")]
+mod waker;
+
 pub(crate) use self::meta::Meta as SocketMeta;
+#[cfg(feature = "async")]
+pub(crate) use self::waker::WakerRegistration;
 
 #[cfg(feature = "socket-raw")]
 pub use self::raw::{RawPacketMetadata,

+ 57 - 1
src/socket/raw.rs

@@ -9,6 +9,10 @@ use wire::{IpVersion, IpRepr, IpProtocol};
 use wire::{Ipv4Repr, Ipv4Packet};
 #[cfg(feature = "proto-ipv6")]
 use wire::{Ipv6Repr, Ipv6Packet};
+#[cfg(feature = "async")]
+use socket::WakerRegistration;
+#[cfg(feature = "async")]
+use core::task::Waker;
 
 /// A UDP packet metadata.
 pub type RawPacketMetadata = PacketMetadata<()>;
@@ -27,6 +31,10 @@ pub struct RawSocket<'a, 'b: 'a> {
     ip_protocol: IpProtocol,
     rx_buffer:   RawSocketBuffer<'a, 'b>,
     tx_buffer:   RawSocketBuffer<'a, 'b>,
+    #[cfg(feature = "async")]
+    rx_waker: WakerRegistration,
+    #[cfg(feature = "async")]
+    tx_waker: WakerRegistration,
 }
 
 impl<'a, 'b> RawSocket<'a, 'b> {
@@ -41,9 +49,48 @@ impl<'a, 'b> RawSocket<'a, 'b> {
             ip_protocol,
             rx_buffer,
             tx_buffer,
+            #[cfg(feature = "async")]
+            rx_waker: WakerRegistration::new(),
+            #[cfg(feature = "async")]
+            tx_waker: WakerRegistration::new(),
         }
     }
 
+    /// Register a waker for receive operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `recv` method calls, such as receiving data, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_recv_waker(&mut self, waker: &Waker) {
+        self.rx_waker.register(waker)
+    }
+
+    /// Register a waker for send operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `send` method calls, such as space becoming available in the transmit
+    /// buffer, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_send_waker(&mut self, waker: &Waker) {
+        self.tx_waker.register(waker)
+    }
+
     /// Return the socket handle.
     #[inline]
     pub fn handle(&self) -> SocketHandle {
@@ -171,6 +218,10 @@ impl<'a, 'b> RawSocket<'a, 'b> {
         net_trace!("{}:{}:{}: receiving {} octets",
                    self.meta.handle, self.ip_version, self.ip_protocol,
                    packet_buf.len());
+
+        #[cfg(feature = "async")]
+        self.rx_waker.wake();
+
         Ok(())
     }
 
@@ -228,7 +279,12 @@ impl<'a, 'b> RawSocket<'a, 'b> {
                     Ok(())
                 }
             }
-        })
+        })?;
+
+        #[cfg(feature = "async")]
+        self.tx_waker.wake();
+
+        Ok(())
     }
 
     pub(crate) fn poll_at(&self) -> PollAt {

+ 75 - 1
src/socket/tcp.rs

@@ -10,6 +10,10 @@ use time::{Duration, Instant};
 use socket::{Socket, SocketMeta, SocketHandle, PollAt};
 use storage::{Assembler, RingBuffer};
 use wire::{IpProtocol, IpRepr, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};
+#[cfg(feature = "async")]
+use socket::WakerRegistration;
+#[cfg(feature = "async")]
+use core::task::Waker;
 
 /// A TCP socket ring buffer.
 pub type SocketBuffer<'a> = RingBuffer<'a, u8>;
@@ -248,6 +252,12 @@ pub struct TcpSocket<'a> {
     /// The number of packets recived directly after
     /// each other which have the same ACK number.
     local_rx_dup_acks: u8,
+
+    #[cfg(feature = "async")]
+    rx_waker: WakerRegistration,
+    #[cfg(feature = "async")]
+    tx_waker: WakerRegistration,
+
 }
 
 const DEFAULT_MSS: usize = 536;
@@ -298,9 +308,49 @@ impl<'a> TcpSocket<'a> {
             local_rx_last_ack: None,
             local_rx_last_seq: None,
             local_rx_dup_acks: 0,
+
+            #[cfg(feature = "async")]
+            rx_waker: WakerRegistration::new(),
+            #[cfg(feature = "async")]
+            tx_waker: WakerRegistration::new(),
         }
     }
 
+    /// Register a waker for receive operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `recv` method calls, such as receiving data, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_recv_waker(&mut self, waker: &Waker) {
+        self.rx_waker.register(waker)
+    }
+
+    /// Register a waker for send operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `send` method calls, such as space becoming available in the transmit
+    /// buffer, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_send_waker(&mut self, waker: &Waker) {
+        self.tx_waker.register(waker)
+    }
+
     /// Return the socket handle.
     #[inline]
     pub fn handle(&self) -> SocketHandle {
@@ -438,6 +488,12 @@ impl<'a> TcpSocket<'a> {
         self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
         self.remote_mss      = DEFAULT_MSS;
         self.remote_last_ts  = None;
+
+        #[cfg(feature = "async")]
+        {
+            self.rx_waker.wake();
+            self.tx_waker.wake();
+        }
     }
 
     /// Start listening on the given endpoint.
@@ -825,7 +881,17 @@ impl<'a> TcpSocket<'a> {
                            self.state, state);
             }
         }
-        self.state = state
+
+        self.state = state;
+
+        #[cfg(feature = "async")]
+        {
+            // Wake all tasks waiting. Even if we haven't received/sent data, this
+            // is needed because return values of functions may change depending on the state.
+            // For example, a pending read has to fail with an error if the socket is closed.
+            self.rx_waker.wake();
+            self.tx_waker.wake();
+        }
     }
 
     pub(crate) fn reply(ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) {
@@ -1283,6 +1349,10 @@ impl<'a> TcpSocket<'a> {
                        self.meta.handle, self.local_endpoint, self.remote_endpoint,
                        ack_len, self.tx_buffer.len() - ack_len);
             self.tx_buffer.dequeue_allocated(ack_len);
+
+            // There's new room available in tx_buffer, wake the waiting task if any.
+            #[cfg(feature = "async")]
+            self.tx_waker.wake();
         }
 
         if let Some(ack_number) = repr.ack_number {
@@ -1367,6 +1437,10 @@ impl<'a> TcpSocket<'a> {
                        self.meta.handle, self.local_endpoint, self.remote_endpoint,
                        contig_len, self.rx_buffer.len() + contig_len);
             self.rx_buffer.enqueue_unallocated(contig_len);
+
+            // There's new data in rx_buffer, notify waiting task if any.
+            #[cfg(feature = "async")]
+            self.rx_waker.wake();
         }
 
         if !self.assembler.is_empty() {

+ 66 - 3
src/socket/udp.rs

@@ -4,6 +4,10 @@ use {Error, Result};
 use socket::{Socket, SocketMeta, SocketHandle, PollAt};
 use storage::{PacketBuffer, PacketMetadata};
 use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr};
+#[cfg(feature = "async")]
+use socket::WakerRegistration;
+#[cfg(feature = "async")]
+use core::task::Waker;
 
 /// A UDP packet metadata.
 pub type UdpPacketMetadata = PacketMetadata<IpEndpoint>;
@@ -22,7 +26,11 @@ pub struct UdpSocket<'a, 'b: 'a> {
     rx_buffer: UdpSocketBuffer<'a, 'b>,
     tx_buffer: UdpSocketBuffer<'a, 'b>,
     /// The time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets.
-    hop_limit: Option<u8>
+    hop_limit: Option<u8>,
+    #[cfg(feature = "async")]
+    rx_waker: WakerRegistration,
+    #[cfg(feature = "async")]
+    tx_waker: WakerRegistration,
 }
 
 impl<'a, 'b> UdpSocket<'a, 'b> {
@@ -34,10 +42,49 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
             endpoint:  IpEndpoint::default(),
             rx_buffer: rx_buffer,
             tx_buffer: tx_buffer,
-            hop_limit: None
+            hop_limit: None,
+            #[cfg(feature = "async")]
+            rx_waker: WakerRegistration::new(),
+            #[cfg(feature = "async")]
+            tx_waker: WakerRegistration::new(),
         }
     }
 
+    /// Register a waker for receive operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `recv` method calls, such as receiving data, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `recv` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_recv_waker(&mut self, waker: &Waker) {
+        self.rx_waker.register(waker)
+    }
+
+    /// Register a waker for send operations.
+    ///
+    /// The waker is woken on state changes that might affect the return value
+    /// of `send` method calls, such as space becoming available in the transmit
+    /// buffer, or the socket closing.
+    /// 
+    /// Notes:
+    ///
+    /// - Only one waker can be registered at a time. If another waker was previously registered,
+    ///   it is overwritten and will no longer be woken.
+    /// - The Waker is woken only once. Once woken, you must register it again to receive more wakes. 
+    /// - "Spurious wakes" are allowed: a wake doesn't guarantee the result of `send` has
+    ///   necessarily changed.
+    #[cfg(feature = "async")]
+    pub fn register_send_waker(&mut self, waker: &Waker) {
+        self.tx_waker.register(waker)
+    }
+
     /// Return the socket handle.
     #[inline]
     pub fn handle(&self) -> SocketHandle {
@@ -89,6 +136,13 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
         if self.is_open() { return Err(Error::Illegal) }
 
         self.endpoint = endpoint;
+
+        #[cfg(feature = "async")]
+        {
+            self.rx_waker.wake();
+            self.tx_waker.wake();
+        }
+
         Ok(())
     }
 
@@ -234,6 +288,10 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
         net_trace!("{}:{}:{}: receiving {} octets",
                    self.meta.handle, self.endpoint,
                    endpoint, size);
+
+        #[cfg(feature = "async")]
+        self.rx_waker.wake();
+
         Ok(())
     }
 
@@ -261,7 +319,12 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
                 hop_limit:   hop_limit,
             };
             emit((ip_repr, repr))
-        })
+        })?;
+
+        #[cfg(feature = "async")]
+        self.tx_waker.wake();
+
+        Ok(())
     }
 
     pub(crate) fn poll_at(&self) -> PollAt {

+ 33 - 0
src/socket/waker.rs

@@ -0,0 +1,33 @@
+use core::task::Waker;
+
+/// Utility struct to register and wake a waker.
+#[derive(Debug)]
+pub struct WakerRegistration {
+    waker: Option<Waker>,
+}
+
+impl WakerRegistration {
+    pub const fn new() -> Self {
+        Self { waker: None }
+    }
+
+    /// Register a waker. Overwrites the previous waker, if any.
+    pub fn register(&mut self, w: &Waker) {
+        match self.waker {
+            // Optimization: If both the old and new Wakers wake the same task, we can simply
+            // keep the old waker, skipping the clone. (In most executor implementations,
+            // cloning a waker is somewhat expensive, comparable to cloning an Arc).
+            Some(ref w2) if (w2.will_wake(w)) => {}
+            // In all other cases
+            // - we have no waker registered
+            // - we have a waker registered but it's for a different task.
+            // then clone the new waker and store it
+            _ => self.waker = Some(w.clone()),
+        }
+    }
+
+    /// Wake the registered waker, if any.
+    pub fn wake(&mut self) {
+        self.waker.take().map(|w| w.wake());
+    }
+}