Browse Source

test(bench): benchmarks for waiting/blocking send (#15)

* test(bench): update async-mpsc benches
* split benches
* fix accidental mixing of current/multi thread rts
* test(bench): allow disabling comparison benches

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Eliza Weisman 3 years ago
parent
commit
c794c06c85
5 changed files with 715 additions and 179 deletions
  1. 16 7
      bench/Cargo.toml
  2. 188 169
      bench/benches/async_mpsc.rs
  3. 413 0
      bench/benches/async_spsc.rs
  4. 95 3
      bench/benches/sync_spsc.rs
  5. 3 0
      src/lib.rs

+ 16 - 7
bench/Cargo.toml

@@ -6,22 +6,31 @@ publish = false
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
-[dependencies]
+[features]
+# These feature flags can be disabled if we don't want to run comparison
+# benchmarks, such as when just comparing two `thingbuf` versions.
+comparisons = ["crossbeam", "async-std", "futures", "tokio-sync", "std-sync"]
+tokio-sync = ["tokio/sync"]
+std-sync = []
 
-[dev-dependencies]
+[dependencies]
 thingbuf = { path = ".." }
 criterion = { version = "0.3.5", features = ["async_tokio"] }
 
 # for comparison benchmarks
-tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "sync"] }
-crossbeam = "0.8.1"
-async-std = "1"
-futures = "0.3"
+tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread"] }
+crossbeam = { version = "0.8.1", optional = true }
+async-std = { version = "1", optional = true }
+futures = { version = "0.3", optional = true }
 
 [[bench]]
-name = "sync_mpsc"
+name = "sync_spsc"
 harness = false
 
 [[bench]]
 name = "async_mpsc"
+harness = false
+
+[[bench]]
+name = "async_spsc"
 harness = false

+ 188 - 169
bench/benches/async_mpsc.rs

@@ -10,61 +10,66 @@ use tokio::{runtime, task};
 /// So, this may not be strictly representative of performance in the case of,
 /// say, sending a bunch of integers over the channel; instead it simulates
 /// the kind of scenario that `thingbuf` is optimized for.
-fn bench_spsc_reusable(c: &mut Criterion) {
-    let mut group = c.benchmark_group("async/spsc_reusable");
+fn bench_mpsc_reusable(c: &mut Criterion) {
+    let mut group = c.benchmark_group("async/mpsc_reusable");
     static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
 aaaaaaaaaaaaaa";
 
-    for size in [100, 500, 1_000, 5_000, 10_000] {
-        group.throughput(Throughput::Elements(size));
-        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
-            let rt = runtime::Builder::new_current_thread().build().unwrap();
-            b.to_async(rt).iter(|| async {
-                use thingbuf::{
-                    mpsc::{self, TrySendError},
-                    ThingBuf,
-                };
-                let (tx, rx) = mpsc::channel(ThingBuf::new(100));
-                task::spawn(async move {
-                    loop {
-                        match tx.try_send_ref() {
-                            Ok(mut r) => r.with_mut(|s: &mut String| {
-                                s.clear();
-                                s.push_str(THE_STRING)
-                            }),
-                            Err(TrySendError::Closed(_)) => break,
-                            _ => task::yield_now().await,
-                        }
+    const SIZE: u64 = 100;
+    group.throughput(Throughput::Elements(SIZE));
+
+    for senders in [10, 50, 100] {
+        group.bench_with_input(
+            BenchmarkId::new("ThingBuf", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use thingbuf::{mpsc, ThingBuf};
+                    let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
+                    for _ in 0..senders {
+                        let tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send_ref().await {
+                                    Ok(mut slot) => {
+                                        slot.clear();
+                                        slot.push_str(THE_STRING);
+                                    }
+                                    Err(_) => break,
+                                }
+                            }
+                        });
                     }
-                });
-                for _ in 0..i {
-                    let r = rx.recv_ref().await.unwrap();
-                    r.with(|val| {
-                        criterion::black_box(val);
-                    });
-                }
-            })
-        });
 
+                    for _ in 0..SIZE {
+                        let val = rx.recv_ref().await.unwrap();
+                        criterion::black_box(&*val);
+                    }
+                })
+            },
+        );
+
+        #[cfg(feature = "futures")]
         group.bench_with_input(
-            BenchmarkId::new("futures::channel::mpsc", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use futures::{channel::mpsc, stream::StreamExt};
-                    let (mut tx, mut rx) = mpsc::channel(100);
-                    task::spawn(async move {
-                        loop {
-                            match tx.try_send(String::from(THE_STRING)) {
-                                Ok(()) => {}
-                                Err(e) if e.is_disconnected() => break,
-                                _ => task::yield_now().await,
+            BenchmarkId::new("futures::channel::mpsc", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
+                    let (tx, mut rx) = mpsc::channel(100);
+                    for _ in 0..senders {
+                        let mut tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send(String::from(THE_STRING)).await {
+                                    Ok(_) => {}
+                                    Err(_) => break,
+                                }
                             }
-                        }
-                    });
-                    for _ in 0..i {
+                        });
+                    }
+                    for _ in 0..SIZE {
                         let val = rx.next().await.unwrap();
                         criterion::black_box(&val);
                     }
@@ -72,12 +77,12 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "tokio-sync")]
         group.bench_with_input(
-            BenchmarkId::new("tokio::sync::mpsc", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| {
+            BenchmarkId::new("tokio::sync::mpsc", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| {
                     // turn off Tokio's automatic cooperative yielding for this
                     // benchmark. in code with a large number of concurrent
                     // tasks, this feature makes the MPSC channel (and other
@@ -90,23 +95,26 @@ aaaaaaaaaaaaaa";
                     // time ping-ponging through the scheduler than every other
                     // implementation.
                     tokio::task::unconstrained(async {
-                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        use tokio::sync::mpsc;
                         let (tx, mut rx) = mpsc::channel(100);
-                        task::spawn(tokio::task::unconstrained(async move {
-                            loop {
-                                // this actually brings Tokio's MPSC closer to what
-                                // `ThingBuf` can do than all the other impls --- we
-                                // only allocate if we _were_ able to reserve send
-                                // capacity. but, we will still allocate and
-                                // deallocate a string for every message...
-                                match tx.try_reserve() {
-                                    Ok(permit) => permit.send(String::from(THE_STRING)),
-                                    Err(TrySendError::Closed(_)) => break,
-                                    _ => task::yield_now().await,
+
+                        for _ in 0..senders {
+                            let tx = tx.clone();
+                            task::spawn(tokio::task::unconstrained(async move {
+                                loop {
+                                    // this actually brings Tokio's MPSC closer to what
+                                    // `ThingBuf` can do than all the other impls --- we
+                                    // only allocate if we _were_ able to reserve send
+                                    // capacity. but, we will still allocate and
+                                    // deallocate a string for every message...
+                                    match tx.reserve().await {
+                                        Ok(permit) => permit.send(String::from(THE_STRING)),
+                                        Err(_) => break,
+                                    }
                                 }
-                            }
-                        }));
-                        for _ in 0..i {
+                            }));
+                        }
+                        for _ in 0..SIZE {
                             let val = rx.recv().await.unwrap();
                             criterion::black_box(&val);
                         }
@@ -115,24 +123,27 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "async-std")]
         group.bench_with_input(
-            BenchmarkId::new("async_std::channel::bounded", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use async_std::channel::{self, TrySendError};
+            BenchmarkId::new("async_std::channel::bounded", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use async_std::channel;
                     let (tx, rx) = channel::bounded(100);
-                    task::spawn(async move {
-                        loop {
-                            match tx.try_send(String::from(THE_STRING)) {
-                                Ok(()) => {}
-                                Err(TrySendError::Closed(_)) => break,
-                                _ => task::yield_now().await,
+
+                    for _ in 0..senders {
+                        let tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send(String::from(THE_STRING)).await {
+                                    Ok(_) => {}
+                                    Err(_) => break,
+                                }
                             }
-                        }
-                    });
-                    for _ in 0..i {
+                        });
+                    }
+                    for _ in 0..SIZE {
                         let val = rx.recv().await.unwrap();
                         criterion::black_box(&val);
                     }
@@ -144,74 +155,73 @@ aaaaaaaaaaaaaa";
     group.finish();
 }
 
-/// The same benchmark, but with integers. Messages are not heap allocated, so
-/// non-thingbuf channel impls are not burdened by allocator churn for messages.
-fn bench_spsc_integer(c: &mut Criterion) {
-    let mut group = c.benchmark_group("async/spsc_integer");
-
-    for size in [100, 500, 1_000, 5_000, 10_000] {
-        group.throughput(Throughput::Elements(size));
-        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
-            let rt = runtime::Builder::new_current_thread().build().unwrap();
-            b.to_async(rt).iter(|| async {
-                use thingbuf::{
-                    mpsc::{self, TrySendError},
-                    ThingBuf,
-                };
-                let (tx, rx) = mpsc::channel(ThingBuf::new(100));
-                task::spawn(async move {
-                    let mut i = 0;
-                    loop {
-                        match tx.try_send(i) {
-                            Ok(()) => {
-                                i += 1;
+fn bench_mpsc_integer(c: &mut Criterion) {
+    let mut group = c.benchmark_group("async/mpsc_integer");
+    const SIZE: u64 = 1_000;
+    for senders in [10, 50, 100] {
+        group.throughput(Throughput::Elements(SIZE));
+        group.bench_with_input(
+            BenchmarkId::new("ThingBuf", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use thingbuf::{mpsc, ThingBuf};
+                    let (tx, rx) = mpsc::channel(ThingBuf::new(100));
+                    for i in 0..senders {
+                        let tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send_ref().await {
+                                    Ok(mut slot) => {
+                                        *slot = i;
+                                    }
+                                    Err(_) => break,
+                                }
                             }
-                            Err(TrySendError::Closed(_)) => break,
-                            _ => task::yield_now().await,
-                        }
+                        });
                     }
-                });
-                for n in 0..i {
-                    let val = rx.recv().await.unwrap();
-                    assert_eq!(n, val);
-                }
-            })
-        });
 
+                    for _ in 0..SIZE {
+                        let val = rx.recv_ref().await.unwrap();
+                        criterion::black_box(&*val);
+                    }
+                })
+            },
+        );
+
+        #[cfg(feature = "futures")]
         group.bench_with_input(
-            BenchmarkId::new("futures::channel::mpsc", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use futures::{channel::mpsc, stream::StreamExt};
-                    let (mut tx, mut rx) = mpsc::channel(100);
-                    task::spawn(async move {
-                        let mut i = 0;
-                        loop {
-                            match tx.try_send(i) {
-                                Ok(()) => {
-                                    i += 1;
+            BenchmarkId::new("futures::channel::mpsc", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
+                    let (tx, mut rx) = mpsc::channel(100);
+                    for i in 0..senders {
+                        let mut tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send(i).await {
+                                    Ok(_) => {}
+                                    Err(_) => break,
                                 }
-                                Err(e) if e.is_disconnected() => break,
-                                _ => task::yield_now().await,
                             }
-                        }
-                    });
-                    for n in 0..i {
+                        });
+                    }
+                    for _ in 0..SIZE {
                         let val = rx.next().await.unwrap();
-                        assert_eq!(n, val);
+                        criterion::black_box(&val);
                     }
                 })
             },
         );
 
+        #[cfg(feature = "tokio-sync")]
         group.bench_with_input(
-            BenchmarkId::new("tokio::sync::mpsc", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| {
+            BenchmarkId::new("tokio::sync::mpsc", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| {
                     // turn off Tokio's automatic cooperative yielding for this
                     // benchmark. in code with a large number of concurrent
                     // tasks, this feature makes the MPSC channel (and other
@@ -224,52 +234,57 @@ fn bench_spsc_integer(c: &mut Criterion) {
                     // time ping-ponging through the scheduler than every other
                     // implementation.
                     tokio::task::unconstrained(async {
-                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        use tokio::sync::mpsc;
                         let (tx, mut rx) = mpsc::channel(100);
-                        task::spawn(tokio::task::unconstrained(async move {
-                            let mut i = 0;
-                            loop {
-                                match tx.try_send(i) {
-                                    Ok(()) => {
-                                        i += 1;
+
+                        for i in 0..senders {
+                            let tx = tx.clone();
+                            task::spawn(tokio::task::unconstrained(async move {
+                                loop {
+                                    // unlike the string benchmark above, integer
+                                    // messages are sent by value, so none of the
+                                    // impls pay any allocator churn here; this
+                                    // just awaits send capacity, putting every
+                                    // channel on equal footing.
+                                    match tx.send(i).await {
+                                        Ok(_) => {}
+                                        Err(_) => break,
                                     }
-                                    Err(TrySendError::Closed(_)) => break,
-                                    _ => task::yield_now().await,
                                 }
-                            }
-                        }));
-                        for n in 0..i {
+                            }));
+                        }
+                        for _ in 0..SIZE {
                             let val = rx.recv().await.unwrap();
-                            assert_eq!(n, val);
+                            criterion::black_box(&val);
                         }
                     })
                 })
             },
         );
 
+        #[cfg(feature = "async-std")]
         group.bench_with_input(
-            BenchmarkId::new("async_std::channel::bounded", size),
-            &size,
-            |b, &i| {
-                let rt = runtime::Builder::new_current_thread().build().unwrap();
-                b.to_async(rt).iter(|| async {
-                    use async_std::channel::{self, TrySendError};
+            BenchmarkId::new("async_std::channel::bounded", senders),
+            &senders,
+            |b, &senders| {
+                b.to_async(rt()).iter(|| async {
+                    use async_std::channel;
                     let (tx, rx) = channel::bounded(100);
-                    task::spawn(async move {
-                        let mut i = 0;
-                        loop {
-                            match tx.try_send(i) {
-                                Ok(()) => {
-                                    i += 1;
+
+                    for i in 0..senders {
+                        let tx = tx.clone();
+                        task::spawn(async move {
+                            loop {
+                                match tx.send(i).await {
+                                    Ok(_) => {}
+                                    Err(_) => break,
                                 }
-                                Err(TrySendError::Closed(_)) => break,
-                                _ => task::yield_now().await,
                             }
-                        }
-                    });
-                    for n in 0..i {
+                        });
+                    }
+                    for _ in 0..SIZE {
                         let val = rx.recv().await.unwrap();
-                        assert_eq!(n, val);
+                        criterion::black_box(&val);
                     }
                 })
             },
@@ -279,5 +294,9 @@ fn bench_spsc_integer(c: &mut Criterion) {
     group.finish();
 }
 
-criterion_group!(benches, bench_spsc_reusable, bench_spsc_integer);
+fn rt() -> tokio::runtime::Runtime {
+    runtime::Builder::new_multi_thread().build().unwrap()
+}
+
+criterion_group!(benches, bench_mpsc_reusable, bench_mpsc_integer,);
 criterion_main!(benches);

+ 413 - 0
bench/benches/async_spsc.rs

@@ -0,0 +1,413 @@
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use tokio::{runtime, task};
+
+/// This benchmark simulates sending a bunch of strings over a channel. It's
+/// intended to simulate the sort of workload that a `thingbuf` is intended
+/// for, where the type of element in the buffer is expensive to allocate,
+/// copy, or drop, but they can be re-used in place without
+/// allocating/deallocating.
+///
+/// So, this may not be strictly representative of performance in the case of,
+/// say, sending a bunch of integers over the channel; instead it simulates
+/// the kind of scenario that `thingbuf` is optimized for.
+fn bench_spsc_try_send_reusable(c: &mut Criterion) {
+    let mut group = c.benchmark_group("async/spsc/try_send_reusable");
+    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaa";
+
+    for size in [100, 500, 1_000, 5_000, 10_000] {
+        group.throughput(Throughput::Elements(size));
+        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
+            let rt = runtime::Builder::new_current_thread().build().unwrap();
+            b.to_async(rt).iter(|| async {
+                use thingbuf::{
+                    mpsc::{self, TrySendError},
+                    ThingBuf,
+                };
+                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
+                task::spawn(async move {
+                    loop {
+                        match tx.try_send_ref() {
+                            Ok(mut slot) => {
+                                slot.clear();
+                                slot.push_str(THE_STRING);
+                            }
+                            Err(TrySendError::Closed(_)) => break,
+                            _ => task::yield_now().await,
+                        }
+                    }
+                });
+                for _ in 0..i {
+                    let val = rx.recv_ref().await.unwrap();
+                    criterion::black_box(&*val);
+                }
+            })
+        });
+
+        group.bench_with_input(
+            BenchmarkId::new("futures::channel::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use futures::{channel::mpsc, stream::StreamExt};
+                    let (mut tx, mut rx) = mpsc::channel(100);
+                    task::spawn(async move {
+                        loop {
+                            match tx.try_send(String::from(THE_STRING)) {
+                                Ok(()) => {}
+                                Err(e) if e.is_disconnected() => break,
+                                _ => task::yield_now().await,
+                            }
+                        }
+                    });
+                    for _ in 0..i {
+                        let val = rx.next().await.unwrap();
+                        criterion::black_box(&val);
+                    }
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("tokio::sync::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| {
+                    // turn off Tokio's automatic cooperative yielding for this
+                    // benchmark. in code with a large number of concurrent
+                    // tasks, this feature makes the MPSC channel (and other
+                    // Tokio synchronization primitives) better "team players"
+                    // than other implementations, since it prevents them from
+                    // using too much scheduler time.
+                    //
+                    // in this benchmark, though, there *are* no other tasks
+                    // running, so automatic yielding just means we spend more
+                    // time ping-ponging through the scheduler than every other
+                    // implementation.
+                    tokio::task::unconstrained(async {
+                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        let (tx, mut rx) = mpsc::channel(100);
+                        task::spawn(tokio::task::unconstrained(async move {
+                            loop {
+                                // this actually brings Tokio's MPSC closer to what
+                                // `ThingBuf` can do than all the other impls --- we
+                                // only allocate if we _were_ able to reserve send
+                                // capacity. but, we will still allocate and
+                                // deallocate a string for every message...
+                                match tx.try_reserve() {
+                                    Ok(permit) => permit.send(String::from(THE_STRING)),
+                                    Err(TrySendError::Closed(_)) => break,
+                                    _ => task::yield_now().await,
+                                }
+                            }
+                        }));
+                        for _ in 0..i {
+                            let val = rx.recv().await.unwrap();
+                            criterion::black_box(&val);
+                        }
+                    })
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("async_std::channel::bounded", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use async_std::channel::{self, TrySendError};
+                    let (tx, rx) = channel::bounded(100);
+                    task::spawn(async move {
+                        loop {
+                            match tx.try_send(String::from(THE_STRING)) {
+                                Ok(()) => {}
+                                Err(TrySendError::Closed(_)) => break,
+                                _ => task::yield_now().await,
+                            }
+                        }
+                    });
+                    for _ in 0..i {
+                        let val = rx.recv().await.unwrap();
+                        criterion::black_box(&val);
+                    }
+                })
+            },
+        );
+    }
+
+    group.finish();
+}
+
+fn bench_spsc_reusable(c: &mut Criterion) {
+    let mut group = c.benchmark_group("async/spsc/reusable");
+    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaa";
+
+    for size in [100, 500, 1_000, 5_000, 10_000] {
+        group.throughput(Throughput::Elements(size));
+        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
+            let rt = runtime::Builder::new_current_thread().build().unwrap();
+            b.to_async(rt).iter(|| async {
+                use thingbuf::{
+                    mpsc::{self, TrySendError},
+                    ThingBuf,
+                };
+                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
+                task::spawn(async move {
+                    loop {
+                        match tx.send_ref().await {
+                            Ok(mut slot) => {
+                                slot.clear();
+                                slot.push_str(THE_STRING);
+                            }
+                            Err(_) => break,
+                        }
+                    }
+                });
+                for _ in 0..i {
+                    let val = rx.recv_ref().await.unwrap();
+                    criterion::black_box(&*val);
+                }
+            })
+        });
+
+        group.bench_with_input(
+            BenchmarkId::new("futures::channel::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
+                    let (mut tx, mut rx) = mpsc::channel(100);
+                    task::spawn(async move {
+                        loop {
+                            match tx.send(String::from(THE_STRING)).await {
+                                Ok(_) => {}
+                                Err(_) => break,
+                            }
+                        }
+                    });
+                    for _ in 0..i {
+                        let val = rx.next().await.unwrap();
+                        criterion::black_box(&val);
+                    }
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("tokio::sync::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| {
+                    // turn off Tokio's automatic cooperative yielding for this
+                    // benchmark. in code with a large number of concurrent
+                    // tasks, this feature makes the MPSC channel (and other
+                    // Tokio synchronization primitives) better "team players"
+                    // than other implementations, since it prevents them from
+                    // using too much scheduler time.
+                    //
+                    // in this benchmark, though, there *are* no other tasks
+                    // running, so automatic yielding just means we spend more
+                    // time ping-ponging through the scheduler than every other
+                    // implementation.
+                    tokio::task::unconstrained(async {
+                        use tokio::sync::mpsc;
+                        let (tx, mut rx) = mpsc::channel(100);
+                        task::spawn(tokio::task::unconstrained(async move {
+                            loop {
+                                // this actually brings Tokio's MPSC closer to what
+                                // `ThingBuf` can do than all the other impls --- we
+                                // only allocate if we _were_ able to reserve send
+                                // capacity. but, we will still allocate and
+                                // deallocate a string for every message...
+                                match tx.reserve().await {
+                                    Ok(permit) => permit.send(String::from(THE_STRING)),
+                                    Err(_) => break,
+                                }
+                            }
+                        }));
+                        for _ in 0..i {
+                            let val = rx.recv().await.unwrap();
+                            criterion::black_box(&val);
+                        }
+                    })
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("async_std::channel::bounded", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use async_std::channel;
+                    let (tx, rx) = channel::bounded(100);
+                    task::spawn(async move {
+                        loop {
+                            match tx.send(String::from(THE_STRING)).await {
+                                Ok(_) => {}
+                                Err(_) => break,
+                            }
+                        }
+                    });
+                    for _ in 0..i {
+                        let val = rx.recv().await.unwrap();
+                        criterion::black_box(&val);
+                    }
+                })
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// The same benchmark, but with integers. Messages are not heap allocated, so
+/// non-thingbuf channel impls are not burdened by allocator churn for messages.
+fn bench_spsc_try_send_integer(c: &mut Criterion) {
+    let mut group = c.benchmark_group("async/spsc/try_send_integer");
+
+    for size in [100, 500, 1_000, 5_000, 10_000] {
+        group.throughput(Throughput::Elements(size));
+        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
+            let rt = runtime::Builder::new_current_thread().build().unwrap();
+            b.to_async(rt).iter(|| async {
+                use thingbuf::{
+                    mpsc::{self, TrySendError},
+                    ThingBuf,
+                };
+                let (tx, rx) = mpsc::channel(ThingBuf::new(100));
+                task::spawn(async move {
+                    let mut i = 0;
+                    loop {
+                        match tx.try_send(i) {
+                            Ok(()) => {
+                                i += 1;
+                            }
+                            Err(TrySendError::Closed(_)) => break,
+                            _ => task::yield_now().await,
+                        }
+                    }
+                });
+                for n in 0..i {
+                    let val = rx.recv().await.unwrap();
+                    assert_eq!(n, val);
+                }
+            })
+        });
+
+        group.bench_with_input(
+            BenchmarkId::new("futures::channel::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use futures::{channel::mpsc, stream::StreamExt};
+                    let (mut tx, mut rx) = mpsc::channel(100);
+                    task::spawn(async move {
+                        let mut i = 0;
+                        loop {
+                            match tx.try_send(i) {
+                                Ok(()) => {
+                                    i += 1;
+                                }
+                                Err(e) if e.is_disconnected() => break,
+                                _ => task::yield_now().await,
+                            }
+                        }
+                    });
+                    for n in 0..i {
+                        let val = rx.next().await.unwrap();
+                        assert_eq!(n, val);
+                    }
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("tokio::sync::mpsc", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| {
+                    // turn off Tokio's automatic cooperative yielding for this
+                    // benchmark. in code with a large number of concurrent
+                    // tasks, this feature makes the MPSC channel (and other
+                    // Tokio synchronization primitives) better "team players"
+                    // than other implementations, since it prevents them from
+                    // using too much scheduler time.
+                    //
+                    // in this benchmark, though, there *are* no other tasks
+                    // running, so automatic yielding just means we spend more
+                    // time ping-ponging through the scheduler than every other
+                    // implementation.
+                    tokio::task::unconstrained(async {
+                        use tokio::sync::mpsc::{self, error::TrySendError};
+                        let (tx, mut rx) = mpsc::channel(100);
+                        task::spawn(tokio::task::unconstrained(async move {
+                            let mut i = 0;
+                            loop {
+                                match tx.try_send(i) {
+                                    Ok(()) => {
+                                        i += 1;
+                                    }
+                                    Err(TrySendError::Closed(_)) => break,
+                                    _ => task::yield_now().await,
+                                }
+                            }
+                        }));
+                        for n in 0..i {
+                            let val = rx.recv().await.unwrap();
+                            assert_eq!(n, val);
+                        }
+                    })
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("async_std::channel::bounded", size),
+            &size,
+            |b, &i| {
+                let rt = runtime::Builder::new_current_thread().build().unwrap();
+                b.to_async(rt).iter(|| async {
+                    use async_std::channel::{self, TrySendError};
+                    let (tx, rx) = channel::bounded(100);
+                    task::spawn(async move {
+                        let mut i = 0;
+                        loop {
+                            match tx.try_send(i) {
+                                Ok(()) => {
+                                    i += 1;
+                                }
+                                Err(TrySendError::Closed(_)) => break,
+                                _ => task::yield_now().await,
+                            }
+                        }
+                    });
+                    for n in 0..i {
+                        let val = rx.recv().await.unwrap();
+                        assert_eq!(n, val);
+                    }
+                })
+            },
+        );
+    }
+
+    group.finish();
+}
+criterion_group!(
+    benches,
+    bench_spsc_try_send_reusable,
+    bench_spsc_try_send_integer,
+    bench_spsc_reusable,
+);
+criterion_main!(benches);

+ 95 - 3
bench/benches/sync_mpsc.rs → bench/benches/sync_spsc.rs

@@ -10,8 +10,8 @@ use std::thread;
 /// So, this may not be strictly representative of performance in the case of,
 /// say, sending a bunch of integers over the channel; instead it simulates
 /// the kind of scenario that `thingbuf` is optimized for.
-fn bench_spsc_reusable(c: &mut Criterion) {
-    let mut group = c.benchmark_group("sync/spsc_reusable");
+fn bench_spsc_try_send_reusable(c: &mut Criterion) {
+    let mut group = c.benchmark_group("sync/spsc/try_send_reusable");
     static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
 aaaaaaaaaaaaaa";
@@ -46,6 +46,7 @@ aaaaaaaaaaaaaa";
             })
         });
 
+        #[cfg(feature = "std-sync")]
         group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
             b.iter(|| {
                 use std::sync::mpsc::{self, TrySendError};
@@ -66,6 +67,7 @@ aaaaaaaaaaaaaa";
             })
         });
 
+        #[cfg(feature = "crossbeam")]
         group.bench_with_input(
             BenchmarkId::new("crossbeam::channel::bounded", size),
             &size,
@@ -96,5 +98,95 @@ aaaaaaaaaaaaaa";
     group.finish();
 }
 
-criterion_group!(benches, bench_spsc_reusable);
+fn bench_spsc_blocking_send_reusable(c: &mut Criterion) {
+    let mut group = c.benchmark_group("sync/spsc/blocking_send_reusable");
+    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
+aaaaaaaaaaaaaa";
+
+    for size in [100, 500, 1_000, 5_000, 10_000] {
+        group.throughput(Throughput::Elements(size));
+        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
+            b.iter(|| {
+                use thingbuf::{
+                    mpsc::{sync, TrySendError},
+                    ThingBuf,
+                };
+                let (tx, rx) = sync::channel(ThingBuf::new(100));
+                let producer = thread::spawn(move || loop {
+                    match tx.send_ref() {
+                        Ok(mut slot) => {
+                            slot.clear();
+                            slot.push_str(THE_STRING);
+                        }
+                        Err(TrySendError::Closed(_)) => break,
+                    }
+                });
+                for _ in 0..i {
+                    let r = rx.recv_ref().unwrap();
+                    r.with(|val| {
+                        criterion::black_box(val);
+                    });
+                }
+                drop(rx);
+                producer.join().unwrap();
+            })
+        });
+
+        #[cfg(feature = "std-sync")]
+        group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
+            b.iter(|| {
+                use std::sync::mpsc::{self, TrySendError};
+                let (tx, rx) = mpsc::sync_channel(100);
+                let producer = thread::spawn(move || loop {
+                    match tx.send(String::from(THE_STRING)) {
+                        Ok(()) => {}
+                        Err(_) => break,
+                    }
+                });
+                for _ in 0..i {
+                    let val = rx.recv().unwrap();
+                    criterion::black_box(&val);
+                }
+                drop(rx);
+                producer.join().unwrap();
+            })
+        });
+
+        #[cfg(feature = "crossbeam")]
+        group.bench_with_input(
+            BenchmarkId::new("crossbeam::channel::bounded", size),
+            &size,
+            |b, &i| {
+                b.iter(|| {
+                    use crossbeam::channel;
+                    let (tx, rx) = channel::bounded(100);
+
+                    let producer = thread::spawn(move || loop {
+                        // `Sender::send` returns `SendError` (a tuple struct),
+                        // not `TrySendError`; any error means the receiver is
+                        // gone, so just break.
+                        match tx.send(String::from(THE_STRING)) {
+                            Ok(()) => {}
+                            Err(_) => break,
+                        }
+                    });
+
+                    for _ in 0..i {
+                        let val = rx.recv().unwrap();
+                        criterion::black_box(&val);
+                    }
+
+                    drop(rx);
+                    producer.join().unwrap();
+                })
+            },
+        );
+    }
+    group.finish();
+}
+
+criterion_group!(
+    benches,
+    bench_spsc_try_send_reusable,
+    bench_spsc_blocking_send_reusable
+);
 criterion_main!(benches);

+ 3 - 0
src/lib.rs

@@ -392,6 +392,9 @@ impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
     }
 }
 
+unsafe impl<T: Send> Send for Ref<'_, T> {}
+unsafe impl<T: Send> Sync for Ref<'_, T> {}
+
 // === impl Slot ===
 
 const EMPTY_STATE: usize = usize::MAX;