123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
- use tokio::{runtime, task};
- fn bench_spsc_reusable(c: &mut Criterion) {
- let mut group = c.benchmark_group("async/spsc_reusable");
- static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaa";
- for size in [100, 500, 1_000, 5_000, 10_000] {
- group.throughput(Throughput::Elements(size));
- group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use thingbuf::{
- mpsc::{self, TrySendError},
- ThingBuf,
- };
- let (tx, rx) = mpsc::channel(ThingBuf::new(100));
- task::spawn(async move {
- loop {
- match tx.try_send_ref() {
- Ok(mut r) => r.with_mut(|s: &mut String| {
- s.clear();
- s.push_str(THE_STRING)
- }),
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let r = rx.recv_ref().await.unwrap();
- r.with(|val| {
- criterion::black_box(val);
- });
- }
- })
- });
- group.bench_with_input(
- BenchmarkId::new("futures::channel::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use futures::{channel::mpsc, stream::StreamExt};
- let (mut tx, mut rx) = mpsc::channel(100);
- task::spawn(async move {
- loop {
- match tx.try_send(String::from(THE_STRING)) {
- Ok(()) => {}
- Err(e) if e.is_disconnected() => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let val = rx.next().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- group.bench_with_input(
- BenchmarkId::new("tokio::sync::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| {
-
-
-
-
-
-
-
-
-
-
-
- tokio::task::unconstrained(async {
- use tokio::sync::mpsc::{self, error::TrySendError};
- let (tx, mut rx) = mpsc::channel(100);
- task::spawn(tokio::task::unconstrained(async move {
- loop {
-
-
-
-
-
- match tx.try_reserve() {
- Ok(permit) => permit.send(String::from(THE_STRING)),
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- }));
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- })
- },
- );
- group.bench_with_input(
- BenchmarkId::new("async_std::channel::bounded", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use async_std::channel::{self, TrySendError};
- let (tx, rx) = channel::bounded(100);
- task::spawn(async move {
- loop {
- match tx.try_send(String::from(THE_STRING)) {
- Ok(()) => {}
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- }
- group.finish();
- }
- fn bench_spsc_integer(c: &mut Criterion) {
- let mut group = c.benchmark_group("async/spsc_integer");
- for size in [100, 500, 1_000, 5_000, 10_000] {
- group.throughput(Throughput::Elements(size));
- group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use thingbuf::{
- mpsc::{self, TrySendError},
- ThingBuf,
- };
- let (tx, rx) = mpsc::channel(ThingBuf::new(100));
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- });
- group.bench_with_input(
- BenchmarkId::new("futures::channel::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use futures::{channel::mpsc, stream::StreamExt};
- let (mut tx, mut rx) = mpsc::channel(100);
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(e) if e.is_disconnected() => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.next().await.unwrap();
- assert_eq!(n, val);
- }
- })
- },
- );
- group.bench_with_input(
- BenchmarkId::new("tokio::sync::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| {
-
-
-
-
-
-
-
-
-
-
-
- tokio::task::unconstrained(async {
- use tokio::sync::mpsc::{self, error::TrySendError};
- let (tx, mut rx) = mpsc::channel(100);
- task::spawn(tokio::task::unconstrained(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- }));
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- })
- },
- );
- group.bench_with_input(
- BenchmarkId::new("async_std::channel::bounded", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use async_std::channel::{self, TrySendError};
- let (tx, rx) = channel::bounded(100);
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- },
- );
- }
- group.finish();
- }
- criterion_group!(benches, bench_spsc_reusable, bench_spsc_integer);
- criterion_main!(benches);
|