use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use tokio::{runtime, task};

/// This benchmark simulates sending a bunch of strings over a channel. It's
/// meant to simulate the sort of workload that a `thingbuf` is intended for:
/// the elements in the buffer are expensive to allocate, copy, or drop, but
/// they can be re-used in place without allocating/deallocating.
///
/// So, this may not be strictly representative of performance in the case of,
/// say, sending a bunch of integers over the channel; instead, it simulates
/// the kind of scenario that `thingbuf` is optimized for.
fn bench_spsc_try_send_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
    aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
    aaaaaaaaaaaaaa";

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc::{self, errors::TrySendError};
                let (tx, rx) = mpsc::channel::<String>(100);
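                // producer task: rather than allocating a new `String` for
                // every message, acquire a reference to a slot in the channel
                // and re-use the string allocation already in that slot.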
                task::spawn(async move {
                    loop {
                        match tx.try_send_ref() {
                            Ok(mut slot) => {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            loop {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                match tx.try_reserve() {
                                    Ok(permit) => permit.send(String::from(THE_STRING)),
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

fn bench_spsc_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
    aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
    aaaaaaaaaaaaaa";

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc;
                let (tx, rx) = mpsc::channel::<String>(100);
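                // producer task: `send_ref` waits until a slot is free, then
                // the existing string allocation in that slot is cleared and
                // re-used in place.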
                task::spawn(async move {
                    while let Ok(mut slot) = tx.send_ref().await {
                        slot.clear();
                        slot.push_str(THE_STRING);
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            // this actually brings Tokio's MPSC closer to what
                            // `ThingBuf` can do than all the other impls --- we
                            // only allocate if we _were_ able to reserve send
                            // capacity. but, we will still allocate and
                            // deallocate a string for every message...
                            while let Ok(permit) = tx.reserve().await {
                                permit.send(String::from(THE_STRING));
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

/// The same benchmark, but with integers. Messages are not heap-allocated, so
/// non-`thingbuf` channel impls are not burdened by allocator churn for messages.
fn bench_spsc_try_send_integer(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_integer");

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc::{self, errors::TrySendError};
                let (tx, rx) = mpsc::channel(100);
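                // producer task: the messages are plain integers, so there is
                // no allocation for the channel to re-use here; this mostly
                // measures raw channel overhead.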
                task::spawn(async move {
                    let mut i = 0;
                    loop {
                        match tx.try_send(i) {
                            Ok(()) => {
                                i += 1;
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for n in 0..i {
                    let val = rx.recv().await.unwrap();
                    assert_eq!(n, val);
                }
            })
        });
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.next().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            let mut i = 0;
                            loop {
                                match tx.try_send(i) {
                                    Ok(()) => {
                                        i += 1;
                                    }
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for n in 0..i {
                            let val = rx.recv().await.unwrap();
                            assert_eq!(n, val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.recv().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
    }
    group.finish();
}

criterion_group!(
    benches,
    bench_spsc_try_send_reusable,
    bench_spsc_try_send_integer,
    bench_spsc_reusable,
);
criterion_main!(benches);
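
// Note: the comparison impls above are gated behind Cargo features. Assuming
// the feature names from the `cfg` attributes (`futures`, `tokio-sync`, and
// `async-std`) are exposed by this benchmark crate, all of them can be
// included in a run with something like:
//
//     cargo bench --features futures,tokio-sync,async-std
//
// (the exact bench target and crate layout are assumptions, not part of this
// file).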