@@ -77,27 +77,40 @@ fn bench_spsc_reusable(c: &mut Criterion) {
             &size,
             |b, &i| {
                 let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use tokio::sync::mpsc::{self, error::TrySendError};
-                    let (tx, mut rx) = mpsc::channel(100);
-                    task::spawn(async move {
-                        loop {
-                            // this actually brings Tokio's MPSC closer to what
-                            // `ThingBuf` can do than all the other impls --- we
-                            // only allocate if we _were_ able to reserve send
-                            // capacity. but, we will still allocate and
-                            // deallocate a string for every message...
-                            match tx.try_reserve() {
-                                Ok(permit) => permit.send(String::from(THE_STRING)),
-                                Err(TrySendError::Closed(_)) => break,
-                                _ => task::yield_now().await,
+                b.to_async(rt).iter(|| {
+                    // turn off Tokio's automatic cooperative yielding for this
+                    // benchmark. in code with a large number of concurrent
+                    // tasks, this feature makes the MPSC channel (and other
+                    // Tokio synchronization primitives) better "team players"
+                    // than other implementations, since it prevents them from
+                    // using too much scheduler time.
+                    //
+                    // in this benchmark, though, there *are* no other tasks
+                    // running, so automatic yielding just means we spend more
+                    // time ping-ponging through the scheduler than every other
+                    // implementation.
+                    tokio::task::unconstrained(async {
+                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        let (tx, mut rx) = mpsc::channel(100);
+                        task::spawn(tokio::task::unconstrained(async move {
+                            loop {
+                                // this actually brings Tokio's MPSC closer to what
+                                // `ThingBuf` can do than all the other impls --- we
+                                // only allocate if we _were_ able to reserve send
+                                // capacity. but, we will still allocate and
+                                // deallocate a string for every message...
+                                match tx.try_reserve() {
+                                    Ok(permit) => permit.send(String::from(THE_STRING)),
+                                    Err(TrySendError::Closed(_)) => break,
+                                    _ => task::yield_now().await,
+                                }
                             }
-                        }
+                        }));
+                        for _ in 0..i {
+                            let val = rx.recv().await.unwrap();
+                            criterion::black_box(&val);
+                        }
-                    });
-                    for _ in 0..i {
-                        let val = rx.recv().await.unwrap();
-                        criterion::black_box(&val);
-                    }
+                    })
                 })
             },
         );
@@ -198,25 +211,38 @@ fn bench_spsc_integer(c: &mut Criterion) {
             &size,
             |b, &i| {
                 let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use tokio::sync::mpsc::{self, error::TrySendError};
-                    let (tx, mut rx) = mpsc::channel(100);
-                    task::spawn(async move {
-                        let mut i = 0;
-                        loop {
-                            match tx.try_send(i) {
-                                Ok(()) => {
-                                    i += 1;
+                b.to_async(rt).iter(|| {
+                    // turn off Tokio's automatic cooperative yielding for this
+                    // benchmark. in code with a large number of concurrent
+                    // tasks, this feature makes the MPSC channel (and other
+                    // Tokio synchronization primitives) better "team players"
+                    // than other implementations, since it prevents them from
+                    // using too much scheduler time.
+                    //
+                    // in this benchmark, though, there *are* no other tasks
+                    // running, so automatic yielding just means we spend more
+                    // time ping-ponging through the scheduler than every other
+                    // implementation.
+                    tokio::task::unconstrained(async {
+                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        let (tx, mut rx) = mpsc::channel(100);
+                        task::spawn(tokio::task::unconstrained(async move {
+                            let mut i = 0;
+                            loop {
+                                match tx.try_send(i) {
+                                    Ok(()) => {
+                                        i += 1;
+                                    }
+                                    Err(TrySendError::Closed(_)) => break,
+                                    _ => task::yield_now().await,
                                 }
-                                Err(TrySendError::Closed(_)) => break,
-                                _ => task::yield_now().await,
                             }
+                        }));
+                        for n in 0..i {
+                            let val = rx.recv().await.unwrap();
+                            assert_eq!(n, val);
                         }
-                    });
-                    for n in 0..i {
-                        let val = rx.recv().await.unwrap();
-                        assert_eq!(n, val);
-                    }
+                    })
                 })
             },
        );
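
For reference, a minimal standalone sketch of the pattern both hunks introduce (not part of the patch; the string literal stands in for the benchmark's `THE_STRING` constant, the iteration count is arbitrary, and it assumes a recent Tokio 1.x with the `rt` and `sync` features). `tokio::task::unconstrained` wraps a future so it is exempt from Tokio's cooperative scheduling budget; it wraps both the benchmark body and the producer task, and the producer only allocates once `try_reserve` has handed back a send permit.

use std::hint::black_box;

use tokio::sync::mpsc::{self, error::TrySendError};
use tokio::task;

fn main() {
    // single-threaded runtime, like the benchmarks above
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    // `unconstrained` opts the whole body out of the coop budget, so `recv()`
    // never returns Pending just because this task has polled "too much"
    rt.block_on(task::unconstrained(async {
        let (tx, mut rx) = mpsc::channel::<String>(100);
        // the producer is unconstrained too; the String is only allocated
        // after `try_reserve` has successfully reserved send capacity
        task::spawn(task::unconstrained(async move {
            loop {
                match tx.try_reserve() {
                    Ok(permit) => permit.send(String::from("hello world")),
                    Err(TrySendError::Closed(_)) => break,
                    // channel is full: yield so the consumer can drain it
                    _ => task::yield_now().await,
                }
            }
        }));
        for _ in 0..10_000 {
            let val = rx.recv().await.unwrap();
            black_box(&val);
        }
    }));
}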