ring_buf.rs

use std::{
    mem,
    os::fd::AsRawFd as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
};

use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
    maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
    programs::UProbe,
    Ebpf, EbpfLoader, Pod,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use rand::Rng as _;
use test_log::test;
use tokio::{
    io::unix::AsyncFd,
    time::{sleep, Duration},
};
// This structure's definition is duplicated in the probe.
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
struct Registers {
    dropped: u64,
    rejected: u64,
}

impl std::ops::Add for Registers {
    type Output = Self;
    fn add(self, rhs: Self) -> Self::Output {
        Self {
            dropped: self.dropped + rhs.dropped,
            rejected: self.rejected + rhs.rejected,
        }
    }
}

impl<'a> std::iter::Sum<&'a Registers> for Registers {
    fn sum<I: Iterator<Item = &'a Registers>>(iter: I) -> Self {
        iter.fold(Default::default(), |a, b| a + *b)
    }
}

unsafe impl Pod for Registers {}
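
// Safety note for the impl above: Registers is #[repr(C)] with two u64 fields and no padding
// bytes, so any bit pattern read back from map memory is a valid value, which is what the Pod
// marker requires.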

struct RingBufTest {
    _bpf: Ebpf,
    ring_buf: RingBuf<MapData>,
    regs: PerCpuArray<MapData, Registers>,
}

// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
const RING_BUF_MAX_ENTRIES: usize = 512;
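
// A compile-time sketch of that sizing rule, assuming 4 KiB pages and the kernel's 8-byte
// per-record header (BPF_RINGBUF_HDR_SZ): each 8-byte sample occupies 16 bytes, so 512 entries
// fill exactly two pages.
const _: () = {
    const ASSUMED_PAGE_SIZE: usize = 4096; // assumption; not queried at runtime
    const BYTES: usize = RING_BUF_MAX_ENTRIES * (8 + 8);
    assert!(BYTES % ASSUMED_PAGE_SIZE == 0 && (BYTES / ASSUMED_PAGE_SIZE).is_power_of_two());
};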

impl RingBufTest {
    fn new() -> Self {
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;

        // Use the loader API to control the size of the ring_buf.
        let mut bpf = EbpfLoader::new()
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF)
            .unwrap();
        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            Some("ring_buf_trigger_ebpf_program"),
            0,
            "/proc/self/exe",
            None,
        )
        .unwrap();

        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}

struct WithData(RingBufTest, Vec<u64>);

impl WithData {
    fn new(n: usize) -> Self {
        Self(RingBufTest::new(), {
            let mut rng = rand::thread_rng();
            std::iter::repeat_with(|| rng.gen()).take(n).collect()
        })
    }
}

#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
fn ring_buf(n: usize) {
    let WithData(
        RingBufTest {
            mut ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(n);

    // Note that after expected_capacity entries have been submitted, reserve calls in the probe
    // will fail and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;

    // Call the function that the uprobe is attached to with the data.
    let mut expected = Vec::new();
    let mut expected_rejected = 0u64;
    let mut expected_dropped = 0u64;
    for (i, &v) in data.iter().enumerate() {
        ring_buf_trigger_ebpf_program(v);
        if i >= expected_capacity {
            expected_dropped += 1;
        } else if v % 2 == 0 {
            expected.push(v);
        } else {
            expected_rejected += 1;
        }
    }

    let mut seen = Vec::<u64>::new();
    while seen.len() < expected.len() {
        if let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen.push(arg);
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed, and that the rejected count was
    // set properly.
    assert_eq!(seen, expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, expected_dropped);
    assert_eq!(rejected, expected_rejected);
}

#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    std::hint::black_box(arg);
}
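
// Context for the function above: RingBufTest::new attaches the uprobe to this symbol by name in
// /proc/self/exe. #[no_mangle] keeps the symbol name findable, #[inline(never)] guarantees there
// is a real call for the uprobe to intercept, and black_box keeps the otherwise-unused argument
// from being optimized away.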

// This test differs from the other async test in that it's possible for the producer to fill the
// ring_buf. We just ensure that the number of events we see is sane given what the producer sees,
// and that the logic does not hang. This exercises interleaving discards, successful commits, and
// drops due to the ring_buf being full.
#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_with_drops() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 8);

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    let mut seen = 0;
    let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
        while let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen += 1;
        }
    };

    use futures::future::{
        select,
        Either::{Left, Right},
    };

    // Spawn the writer, which internally spawns many parallel writers.
    let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
        tokio::spawn(async {
            for value in v {
                ring_buf_trigger_ebpf_program(value);
            }
        })
    }));
    let readable = {
        let mut writer = writer;
        loop {
            let readable = Box::pin(async_fd.readable_mut());
            writer = match select(readable, writer).await {
                Left((guard, writer)) => {
                    let mut guard = guard.unwrap();
                    process_ring_buf(guard.get_inner_mut());
                    guard.clear_ready();
                    writer
                }
                Right((writer, readable)) => {
                    writer.unwrap();
                    break readable;
                }
            }
        }
    };

    // If there's more to read, we should receive a readiness notification in a timely manner.
    // If we don't, then assert that there's nothing else to read. Note that it's important to
    // wait some time before attempting to read, otherwise we may catch up with the producer
    // before epoll has an opportunity to send a notification; our consumer thread can race with
    // the kernel epoll check.
    let sleep_fut = sleep(Duration::from_millis(10));
    tokio::pin!(sleep_fut);
    match select(sleep_fut, readable).await {
        Left(((), _)) => {}
        Right((guard, _)) => {
            let mut guard = guard.unwrap();
            process_ring_buf(guard.get_inner_mut());
            guard.clear_ready();
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(async_fd.into_inner().next(), None);

    let max_dropped: u64 = u64::try_from(
        data.len()
            .checked_sub(RING_BUF_MAX_ENTRIES - 1)
            .unwrap_or_default(),
    )
    .unwrap();
    let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
    let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    let total = u64::try_from(data.len()).unwrap();
    let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
    let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
    let facts = format!(
        "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
         max_rejected={max_rejected}, max_dropped={max_dropped}",
    );
    assert_eq!(seen + rejected + dropped, total, "{facts}");
    assert!(
        (0u64..=max_dropped).contains(&dropped),
        "dropped={dropped} not in 0..={max_dropped}; {facts}",
    );
    assert!(
        (min_rejected..=max_rejected).contains(&rejected),
        "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
    );
    assert!(
        (min_seen..=max_seen).contains(&seen),
        "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
    );
}

#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_no_drop() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 3);

    let writer = {
        let data = data.to_owned();
        tokio::spawn(async move {
            for value in data {
                // Sleep a tad so we feel confident that the consumer will keep up and no messages
                // will be dropped.
                let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
                sleep(dur).await;
                ring_buf_trigger_ebpf_program(value);
            }
        })
    };

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();

    // Note that unlike in the synchronous case, where all of the entries are written before any of
    // them are read, in this case we expect all of the entries to make their way to userspace
    // because entries are being consumed as they are produced.
    let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
    let expected_len = expected.len();
    let reader = async move {
        let mut seen = Vec::with_capacity(expected_len);
        while seen.len() < expected_len {
            let mut guard = async_fd.readable_mut().await.unwrap();
            let ring_buf = guard.get_inner_mut();
            while let Some(read) = ring_buf.next() {
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                seen.push(arg);
            }
            guard.clear_ready();
        }
        (seen, async_fd.into_inner())
    };
    let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
    writer.unwrap();

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed.
    assert_eq!(&seen, &expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, 0);
    assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
}

// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test]
fn ring_buf_epoll_wakeup() {
    let RingBufTest {
        mut ring_buf,
        _bpf,
        regs: _,
    } = RingBufTest::new();
    let epoll_fd = epoll::create(false).unwrap();
    epoll::ctl(
        epoll_fd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        ring_buf.as_raw_fd(),
        // The use of EPOLLET is intentional (see the contrast sketched just after this call).
        // Without it, level-triggering would result in more notifications, and would mask the
        // underlying bug this test reproduced when the synchronization logic in the RingBuf
        // mirrored that of libbpf. Also, tokio's AsyncFd always uses this flag (as demonstrated
        // in the subsequent test).
        epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
    )
    .unwrap();
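
    // For contrast, a hypothetical level-triggered registration would omit EPOLLET:
    //
    //     epoll::Event::new(epoll::Events::EPOLLIN, 0)
    //
    // With that variant, epoll::wait keeps reporting readiness while any unread data remains, so
    // a missed producer-side wakeup is papered over by the next wait; edge-triggering fires only
    // on fresh wakeups, which is what exposes missing synchronization.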

    let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut total_events: u64 = 0;
    let writer = WriterThread::spawn();
    while total_events < WriterThread::NUM_MESSAGES {
        epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
        while let Some(read) = ring_buf.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
    }
    writer.join();
}

// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
#[test(tokio::test)]
async fn ring_buf_asyncfd_events() {
    let RingBufTest {
        ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new();
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    let mut total_events = 0;
    let writer = WriterThread::spawn();
    while total_events < WriterThread::NUM_MESSAGES {
        let mut guard = async_fd.readable_mut().await.unwrap();
        let rb = guard.get_inner_mut();
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
        guard.clear_ready();
    }
    writer.join();
}

// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
// used by both the epoll and AsyncFd tests, which need frequent writes to the ring buffer to
// trigger the memory synchronization bug that was fixed.
struct WriterThread {
    thread: thread::JoinHandle<()>,
    done: Arc<AtomicBool>,
}

impl WriterThread {
    // When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
    // rather than Ordering::SeqCst, the test will hang. This number was determined to be large
    // enough to tickle that bug on a hardware-accelerated VM with 2 vCPUs.
    const NUM_MESSAGES: u64 = 20_000;
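
    // A sketch of the ordering in question (illustrative; not the actual RingBuf internals): the
    // consumer-position store must be strong enough for the kernel-side producer to observe it
    // before deciding whether to emit a wakeup, e.g.:
    //
    //     consumer_pos.store(new_pos, Ordering::SeqCst); // a Relaxed store here can hang readers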

    fn spawn() -> Self {
        let done = Arc::new(AtomicBool::new(false));
        Self {
            thread: {
                let done = done.clone();
                thread::spawn(move || {
                    while !done.load(Ordering::Relaxed) {
                        // Write 0, which is even and won't be rejected.
                        ring_buf_trigger_ebpf_program(0);
                    }
                })
            },
            done,
        }
    }

    fn join(self) {
        let Self { thread, done } = self;
        done.store(true, Ordering::Relaxed);
        thread.join().unwrap();
    }
}