//! Benchmarks comparing asynchronous MPSC channel implementations
//! (`thingbuf`, `futures`, `tokio::sync`, `async-std`) under criterion.
  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
  2. use tokio::{runtime, task};
/// This benchmark simulates sending a bunch of strings over a channel. It's
/// intended to simulate the sort of workload that a `thingbuf` is intended
/// for, where the type of element in the buffer is expensive to allocate,
/// copy, or drop, but they can be re-used in place without
/// allocating/deallocating.
///
/// So, this may not be strictly representative of performance in the case of,
/// say, sending a bunch of integers over the channel; instead it simulates
/// the kind of scenario that `thingbuf` is optimized for.
fn bench_mpsc_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/mpsc_reusable");
    // A long-ish payload so that allocating/copying a `String` per message is
    // a measurable cost — the cost `thingbuf`'s in-place slot reuse avoids.
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
aaaaaaaaaaaaaa";
    // Number of messages the receiver consumes per benchmark iteration.
    const SIZE: u64 = 200;
    // Bounded capacity used for every channel implementation under test.
    const CAPACITY: usize = 50;
    // Measure each implementation at several levels of sender contention.
    for senders in [10, 50, 100] {
        group.bench_with_input(
            BenchmarkId::new("ThingBuf", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use thingbuf::mpsc;
                    let (tx, rx) = mpsc::channel::<String>(CAPACITY);
                    // Spawn sender tasks that loop until the channel closes,
                    // which happens when `rx` is dropped at the end of this
                    // iteration (so `send_ref` returns `Err` and each task
                    // exits).
                    for _ in 0..senders {
                        let tx = tx.clone();
                        task::spawn(async move {
                            // `send_ref` hands back a reusable slot: the
                            // message is written in place, reusing the slot's
                            // existing buffer instead of allocating a fresh
                            // `String` per message.
                            while let Ok(mut slot) = tx.send_ref().await {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                        });
                    }
                    for _ in 0..SIZE {
                        let val = rx.recv_ref().await.unwrap();
                        // Prevent the optimizer from discarding the read.
                        criterion::black_box(&*val);
                    }
                })
            },
        );
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (tx, mut rx) = mpsc::channel(CAPACITY);
                    for _ in 0..senders {
                        let mut tx = tx.clone();
                        // Unlike `thingbuf`, this allocates a new `String`
                        // for every message sent.
                        task::spawn(async move {
                            while tx.send(String::from(THE_STRING)).await.is_ok() {}
                        });
                    }
                    for _ in 0..SIZE {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(CAPACITY);
                        for _ in 0..senders {
                            let tx = tx.clone();
                            task::spawn(tokio::task::unconstrained(async move {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                while let Ok(permit) = tx.reserve().await {
                                    permit.send(String::from(THE_STRING));
                                }
                            }));
                        }
                        for _ in 0..SIZE {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(CAPACITY);
                    for _ in 0..senders {
                        let tx = tx.clone();
                        // Also allocates a fresh `String` per message.
                        task::spawn(async move {
                            while tx.send(String::from(THE_STRING)).await.is_ok() {}
                        });
                    }
                    for _ in 0..SIZE {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}
/// Counterpart to `bench_mpsc_reusable` that sends plain `i32`s instead of
/// strings: a cheap-to-copy element type, where `thingbuf`'s in-place reuse
/// buys nothing. Each sender task repeatedly sends its own index until the
/// receiver (and thus the channel) is dropped at the end of the iteration.
fn bench_mpsc_integer(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/mpsc_integer");
    // More messages and a larger capacity than the string benchmark, since
    // per-message cost is far lower here.
    const SIZE: u64 = 1_000;
    const CAPACITY: usize = 100;
    // Measure each implementation at several levels of sender contention.
    for senders in [10, 50, 100] {
        group.bench_with_input(
            BenchmarkId::new("ThingBuf", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use thingbuf::mpsc;
                    let (tx, rx) = mpsc::channel::<i32>(CAPACITY);
                    for i in 0..senders {
                        let tx = tx.clone();
                        task::spawn(async move {
                            // Write the sender's index directly into the
                            // reserved slot; loop ends once `rx` is dropped.
                            while let Ok(mut slot) = tx.send_ref().await {
                                *slot = i;
                            }
                        });
                    }
                    for _ in 0..SIZE {
                        let val = rx.recv_ref().await.unwrap();
                        // Prevent the optimizer from discarding the read.
                        criterion::black_box(&*val);
                    }
                })
            },
        );
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (tx, mut rx) = mpsc::channel(CAPACITY);
                    for i in 0..senders {
                        let mut tx = tx.clone();
                        task::spawn(async move { while tx.send(i).await.is_ok() {} });
                    }
                    for _ in 0..SIZE {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(CAPACITY);
                        for i in 0..senders {
                            let tx = tx.clone();
                            task::spawn(tokio::task::unconstrained(async move {
                                while tx.send(i).await.is_ok() {}
                            }));
                        }
                        for _ in 0..SIZE {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", senders),
            &senders,
            |b, &senders| {
                b.to_async(rt()).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(CAPACITY);
                    for i in 0..senders {
                        let tx = tx.clone();
                        task::spawn(async move { while tx.send(i).await.is_ok() {} });
                    }
                    for _ in 0..SIZE {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}
  231. fn rt() -> tokio::runtime::Runtime {
  232. runtime::Builder::new_multi_thread().build().unwrap()
  233. }
// Register both benchmark functions with criterion's harness and generate
// the `main` entry point for `cargo bench`.
criterion_group!(benches, bench_mpsc_reusable, bench_mpsc_integer,);
criterion_main!(benches);