//! Benchmark helpers comparing `thingbuf`'s MPSC channel against other async
//! channel implementations under multi-producer load.
  1. use criterion::{measurement::WallTime, BenchmarkId};
  2. use tokio::{runtime, task};
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. pub fn bench_mpsc_reusable(
  13. mut group: criterion::BenchmarkGroup<WallTime>,
  14. size: u64,
  15. capacity: usize,
  16. ) {
  17. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  18. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  19. aaaaaaaaaaaaaa";
  20. for senders in [10, 50, 100] {
  21. group.bench_with_input(
  22. BenchmarkId::new("ThingBuf", senders),
  23. &senders,
  24. |b, &senders| {
  25. b.to_async(rt()).iter(|| async {
  26. use thingbuf::mpsc;
  27. let (tx, rx) = mpsc::channel::<String>(capacity);
  28. for _ in 0..senders {
  29. let tx = tx.clone();
  30. task::spawn(async move {
  31. while let Ok(mut slot) = tx.send_ref().await {
  32. slot.clear();
  33. slot.push_str(THE_STRING);
  34. }
  35. });
  36. }
  37. for _ in 0..size {
  38. let val = rx.recv_ref().await.unwrap();
  39. criterion::black_box(&*val);
  40. }
  41. })
  42. },
  43. );
  44. #[cfg(feature = "futures")]
  45. group.bench_with_input(
  46. BenchmarkId::new("futures::channel::mpsc", senders),
  47. &senders,
  48. |b, &senders| {
  49. b.to_async(rt()).iter(|| async {
  50. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  51. let (tx, mut rx) = mpsc::channel(capacity);
  52. for _ in 0..senders {
  53. let mut tx = tx.clone();
  54. task::spawn(async move {
  55. while tx.send(String::from(THE_STRING)).await.is_ok() {}
  56. });
  57. }
  58. for _ in 0..size {
  59. let val = rx.next().await.unwrap();
  60. criterion::black_box(&val);
  61. }
  62. })
  63. },
  64. );
  65. #[cfg(feature = "tokio-sync")]
  66. group.bench_with_input(
  67. BenchmarkId::new("tokio::sync::mpsc", senders),
  68. &senders,
  69. |b, &senders| {
  70. b.to_async(rt()).iter(|| {
  71. // turn off Tokio's automatic cooperative yielding for this
  72. // benchmark. in code with a large number of concurrent
  73. // tasks, this feature makes the MPSC channel (and other
  74. // Tokio synchronization primitives) better "team players"
  75. // than other implementations, since it prevents them from
  76. // using too much scheduler time.
  77. //
  78. // in this benchmark, though, there *are* no other tasks
  79. // running, so automatic yielding just means we spend more
  80. // time ping-ponging through the scheduler than every other
  81. // implementation.
  82. tokio::task::unconstrained(async {
  83. use tokio::sync::mpsc;
  84. let (tx, mut rx) = mpsc::channel(capacity);
  85. for _ in 0..senders {
  86. let tx = tx.clone();
  87. task::spawn(tokio::task::unconstrained(async move {
  88. // this actually brings Tokio's MPSC closer to what
  89. // `ThingBuf` can do than all the other impls --- we
  90. // only allocate if we _were_ able to reserve send
  91. // capacity. but, we will still allocate and
  92. // deallocate a string for every message...
  93. while let Ok(permit) = tx.reserve().await {
  94. permit.send(String::from(THE_STRING));
  95. }
  96. }));
  97. }
  98. for _ in 0..size {
  99. let val = rx.recv().await.unwrap();
  100. criterion::black_box(&val);
  101. }
  102. })
  103. })
  104. },
  105. );
  106. #[cfg(feature = "async-std")]
  107. group.bench_with_input(
  108. BenchmarkId::new("async_std::channel::bounded", senders),
  109. &senders,
  110. |b, &senders| {
  111. b.to_async(rt()).iter(|| async {
  112. use async_std::channel;
  113. let (tx, rx) = channel::bounded(capacity);
  114. for _ in 0..senders {
  115. let tx = tx.clone();
  116. task::spawn(async move {
  117. while tx.send(String::from(THE_STRING)).await.is_ok() {}
  118. });
  119. }
  120. for _ in 0..size {
  121. let val = rx.recv().await.unwrap();
  122. criterion::black_box(&val);
  123. }
  124. })
  125. },
  126. );
  127. }
  128. group.finish();
  129. }
  130. pub fn bench_mpsc_integer(
  131. mut group: criterion::BenchmarkGroup<WallTime>,
  132. size: u64,
  133. capacity: usize,
  134. ) {
  135. for senders in [10, 50, 100] {
  136. group.bench_with_input(
  137. BenchmarkId::new("ThingBuf", senders),
  138. &senders,
  139. |b, &senders| {
  140. b.to_async(rt()).iter(|| async {
  141. use thingbuf::mpsc;
  142. let (tx, rx) = mpsc::channel::<i32>(capacity);
  143. for i in 0..senders {
  144. let tx = tx.clone();
  145. task::spawn(async move {
  146. while let Ok(mut slot) = tx.send_ref().await {
  147. *slot = i;
  148. }
  149. });
  150. }
  151. for _ in 0..size {
  152. let val = rx.recv_ref().await.unwrap();
  153. criterion::black_box(&*val);
  154. }
  155. })
  156. },
  157. );
  158. #[cfg(feature = "futures")]
  159. group.bench_with_input(
  160. BenchmarkId::new("futures::channel::mpsc", senders),
  161. &senders,
  162. |b, &senders| {
  163. b.to_async(rt()).iter(|| async {
  164. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  165. let (tx, mut rx) = mpsc::channel(capacity);
  166. for i in 0..senders {
  167. let mut tx = tx.clone();
  168. task::spawn(async move { while tx.send(i).await.is_ok() {} });
  169. }
  170. for _ in 0..size {
  171. let val = rx.next().await.unwrap();
  172. criterion::black_box(&val);
  173. }
  174. })
  175. },
  176. );
  177. #[cfg(feature = "tokio-sync")]
  178. group.bench_with_input(
  179. BenchmarkId::new("tokio::sync::mpsc", senders),
  180. &senders,
  181. |b, &senders| {
  182. b.to_async(rt()).iter(|| {
  183. // turn off Tokio's automatic cooperative yielding for this
  184. // benchmark. in code with a large number of concurrent
  185. // tasks, this feature makes the MPSC channel (and other
  186. // Tokio synchronization primitives) better "team players"
  187. // than other implementations, since it prevents them from
  188. // using too much scheduler time.
  189. //
  190. // in this benchmark, though, there *are* no other tasks
  191. // running, so automatic yielding just means we spend more
  192. // time ping-ponging through the scheduler than every other
  193. // implementation.
  194. tokio::task::unconstrained(async {
  195. use tokio::sync::mpsc;
  196. let (tx, mut rx) = mpsc::channel(capacity);
  197. for i in 0..senders {
  198. let tx = tx.clone();
  199. task::spawn(tokio::task::unconstrained(async move {
  200. while tx.send(i).await.is_ok() {}
  201. }));
  202. }
  203. for _ in 0..size {
  204. let val = rx.recv().await.unwrap();
  205. criterion::black_box(&val);
  206. }
  207. })
  208. })
  209. },
  210. );
  211. #[cfg(feature = "async-std")]
  212. group.bench_with_input(
  213. BenchmarkId::new("async_std::channel::bounded", senders),
  214. &senders,
  215. |b, &senders| {
  216. b.to_async(rt()).iter(|| async {
  217. use async_std::channel;
  218. let (tx, rx) = channel::bounded(capacity);
  219. for i in 0..senders {
  220. let tx = tx.clone();
  221. task::spawn(async move { while tx.send(i).await.is_ok() {} });
  222. }
  223. for _ in 0..size {
  224. let val = rx.recv().await.unwrap();
  225. criterion::black_box(&val);
  226. }
  227. })
  228. },
  229. );
  230. }
  231. group.finish();
  232. }
  233. fn rt() -> tokio::runtime::Runtime {
  234. runtime::Builder::new_multi_thread().build().unwrap()
  235. }