//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
//! programs to userspace.
//!
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html

use std::{
    borrow::Borrow,
    ffi::{c_int, c_void},
    fmt::{self, Debug, Formatter},
    io, mem,
    ops::Deref,
    os::fd::{AsFd as _, AsRawFd, BorrowedFd, RawFd},
    ptr::{self, NonNull},
    slice,
    sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};

use aya_obj::generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ};
use libc::{munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};

use crate::{
    maps::{MapData, MapError},
    sys::{mmap, SyscallError},
    util::page_size,
};

/// A map that can be used to receive events from eBPF programs.
///
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
/// * It's shared across all CPUs, which allows a strong ordering between events.
/// * Data notifications are delivered precisely instead of being sampled for every N events; the
///   eBPF program can also control notification delivery if sampling is desired for performance
///   reasons. By default, a notification will be sent if the consumer is caught up at the time of
///   committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
///   control this behavior.
/// * On the eBPF side, it supports the reserve-commit pattern, where the event can be written
///   directly into the ring without copying from a temporary location; see the sketch below.
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
///   and not to the userspace reader. This might require extra code to handle, but allows for more
///   flexible schemes for handling dropped samples.
///
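/// As a sketch of the eBPF side (hedged: this assumes the companion `aya-ebpf` crate's
/// `RingBuf` API, which is not defined in this module), a program reserves space in the
/// ring, writes the event in place, and then submits it:
///
/// ```ignore
/// use aya_ebpf::{macros::map, maps::RingBuf};
///
/// #[map]
/// static EVENTS: RingBuf = RingBuf::with_byte_size(16 * 4096, 0);
///
/// fn emit(value: u64) {
///     // Reserve space directly in the ring; `None` means the ring is full and the
///     // sample is dropped, which the program can count and report itself.
///     if let Some(mut entry) = EVENTS.reserve::<u64>(0) {
///         entry.write(value);
///         entry.submit(0);
///     }
/// }
/// ```
///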
/// To receive events you need to:
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
///
/// To receive async notifications of data availability, you may construct an
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.8.
///
/// # Examples
///
/// ```no_run
/// # struct PollFd<T>(T);
/// # fn poll_fd<T>(t: T) -> PollFd<T> { PollFd(t) }
/// # impl<T> PollFd<T> {
/// #     fn readable(&mut self) -> Guard<'_, T> { Guard(self) }
/// # }
/// # struct Guard<'a, T>(&'a mut PollFd<T>);
/// # impl<T> Guard<'_, T> {
/// #     fn inner_mut(&mut self) -> &mut T {
/// #         let Guard(PollFd(t)) = self;
/// #         t
/// #     }
/// #     fn clear_ready(&mut self) {}
/// # }
/// # let mut bpf = aya::Ebpf::load(&[])?;
/// use aya::maps::RingBuf;
/// use std::convert::TryFrom;
///
/// let ring_buf = RingBuf::try_from(bpf.map_mut("RING_BUF").unwrap()).unwrap();
/// let mut poll = poll_fd(ring_buf);
/// loop {
///     let mut guard = poll.readable();
///     let ring_buf = guard.inner_mut();
///     while let Some(item) = ring_buf.next() {
///         println!("Received: {:?}", item);
///     }
///     guard.clear_ready();
/// }
/// # Ok::<(), aya::EbpfError>(())
/// ```
///
/// # Polling
///
/// In the example above, the implementations of `poll_fd()`, `poll.readable()`, `guard.inner_mut()`,
/// and `guard.clear_ready()` are not given. [`RingBuf`] implements the [`AsRawFd`] trait, so you can
/// implement polling using any crate that can poll file descriptors, such as epoll or mio. The API
/// in the example above is modeled on that of [`tokio::io::unix::AsyncFd`].
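///
/// As a concrete sketch (hedged: `tokio` is not a dependency of this crate and is assumed to be
/// available with its `net` feature enabled), the async read loop might look like this:
///
/// ```ignore
/// use aya::maps::{MapData, RingBuf};
/// use tokio::io::unix::AsyncFd;
///
/// async fn read_events(ring_buf: RingBuf<MapData>) -> std::io::Result<()> {
///     // AsyncFd registers the ring buffer's file descriptor with the tokio reactor.
///     let mut async_fd = AsyncFd::new(ring_buf)?;
///     loop {
///         // Wait for the kernel to signal that data is available.
///         let mut guard = async_fd.readable_mut().await?;
///         let ring_buf = guard.get_inner_mut();
///         while let Some(item) = ring_buf.next() {
///             println!("Received: {:?}", item);
///         }
///         // Clear readiness so the next await blocks until a fresh wakeup.
///         guard.clear_ready();
///     }
/// }
/// ```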
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
    map: T,
    consumer: ConsumerPos,
    producer: ProducerData,
}

impl<T: Borrow<MapData>> RingBuf<T> {
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        let data: &MapData = map.borrow();
        let page_size = page_size();
        let map_fd = data.fd().as_fd();
        let byte_size = data.obj.max_entries();
        // The consumer metadata page is mapped read-write at offset 0; the producer metadata page
        // and the (double-mapped) data pages follow at offset page_size.
        let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?;
        let consumer = ConsumerPos::new(consumer_metadata);
        let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
        Ok(Self {
            map,
            consumer,
            producer,
        })
    }
}

impl<T> RingBuf<T> {
    /// Try to take a new entry from the ringbuf.
    ///
    /// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
    /// which case the caller may register for availability notifications through `epoll` or other
    /// APIs. Only one [`RingBufItem`] may be outstanding at a time.
    //
    // This is not an implementation of `Iterator` because we need to be able to refer to the
    // lifetime of the iterator in the returned `RingBufItem`. If `Iterator::Item` leveraged GATs,
    // one could imagine an implementation of `Iterator` that would work. GATs were stabilized in
    // Rust 1.65, but there's not yet a lending-iterator trait that the community has standardized
    // around.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<RingBufItem<'_>> {
        let Self {
            consumer, producer, ..
        } = self;
        producer.next(consumer)
    }
}

/// Access to the [`RawFd`] can be used to construct an `AsyncFd` for use with epoll.
impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
    fn as_raw_fd(&self) -> RawFd {
        let Self {
            map,
            consumer: _,
            producer: _,
        } = self;
        map.borrow().fd().as_fd().as_raw_fd()
    }
}

/// The current outstanding item read from the ringbuf.
pub struct RingBufItem<'a> {
    data: &'a [u8],
    consumer: &'a mut ConsumerPos,
}

impl Deref for RingBufItem<'_> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let Self { data, .. } = self;
        data
    }
}

impl Drop for RingBufItem<'_> {
    fn drop(&mut self) {
        let Self { consumer, data } = self;
        consumer.consume(data.len())
    }
}

impl Debug for RingBufItem<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            data,
            consumer:
                ConsumerPos {
                    pos,
                    metadata: ConsumerMetadata { mmap: _ },
                },
        } = self;
        // In general Relaxed ordering would be sufficient here; for debugging, it certainly is.
        f.debug_struct("RingBufItem")
            .field("pos", pos)
            .field("len", &data.len())
            .finish()
    }
}

struct ConsumerMetadata {
    mmap: MMap,
}

impl ConsumerMetadata {
    fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result<Self, MapError> {
        let mmap = MMap::new(
            fd,
            page_size,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            offset.try_into().unwrap(),
        )?;
        Ok(Self { mmap })
    }
}

impl AsRef<AtomicUsize> for ConsumerMetadata {
    fn as_ref(&self) -> &AtomicUsize {
        let Self {
            mmap: MMap { ptr, .. },
        } = self;
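        // SAFETY: the consumer metadata page begins with the consumer position word; the mapping
        // is page-sized, suitably aligned, and lives as long as `self`.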
        unsafe { ptr.cast::<AtomicUsize>().as_ref() }
    }
}

struct ConsumerPos {
    pos: usize,
    metadata: ConsumerMetadata,
}

impl ConsumerPos {
    fn new(metadata: ConsumerMetadata) -> Self {
        // Load the initial value of the consumer position. SeqCst is used to be safe given we don't
        // have any claims about memory synchronization performed by some previous writer.
        let pos = metadata.as_ref().load(Ordering::SeqCst);
        Self { pos, metadata }
    }

    fn consume(&mut self, len: usize) {
        let Self { pos, metadata } = self;

        // TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized.
        fn next_multiple_of(n: usize, multiple: usize) -> usize {
            match n % multiple {
                0 => n,
                rem => n + (multiple - rem),
            }
        }
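        // Each record occupies an 8-byte header plus the sample, rounded up to 8-byte alignment;
        // e.g. an 11-byte sample advances the position by next_multiple_of(8 + 11, 8) == 24.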
        *pos += next_multiple_of(usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len, 8);

        // The write operation needs to be properly ordered with respect to the producer committing
        // new data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1], and
        // reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst
        // here we ensure that either a subsequent read by the consumer to consume messages will see
        // an available message, or the producer in the kernel will see the updated consumer offset
        // that is caught up.
        //
        // [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488
        // [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494
        metadata.as_ref().store(*pos, Ordering::SeqCst);
    }
}

struct ProducerData {
    mmap: MMap,
    // Offset in the mmap where the data starts.
    data_offset: usize,
    // A cache of the value of the producer position. It is used to avoid re-reading the producer
    // position when we know there is more data to consume.
    pos_cache: usize,
    // A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf.
    mask: u32,
}

impl ProducerData {
    fn new(
        fd: BorrowedFd<'_>,
        offset: usize,
        page_size: usize,
        byte_size: u32,
    ) -> Result<Self, MapError> {
        // The producer pages have one page of metadata and then the data pages, all mapped
        // read-only. Note that the length of the mapping includes the data pages twice, as the
        // kernel maps them twice consecutively to avoid special handling of entries that cross
        // over the end of the ring buffer.
        //
        // The kernel diagram below shows the layout of the ring buffer. It references "meta pages",
        // but we only map exactly one producer meta page read-only. The consumer meta page is
        // mapped read-write elsewhere, and is taken into consideration via the offset parameter.
        //
        // From kernel/bpf/ringbuf.c[0]:
        //
        //   Each data page is mapped twice to allow "virtual"
        //   continuous read of samples wrapping around the end of ring
        //   buffer area:
        //   ------------------------------------------------------
        //   | meta pages |  real data pages  |  same data pages  |
        //   ------------------------------------------------------
        //   |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
        //   ------------------------------------------------------
        //   |            | TA             DA | TA             DA |
        //   ------------------------------------------------------
        //                                  ^^^^^^^
        //                                     |
        //   Here, no need to worry about special handling of wrapped-around
        //   data due to double-mapped data pages. This works both in kernel and
        //   when mmap()'ed in user-space, simplifying both kernel and
        //   user-space implementations significantly.
        //
        // [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124
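        //
        // For example, with a 4 KiB page size and byte_size == 16384, len below is
        // 4096 + 2 * 16384: one producer metadata page followed by the 16 KiB of data mapped
        // twice, so a record that starts near offset 16384 can be read contiguously out of the
        // second copy of the data pages.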
        let len = page_size + 2 * usize::try_from(byte_size).unwrap();
        let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;

        // byte_size is required to be a power of two multiple of page_size (which implicitly is a
        // power of 2), so subtracting one will create a bitmask for values less than byte_size.
        debug_assert!(byte_size.is_power_of_two());
        let mask = byte_size - 1;
        Ok(Self {
            mmap,
            data_offset: page_size,
            pos_cache: 0,
            mask,
        })
    }

    fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
        let Self {
            ref mmap,
            data_offset,
            pos_cache,
            mask,
        } = self;
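        // The start of the producer metadata page holds the producer position, which the kernel
        // updates atomically; view it through the mapping as an &AtomicUsize.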
        let pos = unsafe { mmap.ptr.cast().as_ref() };
        let mmap_data = mmap.as_ref();
        let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
            panic!(
                "offset {} out of bounds, data len {}",
                data_offset,
                mmap_data.len()
            )
        });
        while data_available(pos, pos_cache, consumer) {
            match read_item(data_pages, *mask, consumer) {
                Item::Busy => return None,
                Item::Discard { len } => consumer.consume(len),
                Item::Data(data) => return Some(RingBufItem { data, consumer }),
            }
        }
        return None;

        enum Item<'a> {
            Busy,
            Discard { len: usize },
            Data(&'a [u8]),
        }

        fn data_available(
            producer: &AtomicUsize,
            cache: &mut usize,
            consumer: &ConsumerPos,
        ) -> bool {
            let ConsumerPos { pos: consumer, .. } = consumer;
            if consumer == cache {
                // This value is written using Release by the kernel [1], and should be read with
                // Acquire to ensure that the prior writes to the entry header are visible.
                //
                // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
                *cache = producer.load(Ordering::Acquire);
            }

            // Note that we don't compare the order of the values because the producer position may
            // overflow u32 and wrap around to 0. Instead we just compare equality and assume that
            // the consumer position is always logically less than the producer position.
            //
            // Note also that the kernel, at the time of writing [1], doesn't seem to handle this
            // overflow correctly at all, and it's not clear that one can produce events after the
            // producer position has wrapped around.
            //
            // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
            consumer != cache
        }

        fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
            let ConsumerPos { pos, .. } = pos;
            // The consumer position grows without bound; masking it yields the record's offset
            // within the (double-mapped) data pages.
            let offset = pos & usize::try_from(mask).unwrap();
            let must_get_data = |offset, len| {
                data.get(offset..offset + len).unwrap_or_else(|| {
                    panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
                })
            };
            let header_ptr =
                must_get_data(offset, mem::size_of::<AtomicU32>()).as_ptr() as *const AtomicU32;
            // Pair the kernel's SeqCst write (which implies Release) [1] with an Acquire load. This
            // ensures data written by the producer will be visible.
            //
            // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
            let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
            if header & BPF_RINGBUF_BUSY_BIT != 0 {
                Item::Busy
            } else {
                let len = usize::try_from(header & mask).unwrap();
                if header & BPF_RINGBUF_DISCARD_BIT != 0 {
                    Item::Discard { len }
                } else {
                    let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
                    let data = must_get_data(data_offset, len);
                    Item::Data(data)
                }
            }
        }
    }
}

// MMap corresponds to a memory-mapped region.
//
// The data is unmapped in Drop.
struct MMap {
    ptr: NonNull<c_void>,
    len: usize,
}

// Needed because NonNull<T> is !Send and !Sync out of caution that the data
// might be aliased unsafely.
unsafe impl Send for MMap {}
unsafe impl Sync for MMap {}

impl MMap {
    fn new(
        fd: BorrowedFd<'_>,
        len: usize,
        prot: c_int,
        flags: c_int,
        offset: off_t,
    ) -> Result<Self, MapError> {
        match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } {
            MAP_FAILED => Err(MapError::SyscallError(SyscallError {
                call: "mmap",
                io_error: io::Error::last_os_error(),
            })),
            ptr => Ok(Self {
                ptr: NonNull::new(ptr).ok_or(
                    // This should never happen, but to be paranoid, and so we never need to talk
                    // about a null pointer, we check it anyway.
                    MapError::SyscallError(SyscallError {
                        call: "mmap",
                        io_error: io::Error::new(
                            io::ErrorKind::Other,
                            "mmap returned null pointer",
                        ),
                    }),
                )?,
                len,
            }),
        }
    }
}

impl AsRef<[u8]> for MMap {
    fn as_ref(&self) -> &[u8] {
        let Self { ptr, len } = self;
        unsafe { slice::from_raw_parts(ptr.as_ptr().cast(), *len) }
    }
}

impl Drop for MMap {
    fn drop(&mut self) {
        let Self { ptr, len } = *self;
        unsafe { munmap(ptr.as_ptr(), len) };
    }
}