// sync_spsc.rs

  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
  2. use std::thread;
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_spsc_try_send_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("sync/spsc/try_send_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. for size in [100, 500, 1_000, 5_000, 10_000] {
  18. group.throughput(Throughput::Elements(size));
  19. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  20. b.iter(|| {
  21. use thingbuf::mpsc::{sync, TrySendError};
  22. let (tx, rx) = sync::channel::<String>(100);
  23. let producer = thread::spawn(move || loop {
  24. match tx.try_send_ref() {
  25. Ok(mut slot) => {
  26. slot.clear();
  27. slot.push_str(THE_STRING)
  28. }
  29. Err(TrySendError::Closed(_)) => break,
  30. _ => thread::yield_now(),
  31. }
  32. });
  33. for _ in 0..i {
  34. let r = rx.recv_ref().unwrap();
  35. r.with(|val| {
  36. criterion::black_box(val);
  37. });
  38. }
  39. drop(rx);
  40. producer.join().unwrap();
  41. })
  42. });
  43. #[cfg(feature = "std-sync")]
  44. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  45. b.iter(|| {
  46. use std::sync::mpsc::{self, TrySendError};
  47. let (tx, rx) = mpsc::sync_channel(100);
  48. let producer = thread::spawn(move || loop {
  49. match tx.try_send(String::from(THE_STRING)) {
  50. Ok(()) => {}
  51. Err(TrySendError::Disconnected(_)) => break,
  52. _ => thread::yield_now(),
  53. }
  54. });
  55. for _ in 0..i {
  56. let val = rx.recv().unwrap();
  57. criterion::black_box(&val);
  58. }
  59. drop(rx);
  60. producer.join().unwrap();
  61. })
  62. });
  63. #[cfg(feature = "crossbeam")]
  64. group.bench_with_input(
  65. BenchmarkId::new("crossbeam::channel::bounded", size),
  66. &size,
  67. |b, &i| {
  68. b.iter(|| {
  69. use crossbeam::channel::{self, TrySendError};
  70. let (tx, rx) = channel::bounded(100);
  71. let producer = thread::spawn(move || loop {
  72. match tx.try_send(String::from(THE_STRING)) {
  73. Ok(()) => {}
  74. Err(TrySendError::Disconnected(_)) => break,
  75. _ => thread::yield_now(),
  76. }
  77. });
  78. for _ in 0..i {
  79. let val = rx.recv().unwrap();
  80. criterion::black_box(&val);
  81. }
  82. drop(rx);
  83. producer.join().unwrap();
  84. })
  85. },
  86. );
  87. }
  88. group.finish();
  89. }
  90. fn bench_spsc_blocking_send_reusable(c: &mut Criterion) {
  91. let mut group = c.benchmark_group("sync/spsc/blocking_send_reusable");
  92. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  93. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  94. aaaaaaaaaaaaaa";
  95. for size in [100, 500, 1_000, 5_000, 10_000] {
  96. group.throughput(Throughput::Elements(size));
  97. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  98. b.iter(|| {
  99. use thingbuf::mpsc::sync;
  100. let (tx, rx) = sync::channel::<String>(100);
  101. let producer = thread::spawn(move || {
  102. while let Ok(mut slot) = tx.send_ref() {
  103. slot.clear();
  104. slot.push_str(THE_STRING);
  105. }
  106. });
  107. for _ in 0..i {
  108. let r = rx.recv_ref().unwrap();
  109. r.with(|val| {
  110. criterion::black_box(val);
  111. });
  112. }
  113. drop(rx);
  114. producer.join().unwrap();
  115. })
  116. });
  117. #[cfg(feature = "std-sync")]
  118. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  119. b.iter(|| {
  120. use std::sync::mpsc;
  121. let (tx, rx) = mpsc::sync_channel(100);
  122. let producer =
  123. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  124. for _ in 0..i {
  125. let val = rx.recv().unwrap();
  126. criterion::black_box(&val);
  127. }
  128. drop(rx);
  129. producer.join().unwrap();
  130. })
  131. });
  132. #[cfg(feature = "crossbeam")]
  133. group.bench_with_input(
  134. BenchmarkId::new("crossbeam::channel::bounded", size),
  135. &size,
  136. |b, &i| {
  137. b.iter(|| {
  138. use crossbeam::channel;
  139. let (tx, rx) = channel::bounded(100);
  140. let producer =
  141. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  142. for _ in 0..i {
  143. let val = rx.recv().unwrap();
  144. criterion::black_box(&val);
  145. }
  146. drop(rx);
  147. producer.join().unwrap();
  148. })
  149. },
  150. );
  151. }
  152. group.finish();
  153. }
  154. criterion_group!(
  155. benches,
  156. bench_spsc_try_send_reusable,
  157. bench_spsc_blocking_send_reusable
  158. );
  159. criterion_main!(benches);