123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
- use tokio::{runtime, task};
- fn bench_spsc_try_send_reusable(c: &mut Criterion) {
- let mut group = c.benchmark_group("async/spsc/try_send_reusable");
- static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaa";
- for size in [100, 500, 1_000, 5_000, 10_000] {
- group.throughput(Throughput::Elements(size));
- group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use thingbuf::{
- mpsc::{self, TrySendError},
- ThingBuf,
- };
- let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
- task::spawn(async move {
- loop {
- match tx.try_send_ref() {
- Ok(mut slot) => {
- slot.clear();
- slot.push_str(THE_STRING);
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let val = rx.recv_ref().await.unwrap();
- criterion::black_box(&*val);
- }
- })
- });
- #[cfg(feature = "futures")]
- group.bench_with_input(
- BenchmarkId::new("futures::channel::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use futures::{channel::mpsc, stream::StreamExt};
- let (mut tx, mut rx) = mpsc::channel(100);
- task::spawn(async move {
- loop {
- match tx.try_send(String::from(THE_STRING)) {
- Ok(()) => {}
- Err(e) if e.is_disconnected() => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let val = rx.next().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- #[cfg(feature = "tokio-sync")]
- group.bench_with_input(
- BenchmarkId::new("tokio::sync::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| {
-
-
-
-
-
-
-
-
-
-
-
- tokio::task::unconstrained(async {
- use tokio::sync::mpsc::{self, error::TrySendError};
- let (tx, mut rx) = mpsc::channel(100);
- task::spawn(tokio::task::unconstrained(async move {
- loop {
-
-
-
-
-
- match tx.try_reserve() {
- Ok(permit) => permit.send(String::from(THE_STRING)),
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- }));
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- })
- },
- );
- #[cfg(feature = "async-std")]
- group.bench_with_input(
- BenchmarkId::new("async_std::channel::bounded", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use async_std::channel::{self, TrySendError};
- let (tx, rx) = channel::bounded(100);
- task::spawn(async move {
- loop {
- match tx.try_send(String::from(THE_STRING)) {
- Ok(()) => {}
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- }
- group.finish();
- }
- fn bench_spsc_reusable(c: &mut Criterion) {
- let mut group = c.benchmark_group("async/spsc/reusable");
- static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
- aaaaaaaaaaaaaa";
- for size in [100, 500, 1_000, 5_000, 10_000] {
- group.throughput(Throughput::Elements(size));
- group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use thingbuf::{mpsc, ThingBuf};
- let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
- task::spawn(async move {
- while let Ok(mut slot) = tx.send_ref().await {
- slot.clear();
- slot.push_str(THE_STRING);
- }
- });
- for _ in 0..i {
- let val = rx.recv_ref().await.unwrap();
- criterion::black_box(&*val);
- }
- })
- });
- #[cfg(feature = "futures")]
- group.bench_with_input(
- BenchmarkId::new("futures::channel::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
- let (mut tx, mut rx) = mpsc::channel(100);
- task::spawn(
- async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
- );
- for _ in 0..i {
- let val = rx.next().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- #[cfg(feature = "tokio-sync")]
- group.bench_with_input(
- BenchmarkId::new("tokio::sync::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| {
-
-
-
-
-
-
-
-
-
-
-
- tokio::task::unconstrained(async {
- use tokio::sync::mpsc;
- let (tx, mut rx) = mpsc::channel(100);
- task::spawn(tokio::task::unconstrained(async move {
-
-
-
-
-
- while let Ok(permit) = tx.reserve().await {
- permit.send(String::from(THE_STRING));
- }
- }));
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- })
- },
- );
- #[cfg(feature = "async-std")]
- group.bench_with_input(
- BenchmarkId::new("async_std::channel::bounded", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use async_std::channel;
- let (tx, rx) = channel::bounded(100);
- task::spawn(
- async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
- );
- for _ in 0..i {
- let val = rx.recv().await.unwrap();
- criterion::black_box(&val);
- }
- })
- },
- );
- }
- group.finish();
- }
- fn bench_spsc_try_send_integer(c: &mut Criterion) {
- let mut group = c.benchmark_group("async/spsc/try_send_integer");
- for size in [100, 500, 1_000, 5_000, 10_000] {
- group.throughput(Throughput::Elements(size));
- group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use thingbuf::{
- mpsc::{self, TrySendError},
- ThingBuf,
- };
- let (tx, rx) = mpsc::channel(ThingBuf::new(100));
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- });
- #[cfg(feature = "futures")]
- group.bench_with_input(
- BenchmarkId::new("futures::channel::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use futures::{channel::mpsc, stream::StreamExt};
- let (mut tx, mut rx) = mpsc::channel(100);
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(e) if e.is_disconnected() => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.next().await.unwrap();
- assert_eq!(n, val);
- }
- })
- },
- );
- #[cfg(feature = "tokio-sync")]
- group.bench_with_input(
- BenchmarkId::new("tokio::sync::mpsc", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| {
-
-
-
-
-
-
-
-
-
-
-
- tokio::task::unconstrained(async {
- use tokio::sync::mpsc::{self, error::TrySendError};
- let (tx, mut rx) = mpsc::channel(100);
- task::spawn(tokio::task::unconstrained(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- }));
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- })
- },
- );
- #[cfg(feature = "async-std")]
- group.bench_with_input(
- BenchmarkId::new("async_std::channel::bounded", size),
- &size,
- |b, &i| {
- let rt = runtime::Builder::new_current_thread().build().unwrap();
- b.to_async(rt).iter(|| async {
- use async_std::channel::{self, TrySendError};
- let (tx, rx) = channel::bounded(100);
- task::spawn(async move {
- let mut i = 0;
- loop {
- match tx.try_send(i) {
- Ok(()) => {
- i += 1;
- }
- Err(TrySendError::Closed(_)) => break,
- _ => task::yield_now().await,
- }
- }
- });
- for n in 0..i {
- let val = rx.recv().await.unwrap();
- assert_eq!(n, val);
- }
- })
- },
- );
- }
- group.finish();
- }
- criterion_group!(
- benches,
- bench_spsc_try_send_reusable,
- bench_spsc_try_send_integer,
- bench_spsc_reusable,
- );
- criterion_main!(benches);
|