use std::{
    mem,
    os::fd::AsRawFd as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
};

use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
    maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
    programs::UProbe,
    Ebpf, EbpfLoader,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use integration_common::ring_buf::Registers;
use rand::Rng as _;
use test_log::test;
use tokio::{
    io::unix::AsyncFd,
    time::{sleep, Duration},
};
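
// Everything the tests need: the loaded eBPF object (kept alive as _bpf so the attached
// uprobe is not detached), the ring buffer to read from, and the per-CPU map through
// which the probe reports its dropped/rejected counters.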
struct RingBufTest {
    _bpf: Ebpf,
    ring_buf: RingBuf<MapData>,
    regs: PerCpuArray<MapData, Registers>,
}
// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
const RING_BUF_MAX_ENTRIES: usize = 512;
impl RingBufTest {
    fn new() -> Self {
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;

        // Use the loader API to control the size of the ring_buf.
        let mut bpf = EbpfLoader::new()
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF)
            .unwrap();
        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();

        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}
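
// Pairs a RingBufTest with `n` random u64 values to feed to the probe.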
struct WithData(RingBufTest, Vec<u64>);

impl WithData {
    fn new(n: usize) -> Self {
        Self(RingBufTest::new(), {
            let mut rng = rand::thread_rng();
            std::iter::repeat_with(|| rng.gen()).take(n).collect()
        })
    }
}
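
// Exercise the synchronous read path with varying numbers of writes, including more
// writes than the ring buffer has capacity for.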
#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
fn ring_buf(n: usize) {
    let WithData(
        RingBufTest {
            mut ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(n);

    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
    // and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;

    // Call the function that the uprobe is attached to with the data.
    let mut expected = Vec::new();
    let mut expected_rejected = 0u64;
    let mut expected_dropped = 0u64;
    for (i, &v) in data.iter().enumerate() {
        ring_buf_trigger_ebpf_program(v);
        if i >= expected_capacity {
            expected_dropped += 1;
        } else if v % 2 == 0 {
            expected.push(v);
        } else {
            expected_rejected += 1;
        }
    }
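
    // Drain the ring buffer. The probe discards odd values, so every entry read back
    // must be even.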
    let mut seen = Vec::<u64>::new();
    while seen.len() < expected.len() {
        if let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen.push(arg);
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed, and the rejected count was set
    // properly.
    assert_eq!(seen, expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, expected_dropped);
    assert_eq!(rejected, expected_rejected);
}
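
// The uprobe attaches to this function by symbol name; #[no_mangle] keeps the symbol
// resolvable and #[inline(never)] guarantees there is a real call for the probe to hook.
// black_box prevents the argument from being optimized away.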
#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    std::hint::black_box(arg);
}
// This test differs from the other async test in that it's possible for the producer
// to fill the ring_buf. We just ensure that the number of events we see is sane given
// what the producer sees, and that the logic does not hang. This exercises interleaving
// discards, successful commits, and drops due to the ring_buf being full.
#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_with_drops() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 8);

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    let mut seen = 0;
    let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
        while let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen += 1;
        }
    };

    use futures::future::{
        select,
        Either::{Left, Right},
    };
    // Spawn the writer, which internally spawns many parallel writer tasks.
    let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
        tokio::spawn(async {
            for value in v {
                ring_buf_trigger_ebpf_program(value);
            }
        })
    }));
    let readable = {
        let mut writer = writer;
        loop {
            let readable = Box::pin(async_fd.readable_mut());
            writer = match select(readable, writer).await {
                Left((guard, writer)) => {
                    let mut guard = guard.unwrap();
                    process_ring_buf(guard.get_inner_mut());
                    guard.clear_ready();
                    writer
                }
                Right((writer, readable)) => {
                    writer.unwrap();
                    break readable;
                }
            }
        }
    };
    // If there's more to read, we should receive a readiness notification in a timely manner.
    // If we don't, then assert that there's nothing else to read. Note that it's important to
    // wait some time before attempting to read, otherwise we may catch up with the producer
    // before epoll has an opportunity to send a notification; our consumer thread can race
    // with the kernel epoll check.
    let sleep_fut = sleep(Duration::from_millis(10));
    tokio::pin!(sleep_fut);
    match select(sleep_fut, readable).await {
        Left(((), _)) => {}
        Right((guard, _)) => {
            let mut guard = guard.unwrap();
            process_ring_buf(guard.get_inner_mut());
            guard.clear_ready();
        }
    }
    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(async_fd.into_inner().next(), None);

    let max_dropped: u64 = u64::try_from(
        data.len()
            .checked_sub(RING_BUF_MAX_ENTRIES - 1)
            .unwrap_or_default(),
    )
    .unwrap();
    let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
    let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    let total = u64::try_from(data.len()).unwrap();
    let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
    let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
    let facts = format!(
        "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
         max_rejected={max_rejected}, max_dropped={max_dropped}",
    );
    assert_eq!(seen + rejected + dropped, total, "{facts}");
    assert!(
        (0u64..=max_dropped).contains(&dropped),
        "dropped={dropped} not in 0..={max_dropped}; {facts}",
    );
    assert!(
        (min_rejected..=max_rejected).contains(&rejected),
        "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
    );
    assert!(
        (min_seen..=max_seen).contains(&seen),
        "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
    );
}
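
// Unlike ring_buf_async_with_drops, the writer here sleeps between writes so that the
// consumer keeps up and no entries should be dropped.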
#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_no_drop() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 3);

    let writer = {
        let data = data.to_owned();
        tokio::spawn(async move {
            for value in data {
                // Sleep a tad so we feel confident that the consumer will keep up
                // and no messages will be dropped.
                let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
                sleep(dur).await;
                ring_buf_trigger_ebpf_program(value);
            }
        })
    };

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    // Note that unlike in the synchronous case where all of the entries are written before any of
    // them are read, in this case we expect all of the entries to make their way to userspace
    // because entries are being consumed as they are produced.
    let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
    let expected_len = expected.len();
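    // Drain entries as readiness notifications arrive until everything the probe
    // committed has been observed.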
    let reader = async move {
        let mut seen = Vec::with_capacity(expected_len);
        while seen.len() < expected_len {
            let mut guard = async_fd.readable_mut().await.unwrap();
            let ring_buf = guard.get_inner_mut();
            while let Some(read) = ring_buf.next() {
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                seen.push(arg);
            }
            guard.clear_ready();
        }
        (seen, async_fd.into_inner())
    };
    let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
    writer.unwrap();

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed.
    assert_eq!(&seen, &expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, 0);
    assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
}
// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test]
fn ring_buf_epoll_wakeup() {
    let RingBufTest {
        mut ring_buf,
        _bpf,
        regs: _,
    } = RingBufTest::new();

    let epoll_fd = epoll::create(false).unwrap();
    epoll::ctl(
        epoll_fd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        ring_buf.as_raw_fd(),
        // The use of EPOLLET is intentional. Without it, level-triggering would result in
        // more notifications, and would mask the underlying bug this test reproduced when
        // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
        // AsyncFd always uses this flag (as demonstrated in the subsequent test).
        epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
    )
    .unwrap();
    let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut total_events: u64 = 0;
    let writer = WriterThread::spawn();
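    // Block in epoll_wait until the producer commits entries, then drain them. If the
    // wakeup logic were broken, this loop would hang rather than observe new entries.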
    while total_events < WriterThread::NUM_MESSAGES {
        epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
        while let Some(read) = ring_buf.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
    }
    writer.join();
}
// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
#[test(tokio::test)]
async fn ring_buf_asyncfd_events() {
    let RingBufTest {
        ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new();

    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    let mut total_events = 0;
    let writer = WriterThread::spawn();
    while total_events < WriterThread::NUM_MESSAGES {
        let mut guard = async_fd.readable_mut().await.unwrap();
        let rb = guard.get_inner_mut();
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
        guard.clear_ready();
    }
    writer.join();
}
// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
// used by both the epoll and AsyncFd tests, which need frequent writes to the ring buffer to
// trigger the memory synchronization bug that was fixed.
struct WriterThread {
    thread: thread::JoinHandle<()>,
    done: Arc<AtomicBool>,
}

impl WriterThread {
    // When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
    // rather than Ordering::SeqCst, the test will hang. This number was determined to be large
    // enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
    const NUM_MESSAGES: u64 = 20_000;

    fn spawn() -> Self {
        let done = Arc::new(AtomicBool::new(false));
        Self {
            thread: {
                let done = done.clone();
                thread::spawn(move || {
                    while !done.load(Ordering::Relaxed) {
                        // Write 0 which is even and won't be rejected.
                        ring_buf_trigger_ebpf_program(0);
                    }
                })
            },
            done,
        }
    }

    fn join(self) {
        let Self { thread, done } = self;
        done.store(true, Ordering::Relaxed);
        thread.join().unwrap();
    }
}