Explorar o código

aya-log: Replace AsyncPerfEventArray with RingBuf

This doesn't get us to zero copy because the reserve/submit APIs do not
support DSTs for reasons I don't remember.

Now that it is unused in userspace, move `LOG_BUF_CAPACITY` to
`aya-log-ebpf` by making its type `LogValueLength` which obviates the
need for `log_value_length_sufficient`.
Tamir Duberstein hai 3 días
pai
achega
9be2d723ce

+ 0 - 17
aya-log-common/src/lib.rs

@@ -7,8 +7,6 @@ use core::{
 
 use num_enum::IntoPrimitive;
 
-pub const LOG_BUF_CAPACITY: usize = 8192;
-
 pub const LOG_FIELDS: usize = 6;
 
 pub type LogValueLength = u16;
@@ -329,18 +327,3 @@ pub fn write_record_header(
     }
     NonZeroUsize::new(size)
 }
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[test]
-    fn log_value_length_sufficient() {
-        assert!(
-            LOG_BUF_CAPACITY <= LogValueLength::MAX.into(),
-            "{} > {}",
-            LOG_BUF_CAPACITY,
-            LogValueLength::MAX
-        );
-    }
-}

+ 3 - 2
aya-log-ebpf-macros/src/expand.rs

@@ -154,6 +154,8 @@ pub(crate) fn log(args: LogArgs, level: Option<TokenStream>) -> Result<TokenStre
         match ::aya_log_ebpf::macro_support::AYA_LOG_BUF.get_ptr_mut(0).and_then(|ptr| unsafe { ptr.as_mut() }) {
             None => {},
             Some(::aya_log_ebpf::macro_support::LogBuf { buf: #buf }) => {
+                // Silence unused variable warning; we may need ctx in the future.
+                let _ = #ctx;
                 let _: Option<()> = (|| {
                     let #size = ::aya_log_ebpf::macro_support::write_record_header(
                         #buf,
@@ -173,8 +175,7 @@ pub(crate) fn log(args: LogArgs, level: Option<TokenStream>) -> Result<TokenStre
                         }
                     )*
                     let #record = #buf.get(..#size)?;
-                    ::aya_log_ebpf::macro_support::AYA_LOGS.output(#ctx, #record, 0);
-                    Some(())
+                    Result::<_, i64>::ok(::aya_log_ebpf::macro_support::AYA_LOGS.output(#record, 0))
                 })();
             }
         }

+ 7 - 0
aya-log/CHANGELOG.md

@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Breaking Changes
+
+- The implementation is now backed by a ring buffer rather than a perf event array. This should
+  improve performance but increases the minimum supported kernel version to 5.8.
+
 ## v0.2.1 (2024-10-09)
 
 ### Chore

+ 2 - 3
aya-log/Cargo.toml

@@ -17,12 +17,11 @@ rust-version.workspace = true
 workspace = true
 
 [dependencies]
-aya = { path = "../aya", version = "^0.13.1", features = ["async_tokio"] }
+aya = { path = "../aya", version = "^0.13.1", default-features = false }
 aya-log-common = { path = "../aya-log-common", version = "^0.1.15", default-features = false }
-bytes = { workspace = true }
 log = { workspace = true }
 thiserror = { workspace = true }
-tokio = { workspace = true, features = ["rt"] }
+tokio = { workspace = true, features = ["net", "rt"] }
 
 [dev-dependencies]
 env_logger = { workspace = true }

+ 25 - 43
aya-log/src/lib.rs

@@ -54,24 +54,16 @@ use std::{
     io, mem,
     net::{Ipv4Addr, Ipv6Addr},
     ptr, str,
-    sync::Arc,
 };
 
 const MAP_NAME: &str = "AYA_LOGS";
 
 use aya::{
     Ebpf, Pod,
-    maps::{
-        Map, MapData, MapError, MapInfo,
-        perf::{AsyncPerfEventArray, Events, PerfBufferError},
-    },
+    maps::{Map, MapData, MapError, MapInfo, RingBuf},
     programs::{ProgramError, loaded_programs},
-    util::online_cpus,
 };
-use aya_log_common::{
-    Argument, DisplayHint, LOG_BUF_CAPACITY, LOG_FIELDS, Level, LogValueLength, RecordField,
-};
-use bytes::BytesMut;
+use aya_log_common::{Argument, DisplayHint, LOG_FIELDS, Level, LogValueLength, RecordField};
 use log::{Log, Record, error};
 use thiserror::Error;
 
@@ -112,8 +104,7 @@ impl EbpfLogger {
         logger: T,
     ) -> Result<EbpfLogger, Error> {
         let map = bpf.take_map(MAP_NAME).ok_or(Error::MapNotFound)?;
-        Self::read_logs_async(map, logger)?;
-        Ok(EbpfLogger {})
+        Self::read_logs_async(map, logger)
     }
 
     /// Attaches to an existing `aya-log-ebpf` instance.
@@ -149,32 +140,26 @@ impl EbpfLogger {
             .ok_or(Error::MapNotFound)?;
         let map = MapData::from_id(map.id())?;
 
-        Self::read_logs_async(Map::PerfEventArray(map), logger)?;
-
-        Ok(EbpfLogger {})
+        Self::read_logs_async(Map::RingBuf(map), logger)
     }
 
-    fn read_logs_async<T: Log + 'static>(map: Map, logger: T) -> Result<(), Error> {
-        let mut logs: AsyncPerfEventArray<_> = map.try_into()?;
-
-        let logger = Arc::new(logger);
-        for cpu_id in online_cpus().map_err(|(_, error)| Error::InvalidOnlineCpu(error))? {
-            let mut buf = logs.open(cpu_id, None)?;
-
-            let log = logger.clone();
-            tokio::spawn(async move {
-                let mut buffers = vec![BytesMut::with_capacity(LOG_BUF_CAPACITY); 10];
-
-                loop {
-                    let Events { read, lost: _ } = buf.read_events(&mut buffers).await.unwrap();
-
-                    for buf in buffers.iter().take(read) {
-                        log_buf(buf.as_ref(), &*log).unwrap();
-                    }
+    fn read_logs_async<T: Log + 'static>(map: Map, logger: T) -> Result<Self, Error> {
+        let ring_buf: RingBuf<_> = map.try_into()?;
+        let mut async_fd =
+            tokio::io::unix::AsyncFd::with_interest(ring_buf, tokio::io::Interest::READABLE)
+                .map_err(Error::AsyncFdNew)?;
+
+        tokio::spawn(async move {
+            loop {
+                let mut guard = async_fd.readable_mut().await.unwrap();
+                while let Some(buf) = guard.get_inner_mut().next() {
+                    log_buf(buf.as_ref(), &logger).unwrap();
                 }
-            });
-        }
-        Ok(())
+                guard.clear_ready();
+            }
+        });
+
+        Ok(EbpfLogger {})
     }
 }
 
@@ -438,17 +423,14 @@ impl_format_float!(f64);
 
 #[derive(Error, Debug)]
 pub enum Error {
-    #[error("log event array {} doesn't exist", MAP_NAME)]
+    #[error("{} not found", MAP_NAME)]
     MapNotFound,
 
-    #[error("error opening log event array")]
+    #[error(transparent)]
     MapError(#[from] MapError),
 
-    #[error("error opening log buffer")]
-    PerfBufferError(#[from] PerfBufferError),
-
-    #[error("invalid /sys/devices/system/cpu/online format")]
-    InvalidOnlineCpu(#[source] io::Error),
+    #[error("tokio::io::unix::AsyncFd::new")]
+    AsyncFdNew(#[source] io::Error),
 
     #[error("program not found")]
     ProgramNotFound,
@@ -457,7 +439,7 @@ pub enum Error {
     ProgramError(#[from] ProgramError),
 }
 
-fn log_buf(mut buf: &[u8], logger: &dyn Log) -> Result<(), ()> {
+fn log_buf<T: ?Sized + Log>(mut buf: &[u8], logger: &T) -> Result<(), ()> {
     let mut target = None;
     let mut level = None;
     let mut module = None;

+ 6 - 4
ebpf/aya-log-ebpf/src/lib.rs

@@ -7,16 +7,18 @@ pub use aya_log_ebpf_macros::{debug, error, info, log, trace, warn};
 pub mod macro_support {
     #[cfg(target_arch = "bpf")]
     use aya_ebpf::macros::map;
-    use aya_ebpf::maps::{PerCpuArray, PerfEventByteArray};
-    use aya_log_common::LOG_BUF_CAPACITY;
+    use aya_ebpf::maps::{PerCpuArray, RingBuf};
+    use aya_log_common::LogValueLength;
     pub use aya_log_common::{
         DefaultFormatter, DisplayHint, IpFormatter, Level, LowerHexFormatter, LowerMacFormatter,
         UpperHexFormatter, UpperMacFormatter, WriteToBuf, write_record_header,
     };
 
+    const LOG_BUF_CAPACITY: LogValueLength = 8192;
+
     #[repr(C)]
     pub struct LogBuf {
-        pub buf: [u8; LOG_BUF_CAPACITY],
+        pub buf: [u8; LOG_BUF_CAPACITY as usize],
     }
 
     // This cfg_attr prevents compilation failures on macOS where the generated section name doesn't
@@ -31,5 +33,5 @@ pub mod macro_support {
     // because the integration-test crate depends on this crate transitively. See comment in
     // test/integration-test/Cargo.toml.
     #[cfg_attr(target_arch = "bpf", map)]
-    pub static AYA_LOGS: PerfEventByteArray = PerfEventByteArray::new(0);
+    pub static AYA_LOGS: RingBuf = RingBuf::with_byte_size((LOG_BUF_CAPACITY as u32) << 4, 0);
 }

+ 0 - 1
xtask/public-api/aya-log-common.txt

@@ -177,7 +177,6 @@ impl<T> core::clone::CloneToUninit for aya_log_common::RecordField where T: core
 pub unsafe fn aya_log_common::RecordField::clone_to_uninit(&self, dest: *mut u8)
 impl<T> core::convert::From<T> for aya_log_common::RecordField
 pub fn aya_log_common::RecordField::from(t: T) -> T
-pub const aya_log_common::LOG_BUF_CAPACITY: usize
 pub const aya_log_common::LOG_FIELDS: usize
 pub trait aya_log_common::DefaultFormatter
 impl aya_log_common::DefaultFormatter for &str

+ 1 - 4
xtask/public-api/aya-log.txt

@@ -1,15 +1,12 @@
 pub mod aya_log
 pub enum aya_log::Error
-pub aya_log::Error::InvalidOnlineCpu(std::io::error::Error)
+pub aya_log::Error::AsyncFdNew(std::io::error::Error)
 pub aya_log::Error::MapError(aya::maps::MapError)
 pub aya_log::Error::MapNotFound
-pub aya_log::Error::PerfBufferError(aya::maps::perf::perf_buffer::PerfBufferError)
 pub aya_log::Error::ProgramError(aya::programs::ProgramError)
 pub aya_log::Error::ProgramNotFound
 impl core::convert::From<aya::maps::MapError> for aya_log::Error
 pub fn aya_log::Error::from(source: aya::maps::MapError) -> Self
-impl core::convert::From<aya::maps::perf::perf_buffer::PerfBufferError> for aya_log::Error
-pub fn aya_log::Error::from(source: aya::maps::perf::perf_buffer::PerfBufferError) -> Self
 impl core::convert::From<aya::programs::ProgramError> for aya_log::Error
 pub fn aya_log::Error::from(source: aya::programs::ProgramError) -> Self
 impl core::error::Error for aya_log::Error