Browse Source

maps: add AsyncPerfMap

When the async_tokio or async_std features are enabled, AsyncPerfMap
provides an async version of PerfMap which returns a future from
read_events()
Alessandro Decina 4 years ago
parent
commit
fdc4dad5ff

+ 11 - 1
aya/Cargo.toml

@@ -11,6 +11,16 @@ object = "0.23"
 bytes = "1"
 lazy_static = "1"
 parking_lot = { version = "0.11.1", features = ["send_guard"] }
+futures = "0.3.12"
+tokio = { version = "1.2.0", features = ["macros", "rt", "rt-multi-thread", "net"], optional = true }
+async-std = { version = "1.9.0", optional = true }
+async-io = { version = "1.3", optional = true }
 
 [dev-dependencies]
-matches = "0.1.8"
+matches = "0.1.8"
+
+[features]
+default = []
+async = []
+async_tokio = ["tokio", "async"]
+async_std = ["async-std", "async-io", "async"]

+ 108 - 0
aya/src/maps/perf_map/async_perf_map.rs

@@ -0,0 +1,108 @@
+use bytes::BytesMut;
+use std::{
+    convert::TryFrom,
+    ops::DerefMut,
+    os::unix::prelude::{AsRawFd, RawFd},
+};
+
+#[cfg(feature = "async_std")]
+use async_io::Async;
+
+#[cfg(feature = "async_tokio")]
+use tokio::io::unix::AsyncFd;
+
+use crate::maps::{
+    perf_map::{Events, PerfBufferError, PerfMap, PerfMapBuffer, PerfMapError},
+    Map, MapRefMut,
+};
+
+pub struct AsyncPerfMap<T: DerefMut<Target = Map>> {
+    perf_map: PerfMap<T>,
+}
+
+impl<T: DerefMut<Target = Map>> AsyncPerfMap<T> {
+    pub fn open(
+        &mut self,
+        index: u32,
+        page_count: Option<usize>,
+    ) -> Result<AsyncPerfMapBuffer<T>, PerfMapError> {
+        let buf = self.perf_map.open(index, page_count)?;
+        let fd = buf.as_raw_fd();
+        Ok(AsyncPerfMapBuffer {
+            buf,
+
+            #[cfg(feature = "async_tokio")]
+            async_fd: AsyncFd::new(fd)?,
+
+            #[cfg(feature = "async_std")]
+            async_fd: Async::new(fd)?,
+        })
+    }
+}
+
+impl<T: DerefMut<Target = Map>> AsyncPerfMap<T> {
+    fn new(map: T) -> Result<AsyncPerfMap<T>, PerfMapError> {
+        Ok(AsyncPerfMap {
+            perf_map: PerfMap::new(map)?,
+        })
+    }
+}
+
+pub struct AsyncPerfMapBuffer<T: DerefMut<Target = Map>> {
+    buf: PerfMapBuffer<T>,
+
+    #[cfg(feature = "async_tokio")]
+    async_fd: AsyncFd<RawFd>,
+
+    #[cfg(feature = "async_std")]
+    async_fd: Async<RawFd>,
+}
+
+#[cfg(feature = "async_tokio")]
+impl<T: DerefMut<Target = Map>> AsyncPerfMapBuffer<T> {
+    pub async fn read_events(
+        &mut self,
+        buffers: &mut [BytesMut],
+    ) -> Result<Events, PerfBufferError> {
+        loop {
+            let mut guard = self.async_fd.readable_mut().await?;
+
+            match self.buf.read_events(buffers) {
+                Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events),
+                Ok(_) => {
+                    guard.clear_ready();
+                    continue;
+                }
+                Err(e) => return Err(e),
+            }
+        }
+    }
+}
+
+#[cfg(feature = "async_std")]
+impl<T: DerefMut<Target = Map>> AsyncPerfMapBuffer<T> {
+    pub async fn read_events(
+        &mut self,
+        buffers: &mut [BytesMut],
+    ) -> Result<Events, PerfBufferError> {
+        loop {
+            if !self.buf.readable() {
+                let _ = self.async_fd.readable().await?;
+            }
+
+            match self.buf.read_events(buffers) {
+                Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events),
+                Ok(_) => continue,
+                Err(e) => return Err(e),
+            }
+        }
+    }
+}
+
+impl TryFrom<MapRefMut> for AsyncPerfMap<MapRefMut> {
+    type Error = PerfMapError;
+
+    fn try_from(a: MapRefMut) -> Result<AsyncPerfMap<MapRefMut>, PerfMapError> {
+        AsyncPerfMap::new(a)
+    }
+}

+ 4 - 0
aya/src/maps/perf_map/mod.rs

@@ -1,5 +1,9 @@
+#[cfg(feature = "async")]
+mod async_perf_map;
 mod perf_buffer;
 mod perf_map;
 
+#[cfg(feature = "async")]
+pub use async_perf_map::*;
 pub use perf_buffer::*;
 pub use perf_map::*;

+ 11 - 1
aya/src/maps/perf_map/perf_buffer.rs

@@ -105,7 +105,17 @@ impl PerfBuffer {
         Ok(perf_buf)
     }
 
-    pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result<Events, PerfBufferError> {
+    pub(crate) fn readable(&self) -> bool {
+        let header = self.buf.load(Ordering::SeqCst);
+        let head = unsafe { (*header).data_head } as usize;
+        let tail = unsafe { (*header).data_tail } as usize;
+        head != tail
+    }
+
+    pub(crate) fn read_events(
+        &mut self,
+        buffers: &mut [BytesMut],
+    ) -> Result<Events, PerfBufferError> {
         if buffers.is_empty() {
             return Err(PerfBufferError::NoBuffers);
         }

+ 4 - 0
aya/src/maps/perf_map/perf_map.rs

@@ -45,6 +45,10 @@ pub struct PerfMapBuffer<T: DerefMut<Target = Map>> {
 }
 
 impl<T: DerefMut<Target = Map>> PerfMapBuffer<T> {
+    pub fn readable(&self) -> bool {
+        self.buf.readable()
+    }
+
     pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result<Events, PerfBufferError> {
         self.buf.read_events(buffers)
     }