sync_spsc.rs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
  2. use std::thread;
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_spsc_try_send_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("sync/spsc/try_send_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. for size in [100, 500, 1_000, 5_000, 10_000] {
  18. group.throughput(Throughput::Elements(size));
  19. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  20. b.iter(|| {
  21. use thingbuf::{
  22. mpsc::{sync, TrySendError},
  23. ThingBuf,
  24. };
  25. let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
  26. let producer = thread::spawn(move || loop {
  27. match tx.try_send_ref() {
  28. Ok(mut slot) => {
  29. slot.clear();
  30. slot.push_str(THE_STRING)
  31. }
  32. Err(TrySendError::Closed(_)) => break,
  33. _ => thread::yield_now(),
  34. }
  35. });
  36. for _ in 0..i {
  37. let r = rx.recv_ref().unwrap();
  38. r.with(|val| {
  39. criterion::black_box(val);
  40. });
  41. }
  42. drop(rx);
  43. producer.join().unwrap();
  44. })
  45. });
  46. #[cfg(feature = "std-sync")]
  47. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  48. b.iter(|| {
  49. use std::sync::mpsc::{self, TrySendError};
  50. let (tx, rx) = mpsc::sync_channel(100);
  51. let producer = thread::spawn(move || loop {
  52. match tx.try_send(String::from(THE_STRING)) {
  53. Ok(()) => {}
  54. Err(TrySendError::Disconnected(_)) => break,
  55. _ => thread::yield_now(),
  56. }
  57. });
  58. for _ in 0..i {
  59. let val = rx.recv().unwrap();
  60. criterion::black_box(&val);
  61. }
  62. drop(rx);
  63. producer.join().unwrap();
  64. })
  65. });
  66. #[cfg(feature = "crossbeam")]
  67. group.bench_with_input(
  68. BenchmarkId::new("crossbeam::channel::bounded", size),
  69. &size,
  70. |b, &i| {
  71. b.iter(|| {
  72. use crossbeam::channel::{self, TrySendError};
  73. let (tx, rx) = channel::bounded(100);
  74. let producer = thread::spawn(move || loop {
  75. match tx.try_send(String::from(THE_STRING)) {
  76. Ok(()) => {}
  77. Err(TrySendError::Disconnected(_)) => break,
  78. _ => thread::yield_now(),
  79. }
  80. });
  81. for _ in 0..i {
  82. let val = rx.recv().unwrap();
  83. criterion::black_box(&val);
  84. }
  85. drop(rx);
  86. producer.join().unwrap();
  87. })
  88. },
  89. );
  90. }
  91. group.finish();
  92. }
  93. fn bench_spsc_blocking_send_reusable(c: &mut Criterion) {
  94. let mut group = c.benchmark_group("sync/spsc/blocking_send_reusable");
  95. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  96. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  97. aaaaaaaaaaaaaa";
  98. for size in [100, 500, 1_000, 5_000, 10_000] {
  99. group.throughput(Throughput::Elements(size));
  100. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  101. b.iter(|| {
  102. use thingbuf::{mpsc::sync, ThingBuf};
  103. let (tx, rx) = sync::channel(ThingBuf::<String>::new(100));
  104. let producer = thread::spawn(move || {
  105. while let Ok(mut slot) = tx.send_ref() {
  106. slot.clear();
  107. slot.push_str(THE_STRING);
  108. }
  109. });
  110. for _ in 0..i {
  111. let r = rx.recv_ref().unwrap();
  112. r.with(|val| {
  113. criterion::black_box(val);
  114. });
  115. }
  116. drop(rx);
  117. producer.join().unwrap();
  118. })
  119. });
  120. #[cfg(feature = "std-sync")]
  121. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  122. b.iter(|| {
  123. use std::sync::mpsc;
  124. let (tx, rx) = mpsc::sync_channel(100);
  125. let producer =
  126. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  127. for _ in 0..i {
  128. let val = rx.recv().unwrap();
  129. criterion::black_box(&val);
  130. }
  131. drop(rx);
  132. producer.join().unwrap();
  133. })
  134. });
  135. #[cfg(feature = "crossbeam")]
  136. group.bench_with_input(
  137. BenchmarkId::new("crossbeam::channel::bounded", size),
  138. &size,
  139. |b, &i| {
  140. b.iter(|| {
  141. use crossbeam::channel;
  142. let (tx, rx) = channel::bounded(100);
  143. let producer =
  144. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  145. for _ in 0..i {
  146. let val = rx.recv().unwrap();
  147. criterion::black_box(&val);
  148. }
  149. drop(rx);
  150. producer.join().unwrap();
  151. })
  152. },
  153. );
  154. }
  155. group.finish();
  156. }
  157. criterion_group!(
  158. benches,
  159. bench_spsc_try_send_reusable,
  160. bench_spsc_blocking_send_reusable
  161. );
  162. criterion_main!(benches);