sync_spsc.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
  2. use std::thread;
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_spsc_try_send_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("sync/spsc/try_send_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. for size in [100, 500, 1_000, 5_000, 10_000] {
  18. group.throughput(Throughput::Elements(size));
  19. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  20. b.iter(|| {
  21. use thingbuf::mpsc::{blocking, errors::TrySendError};
  22. let (tx, rx) = blocking::channel::<String>(100);
  23. let producer = thread::spawn(move || loop {
  24. match tx.try_send_ref() {
  25. Ok(mut slot) => {
  26. slot.clear();
  27. slot.push_str(THE_STRING)
  28. }
  29. Err(TrySendError::Closed(_)) => break,
  30. _ => thread::yield_now(),
  31. }
  32. });
  33. for _ in 0..i {
  34. let val = rx.recv_ref().unwrap();
  35. criterion::black_box(val);
  36. }
  37. drop(rx);
  38. producer.join().unwrap();
  39. })
  40. });
  41. #[cfg(feature = "std-sync")]
  42. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  43. b.iter(|| {
  44. use std::sync::mpsc::{self, TrySendError};
  45. let (tx, rx) = mpsc::sync_channel(100);
  46. let producer = thread::spawn(move || loop {
  47. match tx.try_send(String::from(THE_STRING)) {
  48. Ok(()) => {}
  49. Err(TrySendError::Disconnected(_)) => break,
  50. _ => thread::yield_now(),
  51. }
  52. });
  53. for _ in 0..i {
  54. let val = rx.recv().unwrap();
  55. criterion::black_box(&val);
  56. }
  57. drop(rx);
  58. producer.join().unwrap();
  59. })
  60. });
  61. #[cfg(feature = "crossbeam")]
  62. group.bench_with_input(
  63. BenchmarkId::new("crossbeam::channel::bounded", size),
  64. &size,
  65. |b, &i| {
  66. b.iter(|| {
  67. use crossbeam::channel::{self, TrySendError};
  68. let (tx, rx) = channel::bounded(100);
  69. let producer = thread::spawn(move || loop {
  70. match tx.try_send(String::from(THE_STRING)) {
  71. Ok(()) => {}
  72. Err(TrySendError::Disconnected(_)) => break,
  73. _ => thread::yield_now(),
  74. }
  75. });
  76. for _ in 0..i {
  77. let val = rx.recv().unwrap();
  78. criterion::black_box(&val);
  79. }
  80. drop(rx);
  81. producer.join().unwrap();
  82. })
  83. },
  84. );
  85. }
  86. group.finish();
  87. }
  88. fn bench_spsc_blocking_send_reusable(c: &mut Criterion) {
  89. let mut group = c.benchmark_group("sync/spsc/blocking_send_reusable");
  90. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  91. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  92. aaaaaaaaaaaaaa";
  93. for size in [100, 500, 1_000, 5_000, 10_000] {
  94. group.throughput(Throughput::Elements(size));
  95. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  96. b.iter(|| {
  97. use thingbuf::mpsc::blocking;
  98. let (tx, rx) = blocking::channel::<String>(100);
  99. let producer = thread::spawn(move || {
  100. while let Ok(mut slot) = tx.send_ref() {
  101. slot.clear();
  102. slot.push_str(THE_STRING);
  103. }
  104. });
  105. for _ in 0..i {
  106. let val = rx.recv_ref().unwrap();
  107. criterion::black_box(val);
  108. }
  109. drop(rx);
  110. producer.join().unwrap();
  111. })
  112. });
  113. #[cfg(feature = "std-sync")]
  114. group.bench_with_input(BenchmarkId::new("std::sync::mpsc", size), &size, |b, &i| {
  115. b.iter(|| {
  116. use std::sync::mpsc;
  117. let (tx, rx) = mpsc::sync_channel(100);
  118. let producer =
  119. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  120. for _ in 0..i {
  121. let val = rx.recv().unwrap();
  122. criterion::black_box(&val);
  123. }
  124. drop(rx);
  125. producer.join().unwrap();
  126. })
  127. });
  128. #[cfg(feature = "crossbeam")]
  129. group.bench_with_input(
  130. BenchmarkId::new("crossbeam::channel::bounded", size),
  131. &size,
  132. |b, &i| {
  133. b.iter(|| {
  134. use crossbeam::channel;
  135. let (tx, rx) = channel::bounded(100);
  136. let producer =
  137. thread::spawn(move || while tx.send(String::from(THE_STRING)).is_ok() {});
  138. for _ in 0..i {
  139. let val = rx.recv().unwrap();
  140. criterion::black_box(&val);
  141. }
  142. drop(rx);
  143. producer.join().unwrap();
  144. })
  145. },
  146. );
  147. }
  148. group.finish();
  149. }
  150. criterion_group!(
  151. benches,
  152. bench_spsc_try_send_reusable,
  153. bench_spsc_blocking_send_reusable
  154. );
  155. criterion_main!(benches);