use std::{
    mem,
    os::fd::AsRawFd as _,
    path::Path,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    thread,
    time::Duration,
};

use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
    Ebpf, EbpfLoader,
    maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
    programs::UProbe,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use integration_common::ring_buf::Registers;
use rand::Rng as _;
use scopeguard::defer;
use tokio::io::{Interest, unix::AsyncFd};

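// Shared harness for the tests below: keeps the loaded Ebpf object alive for the duration of a
// test, and exposes the userspace RingBuf reader plus the per-CPU Registers counters that the
// eBPF program updates.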
struct RingBufTest {
    _bpf: Ebpf,
    ring_buf: RingBuf<MapData>,
    regs: PerCpuArray<MapData, Registers>,
}

// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
const RING_BUF_MAX_ENTRIES: usize = 512;

impl RingBufTest {
    fn new() -> Self {
        Self::new_with_mutators(|_loader| {}, |_bpf| {})
    }

    // Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to
    // mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded
    // into the kernel.
    fn new_with_mutators<'loader>(
        loader_fn: impl FnOnce(&mut EbpfLoader<'loader>),
        bpf_fn: impl FnOnce(&mut Ebpf),
    ) -> Self {
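        // For BPF_MAP_TYPE_RINGBUF maps, max_entries is the buffer size in bytes rather than an
        // element count, and the kernel requires it to be a power-of-two multiple of the page
        // size. Here that is 512 * (8 bytes of data + the 8-byte record header), i.e. 8192 bytes,
        // which is exactly two pages on a typical 4 KiB-page system.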
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;

        // Use the loader API to control the size of the ring_buf.
        let mut loader = EbpfLoader::new();
        loader.map_max_entries("RING_BUF", RING_BUF_BYTE_SIZE);
        loader_fn(&mut loader);
        let mut bpf = loader.load(crate::RING_BUF).unwrap();
        bpf_fn(&mut bpf);

        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
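        // Attach the uprobe to the ring_buf_trigger_ebpf_program symbol in this test binary;
        // every call to that function fires the eBPF program.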
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();

        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}

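// Pairs a test harness with `n` randomly generated u64 values to feed through the probe.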
struct WithData(RingBufTest, Vec<u64>);

impl WithData {
    fn new(n: usize) -> Self {
        Self(RingBufTest::new(), {
            let mut rng = rand::rng();
            std::iter::repeat_with(|| rng.random()).take(n).collect()
        })
    }
}

#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
fn ring_buf(n: usize) {
    let WithData(
        RingBufTest {
            mut ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(n);

    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
    // and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;

    // Call the function that the uprobe is attached to with the data.
    let mut expected = Vec::new();
    let mut expected_rejected = 0u64;
    let mut expected_dropped = 0u64;
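    // Mirror the probe's decision logic in userspace: writes past the buffer's capacity are
    // dropped, odd values are discarded by the probe (rejected), and even values are committed.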
    for (i, &v) in data.iter().enumerate() {
        ring_buf_trigger_ebpf_program(v);
        if i >= expected_capacity {
            expected_dropped += 1;
        } else if v % 2 == 0 {
            expected.push(v);
        } else {
            expected_rejected += 1;
        }
    }

    let mut seen = Vec::<u64>::new();
    while seen.len() < expected.len() {
        if let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen.push(arg);
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed, and the rejected count was set
    // properly.
    assert_eq!(seen, expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, expected_dropped);
    assert_eq!(rejected, expected_rejected);
}

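// The uprobe installed by RingBufTest attaches to this symbol: no_mangle keeps the symbol name
// stable, inline(never) ensures calls actually go through it, and black_box keeps the argument
// from being optimized away so the probe can read it.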
#[unsafe(no_mangle)]
#[inline(never)]
extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    std::hint::black_box(arg);
}

// This test differs from the other async test in that it's possible for the producer
// to fill the ring_buf. We just ensure that the number of events we see is sane given
// what the producer sees, and that the logic does not hang. This exercises interleaving
// discards, successful commits, and drops due to the ring_buf being full.
#[tokio::test(flavor = "multi_thread")]
#[test_log::test]
async fn ring_buf_async_with_drops() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 8);

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::with_interest(ring_buf, Interest::READABLE).unwrap();
    let mut seen = 0;
    let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
        while let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen += 1;
        }
    };

    // Spawn the writer, which internally spawns many parallel writers.
    let mut writer =
        futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
            tokio::spawn(async {
                for value in v {
                    ring_buf_trigger_ebpf_program(value);
                }
            })
        }));
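
    // Race the reader against the writers: drain the ring buffer on every readiness
    // notification until the writers complete, then perform one final timed drain below.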
    loop {
        let readable = async_fd.readable_mut();
        futures::pin_mut!(readable);
        match futures::future::select(readable, &mut writer).await {
            futures::future::Either::Left((guard, _writer)) => {
                let mut guard = guard.unwrap();
                process_ring_buf(guard.get_inner_mut());
                guard.clear_ready();
            }
            futures::future::Either::Right((writer, readable)) => {
                writer.unwrap();
                // If there's more to read, we should receive a readiness notification in a timely
                // manner. If we don't, then assert that there's nothing else to read. Note that
                // it's important to wait some time before attempting to read; otherwise we may
                // catch up with the producer before epoll has an opportunity to send a
                // notification, since our consumer thread can race with the kernel epoll check.
                match tokio::time::timeout(Duration::from_millis(10), readable).await {
                    Err(tokio::time::error::Elapsed { .. }) => (),
                    Ok(guard) => {
                        let mut guard = guard.unwrap();
                        process_ring_buf(guard.get_inner_mut());
                        guard.clear_ready();
                    }
                }
                break;
            }
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(async_fd.into_inner().next(), None);

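    // Exact counts are nondeterministic because the consumer runs concurrently with the
    // producers, so compute loose bounds and assert the observed counters fall within them.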
    let max_dropped: u64 = u64::try_from(
        data.len()
            .checked_sub(RING_BUF_MAX_ENTRIES - 1)
            .unwrap_or_default(),
    )
    .unwrap();
    let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
    let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    let total = u64::try_from(data.len()).unwrap();
    let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
    let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
    let facts = format!(
        "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
         max_rejected={max_rejected}, max_dropped={max_dropped}",
    );
    assert_eq!(seen + rejected + dropped, total, "{facts}");
    assert!(
        (0u64..=max_dropped).contains(&dropped),
        "dropped={dropped} not in 0..={max_dropped}; {facts}",
    );
    assert!(
        (min_rejected..=max_rejected).contains(&rejected),
        "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
    );
    assert!(
        (min_seen..=max_seen).contains(&seen),
        "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
    );
}

#[tokio::test(flavor = "multi_thread")]
#[test_log::test]
async fn ring_buf_async_no_drop() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 3);

    let writer = {
        let mut rng = rand::rng();
        let data: Vec<_> = data
            .iter()
            .copied()
            .map(|value| (value, Duration::from_nanos(rng.random_range(0..10))))
            .collect();
        tokio::spawn(async move {
            for (value, duration) in data {
                // Sleep a tad so we feel confident that the consumer will keep up
                // and no messages will be dropped.
                tokio::time::sleep(duration).await;
                ring_buf_trigger_ebpf_program(value);
            }
        })
    };

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::with_interest(ring_buf, Interest::READABLE).unwrap();

    // Note that unlike in the synchronous case where all of the entries are written before any of
    // them are read, in this case we expect all of the entries to make their way to userspace
    // because entries are being consumed as they are produced.
    let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
    let expected_len = expected.len();

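    // Drain the buffer on each readiness notification. clear_ready() must be called after each
    // drain so the AsyncFd waits for the next edge-triggered notification rather than spinning
    // on stale readiness.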
    let reader = async move {
        let mut seen = Vec::with_capacity(expected_len);
        while seen.len() < expected_len {
            let mut guard = async_fd.readable_mut().await.unwrap();
            let ring_buf = guard.get_inner_mut();
            while let Some(read) = ring_buf.next() {
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                seen.push(arg);
            }
            guard.clear_ready();
        }
        (seen, async_fd.into_inner())
    };
    let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
    writer.unwrap();

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed.
    assert_eq!(&seen, &expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, 0);
    assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
}

// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test_log::test]
fn ring_buf_epoll_wakeup() {
    let RingBufTest {
        mut ring_buf,
        _bpf,
        regs: _,
    } = RingBufTest::new();

    let epoll_fd = epoll::create(false).unwrap();
    epoll::ctl(
        epoll_fd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        ring_buf.as_raw_fd(),
        // The use of EPOLLET is intentional. Without it, level-triggering would result in
        // more notifications, and would mask the underlying bug this test reproduced when
        // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
        // AsyncFd always uses this flag (as demonstrated in the subsequent test).
        epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
    )
    .unwrap();
    let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut total_events: u64 = 0;
    let writer = WriterThread::spawn();
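
    // Alternate between blocking in epoll::wait and draining the buffer. With edge-triggered
    // epoll, a missed producer-side wakeup would leave epoll::wait blocked forever, which is
    // exactly how the original bug manifested.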
    while total_events < WriterThread::NUM_MESSAGES {
        epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
        while let Some(read) = ring_buf.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
    }
    writer.join();
}

// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
#[tokio::test]
#[test_log::test]
async fn ring_buf_asyncfd_events() {
    let RingBufTest {
        ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new();

    let mut async_fd = AsyncFd::with_interest(ring_buf, Interest::READABLE).unwrap();
    let mut total_events = 0;
    let writer = WriterThread::spawn();
    while total_events < WriterThread::NUM_MESSAGES {
        let mut guard = async_fd.readable_mut().await.unwrap();
        let rb = guard.get_inner_mut();
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
        guard.clear_ready();
    }
    writer.join();
}

// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
// used by both the epoll and async fd tests, which need frequent writes to the ring buffer to
// trigger the memory synchronization bug that was fixed.
struct WriterThread {
    thread: thread::JoinHandle<()>,
    done: Arc<AtomicBool>,
}

impl WriterThread {
    // When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
    // rather than Ordering::SeqCst, the test will hang. This number was determined to be large
    // enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
    const NUM_MESSAGES: u64 = 20_000;
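
    // Relaxed ordering suffices for the `done` flag: it carries no data, and thread::join() in
    // join() provides the final synchronization.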
    fn spawn() -> Self {
        let done = Arc::new(AtomicBool::new(false));
        Self {
            thread: {
                let done = done.clone();
                thread::spawn(move || {
                    while !done.load(Ordering::Relaxed) {
                        // Write 0 which is even and won't be rejected.
                        ring_buf_trigger_ebpf_program(0);
                    }
                })
            },
            done,
        }
    }

    fn join(self) {
        let Self { thread, done } = self;
        done.store(true, Ordering::Relaxed);
        thread.join().unwrap();
    }
}

// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent
// program. It ensures that the producer position is properly synchronized between the two
// programs, and that no unread data is lost.
#[tokio::test(flavor = "multi_thread")]
#[test_log::test]
async fn ring_buf_pinned() {
    let pin_path =
        Path::new("/sys/fs/bpf/").join(format!("ring_buf_{}", rand::rng().random::<u64>()));
    let RingBufTest {
        mut ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new_with_mutators(
        |_loader| {},
        |bpf| {
            let ring_buf = bpf.map_mut("RING_BUF").unwrap();
            ring_buf.pin(&pin_path).unwrap();
        },
    );
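
    // Remove the pinned map from bpffs when the test exits, even on panic.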
    defer! { std::fs::remove_file(&pin_path).unwrap() }

    // Write a few items to the ring buffer.
    let to_write_before_reopen = [2, 4, 6, 8];
    for v in to_write_before_reopen {
        ring_buf_trigger_ebpf_program(v);
    }

    let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2);
    for v in to_read_before_reopen {
        let item = ring_buf.next().unwrap();
        let item: [u8; 8] = item.as_ref().try_into().unwrap();
        assert_eq!(item, v.to_ne_bytes());
    }
    drop(ring_buf);
    drop(_bpf);
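
    // The drops above close the original file descriptors; the pin in bpffs keeps the map,
    // including its positions and unread entries, alive in the kernel for the reopen below.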

    // Reopen the ring buffer using the pinned map.
    let RingBufTest {
        mut ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new_with_mutators(
        |loader| {
            loader.map_pin_path("RING_BUF", &pin_path);
        },
        |_bpf| {},
    );

    let to_write_after_reopen = [10, 12];
    // Write some more data.
    for v in to_write_after_reopen {
        ring_buf_trigger_ebpf_program(v);
    }

    // Read both the data that was written before the ring buffer was reopened and the data that
    // was written after it was reopened.
    for v in to_read_after_reopen
        .iter()
        .chain(to_write_after_reopen.iter())
    {
        let item = ring_buf.next().unwrap();
        let item: [u8; 8] = item.as_ref().try_into().unwrap();
        assert_eq!(item, v.to_ne_bytes());
    }

    // Make sure there is nothing else in the ring buffer.
    assert_matches!(ring_buf.next(), None);
}