Browse Source

test(bench): cleanup benches, fix compiler errors

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Eliza Weisman 3 years ago
parent
commit
8c2e4a5384
4 changed files with 59 additions and 129 deletions
  1. 1 1
      bench/Cargo.toml
  2. 17 64
      bench/benches/async_mpsc.rs
  3. 24 34
      bench/benches/async_spsc.rs
  4. 17 30
      bench/benches/sync_spsc.rs

+ 1 - 1
bench/Cargo.toml

@@ -18,7 +18,7 @@ thingbuf = { path = ".." }
 criterion = { version = "0.3.5", features = ["async_tokio"] }
 
 # for comparison benchmarks
-tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread"] }
+tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "sync"] }
 crossbeam = { version = "0.8.1", optional = true }
 async-std = { version = "1", optional = true }
 futures = { version = "0.3", optional = true }

+ 17 - 64
bench/benches/async_mpsc.rs

@@ -30,14 +30,9 @@ aaaaaaaaaaaaaa";
                     for _ in 0..senders {
                         let tx = tx.clone();
                         task::spawn(async move {
-                            loop {
-                                match tx.send_ref().await {
-                                    Ok(mut slot) => {
-                                        slot.clear();
-                                        slot.push_str(THE_STRING);
-                                    }
-                                    Err(_) => break,
-                                }
+                            while let Ok(mut slot) = tx.send_ref().await {
+                                slot.clear();
+                                slot.push_str(THE_STRING);
                             }
                         });
                     }
@@ -61,12 +56,7 @@ aaaaaaaaaaaaaa";
                     for _ in 0..senders {
                         let mut tx = tx.clone();
                         task::spawn(async move {
-                            loop {
-                                match tx.send(String::from(THE_STRING)).await {
-                                    Ok(_) => {}
-                                    Err(_) => break,
-                                }
-                            }
+                            while let Ok(_) = tx.send(String::from(THE_STRING)).await {}
                         });
                     }
                     for _ in 0..SIZE {
@@ -101,16 +91,13 @@ aaaaaaaaaaaaaa";
                         for _ in 0..senders {
                             let tx = tx.clone();
                             task::spawn(tokio::task::unconstrained(async move {
-                                loop {
-                                    // this actually brings Tokio's MPSC closer to what
-                                    // `ThingBuf` can do than all the other impls --- we
-                                    // only allocate if we _were_ able to reserve send
-                                    // capacity. but, we will still allocate and
-                                    // deallocate a string for every message...
-                                    match tx.reserve().await {
-                                        Ok(permit) => permit.send(String::from(THE_STRING)),
-                                        Err(_) => break,
-                                    }
+                                // this actually brings Tokio's MPSC closer to what
+                                // `ThingBuf` can do than all the other impls --- we
+                                // only allocate if we _were_ able to reserve send
+                                // capacity. but, we will still allocate and
+                                // deallocate a string for every message...
+                                while let Ok(permit) = tx.reserve().await {
+                                    permit.send(String::from(THE_STRING));
                                 }
                             }));
                         }
@@ -135,12 +122,7 @@ aaaaaaaaaaaaaa";
                     for _ in 0..senders {
                         let tx = tx.clone();
                         task::spawn(async move {
-                            loop {
-                                match tx.send(String::from(THE_STRING)).await {
-                                    Ok(_) => {}
-                                    Err(_) => break,
-                                }
-                            }
+                            while let Ok(_) = tx.send(String::from(THE_STRING)).await {}
                         });
                     }
                     for _ in 0..SIZE {
@@ -170,13 +152,8 @@ fn bench_mpsc_integer(c: &mut Criterion) {
                     for i in 0..senders {
                         let tx = tx.clone();
                         task::spawn(async move {
-                            loop {
-                                match tx.send_ref().await {
-                                    Ok(mut slot) => {
-                                        *slot = i;
-                                    }
-                                    Err(_) => break,
-                                }
+                            while let Ok(mut slot) = tx.send_ref().await {
+                                *slot = i;
                             }
                         });
                     }
@@ -199,14 +176,7 @@ fn bench_mpsc_integer(c: &mut Criterion) {
                     let (tx, mut rx) = mpsc::channel(100);
                     for i in 0..senders {
                         let mut tx = tx.clone();
-                        task::spawn(async move {
-                            loop {
-                                match tx.send(i).await {
-                                    Ok(_) => {}
-                                    Err(_) => break,
-                                }
-                            }
-                        });
+                        task::spawn(async move { while let Ok(_) = tx.send(i).await {} });
                     }
                     for _ in 0..SIZE {
                         let val = rx.next().await.unwrap();
@@ -240,17 +210,7 @@ fn bench_mpsc_integer(c: &mut Criterion) {
                         for i in 0..senders {
                             let tx = tx.clone();
                             task::spawn(tokio::task::unconstrained(async move {
-                                loop {
-                                    // this actually brings Tokio's MPSC closer to what
-                                    // `ThingBuf` can do than all the other impls --- we
-                                    // only allocate if we _were_ able to reserve send
-                                    // capacity. but, we will still allocate and
-                                    // deallocate a string for every message...
-                                    match tx.send(i).await {
-                                        Ok(_) => {}
-                                        Err(_) => break,
-                                    }
-                                }
+                                while let Ok(_) = tx.send(i).await {}
                             }));
                         }
                         for _ in 0..SIZE {
@@ -273,14 +233,7 @@ fn bench_mpsc_integer(c: &mut Criterion) {
 
                     for i in 0..senders {
                         let tx = tx.clone();
-                        task::spawn(async move {
-                            loop {
-                                match tx.send(i).await {
-                                    Ok(_) => {}
-                                    Err(_) => break,
-                                }
-                            }
-                        });
+                        task::spawn(async move { while let Ok(_) = tx.send(i).await {} });
                     }
                     for _ in 0..SIZE {
                         let val = rx.recv().await.unwrap();

+ 24 - 34
bench/benches/async_spsc.rs

@@ -18,6 +18,7 @@ aaaaaaaaaaaaaa";
 
     for size in [100, 500, 1_000, 5_000, 10_000] {
         group.throughput(Throughput::Elements(size));
+
         group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
             let rt = runtime::Builder::new_current_thread().build().unwrap();
             b.to_async(rt).iter(|| async {
@@ -45,6 +46,7 @@ aaaaaaaaaaaaaa";
             })
         });
 
+        #[cfg(feature = "futures")]
         group.bench_with_input(
             BenchmarkId::new("futures::channel::mpsc", size),
             &size,
@@ -70,6 +72,7 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "tokio-sync")]
         group.bench_with_input(
             BenchmarkId::new("tokio::sync::mpsc", size),
             &size,
@@ -113,6 +116,7 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "async-std")]
         group.bench_with_input(
             BenchmarkId::new("async_std::channel::bounded", size),
             &size,
@@ -150,23 +154,16 @@ aaaaaaaaaaaaaa";
 
     for size in [100, 500, 1_000, 5_000, 10_000] {
         group.throughput(Throughput::Elements(size));
+
         group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
             let rt = runtime::Builder::new_current_thread().build().unwrap();
             b.to_async(rt).iter(|| async {
-                use thingbuf::{
-                    mpsc::{self, TrySendError},
-                    ThingBuf,
-                };
+                use thingbuf::{mpsc, ThingBuf};
                 let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
                 task::spawn(async move {
-                    loop {
-                        match tx.send_ref().await {
-                            Ok(mut slot) => {
-                                slot.clear();
-                                slot.push_str(THE_STRING);
-                            }
-                            Err(_) => break,
-                        }
+                    while let Ok(mut slot) = tx.send_ref().await {
+                        slot.clear();
+                        slot.push_str(THE_STRING);
                     }
                 });
                 for _ in 0..i {
@@ -176,6 +173,7 @@ aaaaaaaaaaaaaa";
             })
         });
 
+        #[cfg(feature = "futures")]
         group.bench_with_input(
             BenchmarkId::new("futures::channel::mpsc", size),
             &size,
@@ -185,12 +183,7 @@ aaaaaaaaaaaaaa";
                     use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                     let (mut tx, mut rx) = mpsc::channel(100);
                     task::spawn(async move {
-                        loop {
-                            match tx.send(String::from(THE_STRING)).await {
-                                Ok(_) => {}
-                                Err(_) => break,
-                            }
-                        }
+                        while let Ok(_) = tx.send(String::from(THE_STRING)).await {}
                     });
                     for _ in 0..i {
                         let val = rx.next().await.unwrap();
@@ -200,6 +193,7 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "tokio-sync")]
         group.bench_with_input(
             BenchmarkId::new("tokio::sync::mpsc", size),
             &size,
@@ -221,16 +215,13 @@ aaaaaaaaaaaaaa";
                         use tokio::sync::mpsc::{self, error::TrySendError};
                         let (tx, mut rx) = mpsc::channel(100);
                         task::spawn(tokio::task::unconstrained(async move {
-                            loop {
-                                // this actually brings Tokio's MPSC closer to what
-                                // `ThingBuf` can do than all the other impls --- we
-                                // only allocate if we _were_ able to reserve send
-                                // capacity. but, we will still allocate and
-                                // deallocate a string for every message...
-                                match tx.reserve().await {
-                                    Ok(permit) => permit.send(String::from(THE_STRING)),
-                                    Err(_) => break,
-                                }
+                            // this actually brings Tokio's MPSC closer to what
+                            // `ThingBuf` can do than all the other impls --- we
+                            // only allocate if we _were_ able to reserve send
+                            // capacity. but, we will still allocate and
+                            // deallocate a string for every message...
+                            while let Ok(permit) = tx.reserve().await {
+                                permit.send(String::from(THE_STRING));
                             }
                         }));
                         for _ in 0..i {
@@ -242,6 +233,7 @@ aaaaaaaaaaaaaa";
             },
         );
 
+        #[cfg(feature = "async-std")]
         group.bench_with_input(
             BenchmarkId::new("async_std::channel::bounded", size),
             &size,
@@ -251,12 +243,7 @@ aaaaaaaaaaaaaa";
                     use async_std::channel;
                     let (tx, rx) = channel::bounded(100);
                     task::spawn(async move {
-                        loop {
-                            match tx.send(String::from(THE_STRING)).await {
-                                Ok(_) => {}
-                                Err(_) => break,
-                            }
-                        }
+                        while let Ok(_) = tx.send(String::from(THE_STRING)).await {}
                     });
                     for _ in 0..i {
                         let val = rx.recv().await.unwrap();
@@ -304,6 +291,7 @@ fn bench_spsc_try_send_integer(c: &mut Criterion) {
             })
         });
 
+        #[cfg(feature = "futures")]
         group.bench_with_input(
             BenchmarkId::new("futures::channel::mpsc", size),
             &size,
@@ -332,6 +320,7 @@ fn bench_spsc_try_send_integer(c: &mut Criterion) {
             },
         );
 
+        #[cfg(feature = "tokio-sync")]
         group.bench_with_input(
             BenchmarkId::new("tokio::sync::mpsc", size),
             &size,
@@ -373,6 +362,7 @@ fn bench_spsc_try_send_integer(c: &mut Criterion) {
             },
         );
 
+        #[cfg(feature = "async-std")]
         group.bench_with_input(
             BenchmarkId::new("async_std::channel::bounded", size),
             &size,

+ 17 - 30
bench/benches/sync_spsc.rs

@@ -24,13 +24,13 @@ aaaaaaaaaaaaaa";
                     mpsc::{sync, TrySendError},
                     ThingBuf,
                 };
-                let (tx, rx) = sync::channel(ThingBuf::new(100));
+                let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
                 let producer = thread::spawn(move || loop {
                     match tx.try_send_ref() {
-                        Ok(mut r) => r.with_mut(|s: &mut String| {
-                            s.clear();
-                            s.push_str(THE_STRING)
-                        }),
+                        Ok(mut slot) => {
+                            slot.clear();
+                            slot.push_str(THE_STRING)
+                        }
                         Err(TrySendError::Closed(_)) => break,
                         _ => thread::yield_now(),
                     }
@@ -108,18 +108,12 @@ aaaaaaaaaaaaaa";
         group.throughput(Throughput::Elements(size));
         group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
             b.iter(|| {
-                use thingbuf::{
-                    mpsc::{sync, TrySendError},
-                    ThingBuf,
-                };
-                let (tx, rx) = sync::channel(ThingBuf::new(100));
-                let producer = thread::spawn(move || loop {
-                    match tx.send_ref() {
-                        Ok(mut slot) => {
-                            slot.clear();
-                            slot.push_str(THE_STRING);
-                        }
-                        Err(TrySendError::Closed(_)) => break,
+                use thingbuf::{mpsc::sync, ThingBuf};
+                let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
+                let producer = thread::spawn(move || {
+                    while let Ok(mut slot) = tx.send_ref() {
+                        slot.clear();
+                        slot.push_str(THE_STRING);
                     }
                 });
                 for _ in 0..i {
@@ -138,12 +132,8 @@ aaaaaaaaaaaaaa";
             b.iter(|| {
                 use std::sync::mpsc::{self, TrySendError};
                 let (tx, rx) = mpsc::sync_channel(100);
-                let producer = thread::spawn(move || loop {
-                    match tx.send(String::from(THE_STRING)) {
-                        Ok(()) => {}
-                        Err(_) => break,
-                    }
-                });
+                let producer =
+                    thread::spawn(move || while let Ok(_) = tx.send(String::from(THE_STRING)) {});
                 for _ in 0..i {
                     let val = rx.recv().unwrap();
                     criterion::black_box(&val);
@@ -162,13 +152,10 @@ aaaaaaaaaaaaaa";
                     use crossbeam::channel::{self, TrySendError};
                     let (tx, rx) = channel::bounded(100);
 
-                    let producer = thread::spawn(move || loop {
-                        match tx.send(String::from(THE_STRING)) {
-                            Ok(()) => {}
-                            Err(TrySendError::Disconnected(_)) => break,
-                            Err(_) => break,
-                        }
-                    });
+                    let producer =
+                        thread::spawn(
+                            move || while let Ok(_) = tx.send(String::from(THE_STRING)) {},
+                        );
 
                     for _ in 0..i {
                         let val = rx.recv().unwrap();