//! Benchmarks comparing `thingbuf`'s async MPSC channel against other async
//! channel implementations (`futures`, `tokio::sync`, `async-std`), for both
//! a reusable-allocation workload (strings) and a trivial-copy workload
//! (integers).
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use tokio::{runtime, task};
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_mpsc_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("async/mpsc_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. const SIZE: u64 = 100;
  18. for senders in [10, 50, 100] {
  19. group.bench_with_input(
  20. BenchmarkId::new("ThingBuf", senders),
  21. &senders,
  22. |b, &senders| {
  23. b.to_async(rt()).iter(|| async {
  24. use thingbuf::{mpsc, ThingBuf};
  25. let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
  26. for _ in 0..senders {
  27. let tx = tx.clone();
  28. task::spawn(async move {
  29. while let Ok(mut slot) = tx.send_ref().await {
  30. slot.clear();
  31. slot.push_str(THE_STRING);
  32. }
  33. });
  34. }
  35. for _ in 0..SIZE {
  36. let val = rx.recv_ref().await.unwrap();
  37. criterion::black_box(&*val);
  38. }
  39. })
  40. },
  41. );
  42. #[cfg(feature = "futures")]
  43. group.bench_with_input(
  44. BenchmarkId::new("futures::channel::mpsc", senders),
  45. &senders,
  46. |b, &senders| {
  47. b.to_async(rt()).iter(|| async {
  48. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  49. let (tx, mut rx) = mpsc::channel(100);
  50. for _ in 0..senders {
  51. let mut tx = tx.clone();
  52. task::spawn(async move {
  53. while tx.send(String::from(THE_STRING)).await.is_ok() {}
  54. });
  55. }
  56. for _ in 0..SIZE {
  57. let val = rx.next().await.unwrap();
  58. criterion::black_box(&val);
  59. }
  60. })
  61. },
  62. );
  63. #[cfg(feature = "tokio-sync")]
  64. group.bench_with_input(
  65. BenchmarkId::new("tokio::sync::mpsc", senders),
  66. &senders,
  67. |b, &senders| {
  68. b.to_async(rt()).iter(|| {
  69. // turn off Tokio's automatic cooperative yielding for this
  70. // benchmark. in code with a large number of concurrent
  71. // tasks, this feature makes the MPSC channel (and other
  72. // Tokio synchronization primitives) better "team players"
  73. // than other implementations, since it prevents them from
  74. // using too much scheduler time.
  75. //
  76. // in this benchmark, though, there *are* no other tasks
  77. // running, so automatic yielding just means we spend more
  78. // time ping-ponging through the scheduler than every other
  79. // implementation.
  80. tokio::task::unconstrained(async {
  81. use tokio::sync::mpsc;
  82. let (tx, mut rx) = mpsc::channel(100);
  83. for _ in 0..senders {
  84. let tx = tx.clone();
  85. task::spawn(tokio::task::unconstrained(async move {
  86. // this actually brings Tokio's MPSC closer to what
  87. // `ThingBuf` can do than all the other impls --- we
  88. // only allocate if we _were_ able to reserve send
  89. // capacity. but, we will still allocate and
  90. // deallocate a string for every message...
  91. while let Ok(permit) = tx.reserve().await {
  92. permit.send(String::from(THE_STRING));
  93. }
  94. }));
  95. }
  96. for _ in 0..SIZE {
  97. let val = rx.recv().await.unwrap();
  98. criterion::black_box(&val);
  99. }
  100. })
  101. })
  102. },
  103. );
  104. #[cfg(feature = "async-std")]
  105. group.bench_with_input(
  106. BenchmarkId::new("async_std::channel::bounded", senders),
  107. &senders,
  108. |b, &senders| {
  109. b.to_async(rt()).iter(|| async {
  110. use async_std::channel;
  111. let (tx, rx) = channel::bounded(100);
  112. for _ in 0..senders {
  113. let tx = tx.clone();
  114. task::spawn(async move {
  115. while tx.send(String::from(THE_STRING)).await.is_ok() {}
  116. });
  117. }
  118. for _ in 0..SIZE {
  119. let val = rx.recv().await.unwrap();
  120. criterion::black_box(&val);
  121. }
  122. })
  123. },
  124. );
  125. }
  126. group.finish();
  127. }
  128. fn bench_mpsc_integer(c: &mut Criterion) {
  129. let mut group = c.benchmark_group("async/mpsc_integer");
  130. const SIZE: u64 = 1_000;
  131. for senders in [10, 50, 100] {
  132. group.bench_with_input(
  133. BenchmarkId::new("ThingBuf", senders),
  134. &senders,
  135. |b, &senders| {
  136. b.to_async(rt()).iter(|| async {
  137. use thingbuf::{mpsc, ThingBuf};
  138. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  139. for i in 0..senders {
  140. let tx = tx.clone();
  141. task::spawn(async move {
  142. while let Ok(mut slot) = tx.send_ref().await {
  143. *slot = i;
  144. }
  145. });
  146. }
  147. for _ in 0..SIZE {
  148. let val = rx.recv_ref().await.unwrap();
  149. criterion::black_box(&*val);
  150. }
  151. })
  152. },
  153. );
  154. #[cfg(feature = "futures")]
  155. group.bench_with_input(
  156. BenchmarkId::new("futures::channel::mpsc", senders),
  157. &senders,
  158. |b, &senders| {
  159. b.to_async(rt()).iter(|| async {
  160. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  161. let (tx, mut rx) = mpsc::channel(100);
  162. for i in 0..senders {
  163. let mut tx = tx.clone();
  164. task::spawn(async move { while tx.send(i).await.is_ok() {} });
  165. }
  166. for _ in 0..SIZE {
  167. let val = rx.next().await.unwrap();
  168. criterion::black_box(&val);
  169. }
  170. })
  171. },
  172. );
  173. #[cfg(feature = "tokio-sync")]
  174. group.bench_with_input(
  175. BenchmarkId::new("tokio::sync::mpsc", senders),
  176. &senders,
  177. |b, &senders| {
  178. b.to_async(rt()).iter(|| {
  179. // turn off Tokio's automatic cooperative yielding for this
  180. // benchmark. in code with a large number of concurrent
  181. // tasks, this feature makes the MPSC channel (and other
  182. // Tokio synchronization primitives) better "team players"
  183. // than other implementations, since it prevents them from
  184. // using too much scheduler time.
  185. //
  186. // in this benchmark, though, there *are* no other tasks
  187. // running, so automatic yielding just means we spend more
  188. // time ping-ponging through the scheduler than every other
  189. // implementation.
  190. tokio::task::unconstrained(async {
  191. use tokio::sync::mpsc;
  192. let (tx, mut rx) = mpsc::channel(100);
  193. for i in 0..senders {
  194. let tx = tx.clone();
  195. task::spawn(tokio::task::unconstrained(async move {
  196. while tx.send(i).await.is_ok() {}
  197. }));
  198. }
  199. for _ in 0..SIZE {
  200. let val = rx.recv().await.unwrap();
  201. criterion::black_box(&val);
  202. }
  203. })
  204. })
  205. },
  206. );
  207. #[cfg(feature = "async-std")]
  208. group.bench_with_input(
  209. BenchmarkId::new("async_std::channel::bounded", senders),
  210. &senders,
  211. |b, &senders| {
  212. b.to_async(rt()).iter(|| async {
  213. use async_std::channel;
  214. let (tx, rx) = channel::bounded(100);
  215. for i in 0..senders {
  216. let tx = tx.clone();
  217. task::spawn(async move { while tx.send(i).await.is_ok() {} });
  218. }
  219. for _ in 0..SIZE {
  220. let val = rx.recv().await.unwrap();
  221. criterion::black_box(&val);
  222. }
  223. })
  224. },
  225. );
  226. }
  227. group.finish();
  228. }
  229. fn rt() -> tokio::runtime::Runtime {
  230. runtime::Builder::new_multi_thread().build().unwrap()
  231. }
  232. criterion_group!(benches, bench_mpsc_reusable, bench_mpsc_integer,);
  233. criterion_main!(benches);