ring_buf.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. use std::{
  2. mem,
  3. os::fd::AsRawFd as _,
  4. sync::{
  5. Arc,
  6. atomic::{AtomicBool, Ordering},
  7. },
  8. thread,
  9. time::Duration,
  10. };
  11. use anyhow::Context as _;
  12. use assert_matches::assert_matches;
  13. use aya::{
  14. Ebpf, EbpfLoader,
  15. maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
  16. programs::UProbe,
  17. };
  18. use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
  19. use integration_common::ring_buf::Registers;
  20. use rand::Rng as _;
  21. use test_log::test;
  22. use tokio::io::unix::AsyncFd;
  23. struct RingBufTest {
  24. _bpf: Ebpf,
  25. ring_buf: RingBuf<MapData>,
  26. regs: PerCpuArray<MapData, Registers>,
  27. }
  28. // Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
  29. // that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
  30. // that's not the case because the actual size will be rounded up, and fewer entries will be dropped
  31. // than expected.
  32. const RING_BUF_MAX_ENTRIES: usize = 512;
  33. impl RingBufTest {
  34. fn new() -> Self {
  35. const RING_BUF_BYTE_SIZE: u32 =
  36. (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
  37. // Use the loader API to control the size of the ring_buf.
  38. let mut bpf = EbpfLoader::new()
  39. .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
  40. .load(crate::RING_BUF)
  41. .unwrap();
  42. let ring_buf = bpf.take_map("RING_BUF").unwrap();
  43. let ring_buf = RingBuf::try_from(ring_buf).unwrap();
  44. let regs = bpf.take_map("REGISTERS").unwrap();
  45. let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
  46. let prog: &mut UProbe = bpf
  47. .program_mut("ring_buf_test")
  48. .unwrap()
  49. .try_into()
  50. .unwrap();
  51. prog.load().unwrap();
  52. prog.attach(
  53. "ring_buf_trigger_ebpf_program",
  54. "/proc/self/exe",
  55. None,
  56. None,
  57. )
  58. .unwrap();
  59. Self {
  60. _bpf: bpf,
  61. ring_buf,
  62. regs,
  63. }
  64. }
  65. }
  66. struct WithData(RingBufTest, Vec<u64>);
  67. impl WithData {
  68. fn new(n: usize) -> Self {
  69. Self(RingBufTest::new(), {
  70. let mut rng = rand::rng();
  71. std::iter::repeat_with(|| rng.random()).take(n).collect()
  72. })
  73. }
  74. }
  75. #[test_case::test_case(0; "write zero items")]
  76. #[test_case::test_case(1; "write one item")]
  77. #[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
  78. #[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
  79. #[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
  80. fn ring_buf(n: usize) {
  81. let WithData(
  82. RingBufTest {
  83. mut ring_buf,
  84. regs,
  85. _bpf,
  86. },
  87. data,
  88. ) = WithData::new(n);
  89. // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
  90. // and the probe will give up.
  91. let expected_capacity = RING_BUF_MAX_ENTRIES - 1;
  92. // Call the function that the uprobe is attached to with the data.
  93. let mut expected = Vec::new();
  94. let mut expected_rejected = 0u64;
  95. let mut expected_dropped = 0u64;
  96. for (i, &v) in data.iter().enumerate() {
  97. ring_buf_trigger_ebpf_program(v);
  98. if i >= expected_capacity {
  99. expected_dropped += 1;
  100. } else if v % 2 == 0 {
  101. expected.push(v);
  102. } else {
  103. expected_rejected += 1;
  104. }
  105. }
  106. let mut seen = Vec::<u64>::new();
  107. while seen.len() < expected.len() {
  108. if let Some(read) = ring_buf.next() {
  109. let read: [u8; 8] = (*read)
  110. .try_into()
  111. .with_context(|| format!("data: {:?}", read.len()))
  112. .unwrap();
  113. let arg = u64::from_ne_bytes(read);
  114. assert_eq!(arg % 2, 0, "got {arg} from probe");
  115. seen.push(arg);
  116. }
  117. }
  118. // Make sure that there is nothing else in the ring_buf.
  119. assert_matches!(ring_buf.next(), None);
  120. // Ensure that the data that was read matches what was passed, and the rejected count was set
  121. // properly.
  122. assert_eq!(seen, expected);
  123. let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
  124. assert_eq!(dropped, expected_dropped);
  125. assert_eq!(rejected, expected_rejected);
  126. }
  /// Attach point for the `ring_buf_test` uprobe.
  ///
  /// The uprobe is attached by symbol name, so the function keeps an unmangled name and is never
  /// inlined. `black_box` keeps `arg` observable so the call is not optimised away.
  #[inline(never)]
  #[unsafe(no_mangle)]
  pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
      let _: u64 = std::hint::black_box(arg);
  }
  132. // This test differs from the other async test in that it's possible for the producer
  133. // to fill the ring_buf. We just ensure that the number of events we see is sane given
  134. // what the producer sees, and that the logic does not hang. This exercises interleaving
  135. // discards, successful commits, and drops due to the ring_buf being full.
  136. #[test(tokio::test(flavor = "multi_thread"))]
  137. async fn ring_buf_async_with_drops() {
  138. let WithData(
  139. RingBufTest {
  140. ring_buf,
  141. regs,
  142. _bpf,
  143. },
  144. data,
  145. ) = WithData::new(RING_BUF_MAX_ENTRIES * 8);
  146. let mut async_fd = AsyncFd::new(ring_buf).unwrap();
  147. // Spawn the writer which internally will spawn many parallel writers.
  148. // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
  149. let mut seen = 0;
  150. let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
  151. while let Some(read) = ring_buf.next() {
  152. let read: [u8; 8] = (*read)
  153. .try_into()
  154. .with_context(|| format!("data: {:?}", read.len()))
  155. .unwrap();
  156. let arg = u64::from_ne_bytes(read);
  157. assert_eq!(arg % 2, 0, "got {arg} from probe");
  158. seen += 1;
  159. }
  160. };
  161. let mut writer =
  162. futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
  163. tokio::spawn(async {
  164. for value in v {
  165. ring_buf_trigger_ebpf_program(value);
  166. }
  167. })
  168. }));
  169. loop {
  170. let readable = async_fd.readable_mut();
  171. futures::pin_mut!(readable);
  172. match futures::future::select(readable, &mut writer).await {
  173. futures::future::Either::Left((guard, _writer)) => {
  174. let mut guard = guard.unwrap();
  175. process_ring_buf(guard.get_inner_mut());
  176. guard.clear_ready();
  177. }
  178. futures::future::Either::Right((writer, readable)) => {
  179. writer.unwrap();
  180. // If there's more to read, we should receive a readiness notification in a timely
  181. // manner. If we don't then, then assert that there's nothing else to read. Note
  182. // that it's important to wait some time before attempting to read, otherwise we may
  183. // catch up with the producer before epoll has an opportunity to send a
  184. // notification; our consumer thread can race with the kernel epoll check.
  185. match tokio::time::timeout(Duration::from_millis(10), readable).await {
  186. Err(tokio::time::error::Elapsed { .. }) => (),
  187. Ok(guard) => {
  188. let mut guard = guard.unwrap();
  189. process_ring_buf(guard.get_inner_mut());
  190. guard.clear_ready();
  191. }
  192. }
  193. break;
  194. }
  195. }
  196. }
  197. // Make sure that there is nothing else in the ring_buf.
  198. assert_matches!(async_fd.into_inner().next(), None);
  199. let max_dropped: u64 = u64::try_from(
  200. data.len()
  201. .checked_sub(RING_BUF_MAX_ENTRIES - 1)
  202. .unwrap_or_default(),
  203. )
  204. .unwrap();
  205. let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
  206. let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
  207. let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
  208. let total = u64::try_from(data.len()).unwrap();
  209. let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
  210. let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
  211. let facts = format!(
  212. "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
  213. max_rejected={max_rejected}, max_dropped={max_dropped}",
  214. );
  215. assert_eq!(seen + rejected + dropped, total, "{facts}",);
  216. assert!(
  217. (0u64..=max_dropped).contains(&dropped),
  218. "dropped={dropped} not in 0..={max_dropped}; {facts}",
  219. );
  220. assert!(
  221. (min_rejected..=max_rejected).contains(&rejected),
  222. "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
  223. );
  224. assert!(
  225. (min_seen..=max_seen).contains(&seen),
  226. "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
  227. );
  228. }
  229. #[test(tokio::test(flavor = "multi_thread"))]
  230. async fn ring_buf_async_no_drop() {
  231. let WithData(
  232. RingBufTest {
  233. ring_buf,
  234. regs,
  235. _bpf,
  236. },
  237. data,
  238. ) = WithData::new(RING_BUF_MAX_ENTRIES * 3);
  239. let writer = {
  240. let mut rng = rand::rng();
  241. let data: Vec<_> = data
  242. .iter()
  243. .copied()
  244. .map(|value| (value, Duration::from_nanos(rng.random_range(0..10))))
  245. .collect();
  246. tokio::spawn(async move {
  247. for (value, duration) in data {
  248. // Sleep a tad so we feel confident that the consumer will keep up
  249. // and no messages will be dropped.
  250. tokio::time::sleep(duration).await;
  251. ring_buf_trigger_ebpf_program(value);
  252. }
  253. })
  254. };
  255. // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
  256. let mut async_fd = AsyncFd::new(ring_buf).unwrap();
  257. // Note that unlike in the synchronous case where all of the entries are written before any of
  258. // them are read, in this case we expect all of the entries to make their way to userspace
  259. // because entries are being consumed as they are produced.
  260. let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
  261. let expected_len = expected.len();
  262. let reader = async move {
  263. let mut seen = Vec::with_capacity(expected_len);
  264. while seen.len() < expected_len {
  265. let mut guard = async_fd.readable_mut().await.unwrap();
  266. let ring_buf = guard.get_inner_mut();
  267. while let Some(read) = ring_buf.next() {
  268. let read: [u8; 8] = (*read)
  269. .try_into()
  270. .with_context(|| format!("data: {:?}", read.len()))
  271. .unwrap();
  272. let arg = u64::from_ne_bytes(read);
  273. seen.push(arg);
  274. }
  275. guard.clear_ready();
  276. }
  277. (seen, async_fd.into_inner())
  278. };
  279. let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
  280. writer.unwrap();
  281. // Make sure that there is nothing else in the ring_buf.
  282. assert_matches!(ring_buf.next(), None);
  283. // Ensure that the data that was read matches what was passed.
  284. assert_eq!(&seen, &expected);
  285. let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
  286. assert_eq!(dropped, 0);
  287. assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
  288. }
  // This test reproduces a bug where the ring buffer would not be notified of new entries if the
  // state was not properly synchronized between the producer and consumer. This would result in
  // the consumer never being woken up and the test hanging.
  #[test]
  fn ring_buf_epoll_wakeup() {
      use std::os::fd::{AsRawFd as _, FromRawFd as _, OwnedFd};

      let RingBufTest {
          mut ring_buf,
          _bpf,
          regs: _,
      } = RingBufTest::new();

      // SAFETY: `epoll::create` has just returned this descriptor and nothing else owns it, so
      // transferring ownership to an `OwnedFd` is sound. The guard closes the epoll instance
      // when it goes out of scope, including when an assertion below panics.
      let epoll_fd = unsafe { OwnedFd::from_raw_fd(epoll::create(false).unwrap()) };
      epoll::ctl(
          epoll_fd.as_raw_fd(),
          epoll::ControlOptions::EPOLL_CTL_ADD,
          ring_buf.as_raw_fd(),
          // The use of EPOLLET is intentional. Without it, level-triggering would produce more
          // notifications and would mask the underlying bug this test reproduced when the
          // synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's AsyncFd
          // always uses this flag (as demonstrated in the subsequent test).
          epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
      )
      .unwrap();

      let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
      let mut total_events: u64 = 0;
      let writer = WriterThread::spawn();
      while total_events < WriterThread::NUM_MESSAGES {
          // Block until the ring buffer signals readiness, then drain everything available.
          epoll::wait(epoll_fd.as_raw_fd(), -1, &mut epoll_event_buf).unwrap();
          while let Some(read) = ring_buf.next() {
              assert_eq!(read.len(), 8);
              total_events += 1;
          }
      }
      writer.join();
  }
  323. // This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
  324. #[test(tokio::test)]
  325. async fn ring_buf_asyncfd_events() {
  326. let RingBufTest {
  327. ring_buf,
  328. regs: _,
  329. _bpf,
  330. } = RingBufTest::new();
  331. let mut async_fd = AsyncFd::new(ring_buf).unwrap();
  332. let mut total_events = 0;
  333. let writer = WriterThread::spawn();
  334. while total_events < WriterThread::NUM_MESSAGES {
  335. let mut guard = async_fd.readable_mut().await.unwrap();
  336. let rb = guard.get_inner_mut();
  337. while let Some(read) = rb.next() {
  338. assert_eq!(read.len(), 8);
  339. total_events += 1;
  340. }
  341. guard.clear_ready();
  342. }
  343. writer.join();
  344. }
  345. // WriterThread triggers the ring_buf write continuously until the join() method is called. It is
  346. // used by both the epoll and async fd test that need frequent writes to the ring buffer to trigger
  347. // the memory synchronization bug that was fixed.
  348. struct WriterThread {
  349. thread: thread::JoinHandle<()>,
  350. done: Arc<AtomicBool>,
  351. }
  352. impl WriterThread {
  353. // When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
  354. // rather than Ordering::SeqCst, the test will hang. This number was determined to be large
  355. // enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
  356. const NUM_MESSAGES: u64 = 20_000;
  357. fn spawn() -> Self {
  358. let done = Arc::new(AtomicBool::new(false));
  359. Self {
  360. thread: {
  361. let done = done.clone();
  362. thread::spawn(move || {
  363. while !done.load(Ordering::Relaxed) {
  364. // Write 0 which is even and won't be rejected.
  365. ring_buf_trigger_ebpf_program(0);
  366. }
  367. })
  368. },
  369. done,
  370. }
  371. }
  372. fn join(self) {
  373. let Self { thread, done } = self;
  374. done.store(true, Ordering::Relaxed);
  375. thread.join().unwrap();
  376. }
  377. }