|
@@ -6,6 +6,7 @@ use std::{
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
},
|
|
|
thread,
|
|
|
+ time::Duration,
|
|
|
};
|
|
|
|
|
|
use anyhow::Context as _;
|
|
@@ -19,10 +20,7 @@ use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
|
|
|
use integration_common::ring_buf::Registers;
|
|
|
use rand::Rng as _;
|
|
|
use test_log::test;
|
|
|
-use tokio::{
|
|
|
- io::unix::AsyncFd,
|
|
|
- time::{Duration, sleep},
|
|
|
-};
|
|
|
+use tokio::io::unix::AsyncFd;
|
|
|
|
|
|
struct RingBufTest {
|
|
|
_bpf: Ebpf,
|
|
@@ -178,49 +176,42 @@ async fn ring_buf_async_with_drops() {
|
|
|
seen += 1;
|
|
|
}
|
|
|
};
|
|
|
- use futures::future::{
|
|
|
- Either::{Left, Right},
|
|
|
- select,
|
|
|
- };
|
|
|
- let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
|
|
|
- tokio::spawn(async {
|
|
|
- for value in v {
|
|
|
- ring_buf_trigger_ebpf_program(value);
|
|
|
- }
|
|
|
- })
|
|
|
- }));
|
|
|
- let readable = {
|
|
|
- let mut writer = writer;
|
|
|
- loop {
|
|
|
- let readable = Box::pin(async_fd.readable_mut());
|
|
|
- writer = match select(readable, writer).await {
|
|
|
- Left((guard, writer)) => {
|
|
|
- let mut guard = guard.unwrap();
|
|
|
- process_ring_buf(guard.get_inner_mut());
|
|
|
- guard.clear_ready();
|
|
|
- writer
|
|
|
- }
|
|
|
- Right((writer, readable)) => {
|
|
|
- writer.unwrap();
|
|
|
- break readable;
|
|
|
+ let mut writer =
|
|
|
+ futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
|
|
|
+ tokio::spawn(async {
|
|
|
+ for value in v {
|
|
|
+ ring_buf_trigger_ebpf_program(value);
|
|
|
}
|
|
|
+ })
|
|
|
+ }));
|
|
|
+ loop {
|
|
|
+ let readable = async_fd.readable_mut();
|
|
|
+ futures::pin_mut!(readable);
|
|
|
+ match futures::future::select(readable, &mut writer).await {
|
|
|
+ futures::future::Either::Left((guard, _writer)) => {
|
|
|
+ let mut guard = guard.unwrap();
|
|
|
+ process_ring_buf(guard.get_inner_mut());
|
|
|
+ guard.clear_ready();
|
|
|
}
|
|
|
- }
|
|
|
- };
|
|
|
+ futures::future::Either::Right((writer, readable)) => {
|
|
|
+ writer.unwrap();
|
|
|
+
|
|
|
+ // If there's more to read, we should receive a readiness notification in a timely
|
|
|
+ // manner. If we don't then, then assert that there's nothing else to read. Note
|
|
|
+ // that it's important to wait some time before attempting to read, otherwise we may
|
|
|
+ // catch up with the producer before epoll has an opportunity to send a
|
|
|
+ // notification; our consumer thread can race with the kernel epoll check.
|
|
|
+ match tokio::time::timeout(Duration::from_millis(10), readable).await {
|
|
|
+ Err(tokio::time::error::Elapsed { .. }) => (),
|
|
|
+ Ok(guard) => {
|
|
|
+ let mut guard = guard.unwrap();
|
|
|
+ process_ring_buf(guard.get_inner_mut());
|
|
|
+ guard.clear_ready();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // If there's more to read, we should receive a readiness notification in a timely manner.
|
|
|
- // If we don't then, then assert that there's nothing else to read. Note that it's important
|
|
|
- // to wait some time before attempting to read, otherwise we may catch up with the producer
|
|
|
- // before epoll has an opportunity to send a notification; our consumer thread can race
|
|
|
- // with the kernel epoll check.
|
|
|
- let sleep_fut = sleep(Duration::from_millis(10));
|
|
|
- tokio::pin!(sleep_fut);
|
|
|
- match select(sleep_fut, readable).await {
|
|
|
- Left(((), _)) => {}
|
|
|
- Right((guard, _)) => {
|
|
|
- let mut guard = guard.unwrap();
|
|
|
- process_ring_buf(guard.get_inner_mut());
|
|
|
- guard.clear_ready();
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -280,7 +271,7 @@ async fn ring_buf_async_no_drop() {
|
|
|
for (value, duration) in data {
|
|
|
// Sleep a tad so we feel confident that the consumer will keep up
|
|
|
// and no messages will be dropped.
|
|
|
- sleep(duration).await;
|
|
|
+ tokio::time::sleep(duration).await;
|
|
|
ring_buf_trigger_ebpf_program(value);
|
|
|
}
|
|
|
})
|