use std::{
    mem,
    os::fd::AsRawFd as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
};

use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
    maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
    programs::UProbe,
    Ebpf, EbpfLoader,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use integration_common::ring_buf::Registers;
use rand::Rng as _;
use test_log::test;
use tokio::{
    io::unix::AsyncFd,
    time::{sleep, Duration},
};
struct RingBufTest {
    _bpf: Ebpf,
    ring_buf: RingBuf<MapData>,
    regs: PerCpuArray<MapData, Registers>,
}

// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
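// With the u64 payloads used here and the 8-byte bpf_ringbuf_hdr that precedes
// each record, 512 entries work out to 512 * (8 + 8) = 8192 bytes, which is
// exactly two pages on systems with 4 KiB pages.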
const RING_BUF_MAX_ENTRIES: usize = 512;

impl RingBufTest {
    fn new() -> Self {
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;

        // Use the loader API to control the size of the ring_buf.
        let mut bpf = EbpfLoader::new()
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF)
            .unwrap();
        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();

        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}
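
// Pairs the test harness with n random u64 values to feed to the probe.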
struct WithData(RingBufTest, Vec<u64>);

impl WithData {
    fn new(n: usize) -> Self {
        Self(RingBufTest::new(), {
            let mut rng = rand::thread_rng();
            std::iter::repeat_with(|| rng.gen()).take(n).collect()
        })
    }
}

#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
fn ring_buf(n: usize) {
    let WithData(
        RingBufTest {
            mut ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(n);

    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
    // and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;

    // Call the function that the uprobe is attached to with the data.
    let mut expected = Vec::new();
    let mut expected_rejected = 0u64;
    let mut expected_dropped = 0u64;
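
    // The probe commits even values and discards odd ones (counting them as
    // rejected); once the buffer is full, reserve fails and the probe counts a
    // drop instead.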
    for (i, &v) in data.iter().enumerate() {
        ring_buf_trigger_ebpf_program(v);
        if i >= expected_capacity {
            expected_dropped += 1;
        } else if v % 2 == 0 {
            expected.push(v);
        } else {
            expected_rejected += 1;
        }
    }
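
    // Busy-poll the ring until every committed entry has been read; each
    // record is a single u64 written by the probe in native byte order.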
    let mut seen = Vec::<u64>::new();
    while seen.len() < expected.len() {
        if let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen.push(arg);
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed, and the rejected count was set
    // properly.
    assert_eq!(seen, expected);
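    // REGISTERS is a per-CPU array and the probe may fire on any CPU, so sum
    // the counters across all CPUs before comparing.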
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, expected_dropped);
    assert_eq!(rejected, expected_rejected);
}
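
// The uprobe above attaches to this function by symbol name, so it must not be
// mangled or inlined away; black_box keeps the call from being optimized out.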
#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    std::hint::black_box(arg);
}

// This test differs from the other async test in that it's possible for the producer
// to fill the ring_buf. We just ensure that the number of events we see is sane given
// what the producer sees, and that the logic does not hang. This exercises interleaving
// discards, successful commits, and drops due to the ring_buf being full.
#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_with_drops() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 8);

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    let mut seen = 0;
    let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
        while let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen += 1;
        }
    };

    use futures::future::{
        select,
        Either::{Left, Right},
    };

    // Spawn the writer which internally will spawn many parallel writers.
    let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
        tokio::spawn(async {
            for value in v {
                ring_buf_trigger_ebpf_program(value);
            }
        })
    }));
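
    // Race readiness notifications against writer completion: drain the ring
    // each time the fd becomes readable, and once every writer task finishes,
    // keep the in-flight readable future so it can be polled one last time
    // below.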
    let readable = {
        let mut writer = writer;
        loop {
            let readable = Box::pin(async_fd.readable_mut());
            writer = match select(readable, writer).await {
                Left((guard, writer)) => {
                    let mut guard = guard.unwrap();
                    process_ring_buf(guard.get_inner_mut());
                    guard.clear_ready();
                    writer
                }
                Right((writer, readable)) => {
                    writer.unwrap();
                    break readable;
                }
            }
        }
    };

    // If there's more to read, we should receive a readiness notification in a timely manner.
    // If we don't, then assert that there's nothing else to read. Note that it's important to
    // wait some time before attempting to read, otherwise we may catch up with the producer
    // before epoll has an opportunity to send a notification; our consumer thread can race with
    // the kernel epoll check.
    let sleep_fut = sleep(Duration::from_millis(10));
    tokio::pin!(sleep_fut);
    match select(sleep_fut, readable).await {
        Left(((), _)) => {}
        Right((guard, _)) => {
            let mut guard = guard.unwrap();
            process_ring_buf(guard.get_inner_mut());
            guard.clear_ready();
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(async_fd.into_inner().next(), None);
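
    // The ring holds at most RING_BUF_MAX_ENTRIES - 1 records before reserve
    // fails, so even if the consumer never kept up, no more than
    // data.len() - (RING_BUF_MAX_ENTRIES - 1) submissions can have been dropped.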
    let max_dropped: u64 = u64::try_from(
        data.len()
            .checked_sub(RING_BUF_MAX_ENTRIES - 1)
            .unwrap_or_default(),
    )
    .unwrap();
    let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
    let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    let total = u64::try_from(data.len()).unwrap();
    let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
    let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
    let facts = format!(
        "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
         max_rejected={max_rejected}, max_dropped={max_dropped}",
    );
    assert_eq!(seen + rejected + dropped, total, "{facts}");
    assert!(
        (0u64..=max_dropped).contains(&dropped),
        "dropped={dropped} not in 0..={max_dropped}; {facts}",
    );
    assert!(
        (min_rejected..=max_rejected).contains(&rejected),
        "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
    );
    assert!(
        (min_seen..=max_seen).contains(&seen),
        "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
    );
}

#[test(tokio::test(flavor = "multi_thread"))]
async fn ring_buf_async_no_drop() {
    let WithData(
        RingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = WithData::new(RING_BUF_MAX_ENTRIES * 3);

    let writer = {
        let data = data.to_owned();
        tokio::spawn(async move {
            for value in data {
                // Sleep a tad so we feel confident that the consumer will keep up
                // and no messages will be dropped.
                let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
                sleep(dur).await;
                ring_buf_trigger_ebpf_program(value);
            }
        })
    };

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
    // Note that unlike in the synchronous case where all of the entries are written before any of
    // them are read, in this case we expect all of the entries to make their way to userspace
    // because entries are being consumed as they are produced.
    let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
    let expected_len = expected.len();
    let reader = async move {
        let mut seen = Vec::with_capacity(expected_len);
        while seen.len() < expected_len {
            let mut guard = async_fd.readable_mut().await.unwrap();
            let ring_buf = guard.get_inner_mut();
            while let Some(read) = ring_buf.next() {
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                seen.push(arg);
            }
            guard.clear_ready();
        }
        (seen, async_fd.into_inner())
    };

    let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
    writer.unwrap();

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed.
    assert_eq!(&seen, &expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, 0);
    assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
}

// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test]
fn ring_buf_epoll_wakeup() {
    let RingBufTest {
        mut ring_buf,
        _bpf,
        regs: _,
    } = RingBufTest::new();

    let epoll_fd = epoll::create(false).unwrap();
    epoll::ctl(
        epoll_fd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        ring_buf.as_raw_fd(),
        // The use of EPOLLET is intentional. Without it, level-triggering would result in
        // more notifications, and would mask the underlying bug this test reproduced when
        // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
        // AsyncFd always uses this flag (as demonstrated in the subsequent test).
        epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
    )
    .unwrap();
    let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut total_events: u64 = 0;
    let writer = WriterThread::spawn();
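    // If the consumer position were written without the proper memory
    // ordering, the kernel could fail to re-arm the wakeup and the wait below
    // (infinite timeout) would block forever, hanging the test.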
    while total_events < WriterThread::NUM_MESSAGES {
        epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
        while let Some(read) = ring_buf.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
    }
    writer.join();
}

// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
#[test(tokio::test)]
async fn ring_buf_asyncfd_events() {
    let RingBufTest {
        ring_buf,
        regs: _,
        _bpf,
    } = RingBufTest::new();

    let mut async_fd = AsyncFd::new(ring_buf).unwrap();
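    // Tokio registers the fd edge-triggered (see the EPOLLET note in the epoll
    // test above), so each wakeup must fully drain the ring and clear_ready
    // before awaiting readiness again.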
    let mut total_events = 0;
    let writer = WriterThread::spawn();
    while total_events < WriterThread::NUM_MESSAGES {
        let mut guard = async_fd.readable_mut().await.unwrap();
        let rb = guard.get_inner_mut();
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            total_events += 1;
        }
        guard.clear_ready();
    }
    writer.join();
}

// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
// used by both the epoll and async fd tests, which need frequent writes to the ring buffer to
// trigger the memory synchronization bug that was fixed.
struct WriterThread {
    thread: thread::JoinHandle<()>,
    done: Arc<AtomicBool>,
}

impl WriterThread {
    // When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
    // rather than Ordering::SeqCst, the test will hang. This number was determined to be large
    // enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
    const NUM_MESSAGES: u64 = 20_000;

    fn spawn() -> Self {
        let done = Arc::new(AtomicBool::new(false));
        Self {
            thread: {
                let done = done.clone();
                thread::spawn(move || {
                    while !done.load(Ordering::Relaxed) {
                        // Write 0 which is even and won't be rejected.
                        ring_buf_trigger_ebpf_program(0);
                    }
                })
            },
            done,
        }
    }

    fn join(self) {
        let Self { thread, done } = self;
        done.store(true, Ordering::Relaxed);
        thread.join().unwrap();
    }
}