// async_mpsc.rs (12 KB)

  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
  2. use tokio::{runtime, task};
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_mpsc_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("async/mpsc_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. const SIZE: u64 = 100;
  18. group.throughput(Throughput::Elements(SIZE));
  19. for senders in [10, 50, 100] {
  20. group.bench_with_input(
  21. BenchmarkId::new("ThingBuf", senders),
  22. &senders,
  23. |b, &senders| {
  24. b.to_async(rt()).iter(|| async {
  25. use thingbuf::{mpsc, ThingBuf};
  26. let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
  27. for _ in 0..senders {
  28. let tx = tx.clone();
  29. task::spawn(async move {
  30. loop {
  31. match tx.send_ref().await {
  32. Ok(mut slot) => {
  33. slot.clear();
  34. slot.push_str(THE_STRING);
  35. }
  36. Err(_) => break,
  37. }
  38. }
  39. });
  40. }
  41. for _ in 0..SIZE {
  42. let val = rx.recv_ref().await.unwrap();
  43. criterion::black_box(&*val);
  44. }
  45. })
  46. },
  47. );
  48. #[cfg(feature = "futures")]
  49. group.bench_with_input(
  50. BenchmarkId::new("futures::channel::mpsc", senders),
  51. &senders,
  52. |b, &senders| {
  53. b.to_async(rt()).iter(|| async {
  54. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  55. let (tx, mut rx) = mpsc::channel(100);
  56. for _ in 0..senders {
  57. let mut tx = tx.clone();
  58. task::spawn(async move {
  59. loop {
  60. match tx.send(String::from(THE_STRING)).await {
  61. Ok(_) => {}
  62. Err(_) => break,
  63. }
  64. }
  65. });
  66. }
  67. for _ in 0..SIZE {
  68. let val = rx.next().await.unwrap();
  69. criterion::black_box(&val);
  70. }
  71. })
  72. },
  73. );
  74. #[cfg(feature = "tokio-sync")]
  75. group.bench_with_input(
  76. BenchmarkId::new("tokio::sync::mpsc", senders),
  77. &senders,
  78. |b, &senders| {
  79. b.to_async(rt()).iter(|| {
  80. // turn off Tokio's automatic cooperative yielding for this
  81. // benchmark. in code with a large number of concurrent
  82. // tasks, this feature makes the MPSC channel (and other
  83. // Tokio synchronization primitives) better "team players"
  84. // than other implementations, since it prevents them from
  85. // using too much scheduler time.
  86. //
  87. // in this benchmark, though, there *are* no other tasks
  88. // running, so automatic yielding just means we spend more
  89. // time ping-ponging through the scheduler than every other
  90. // implementation.
  91. tokio::task::unconstrained(async {
  92. use tokio::sync::mpsc;
  93. let (tx, mut rx) = mpsc::channel(100);
  94. for _ in 0..senders {
  95. let tx = tx.clone();
  96. task::spawn(tokio::task::unconstrained(async move {
  97. loop {
  98. // this actually brings Tokio's MPSC closer to what
  99. // `ThingBuf` can do than all the other impls --- we
  100. // only allocate if we _were_ able to reserve send
  101. // capacity. but, we will still allocate and
  102. // deallocate a string for every message...
  103. match tx.reserve().await {
  104. Ok(permit) => permit.send(String::from(THE_STRING)),
  105. Err(_) => break,
  106. }
  107. }
  108. }));
  109. }
  110. for _ in 0..SIZE {
  111. let val = rx.recv().await.unwrap();
  112. criterion::black_box(&val);
  113. }
  114. })
  115. })
  116. },
  117. );
  118. #[cfg(feature = "async-std")]
  119. group.bench_with_input(
  120. BenchmarkId::new("async_std::channel::bounded", senders),
  121. &senders,
  122. |b, &senders| {
  123. b.to_async(rt()).iter(|| async {
  124. use async_std::channel;
  125. let (tx, rx) = channel::bounded(100);
  126. for _ in 0..senders {
  127. let tx = tx.clone();
  128. task::spawn(async move {
  129. loop {
  130. match tx.send(String::from(THE_STRING)).await {
  131. Ok(_) => {}
  132. Err(_) => break,
  133. }
  134. }
  135. });
  136. }
  137. for _ in 0..SIZE {
  138. let val = rx.recv().await.unwrap();
  139. criterion::black_box(&val);
  140. }
  141. })
  142. },
  143. );
  144. }
  145. group.finish();
  146. }
  147. fn bench_mpsc_integer(c: &mut Criterion) {
  148. let mut group = c.benchmark_group("async/mpsc_integer");
  149. const SIZE: u64 = 1_000;
  150. for senders in [10, 50, 100] {
  151. group.throughput(Throughput::Elements(SIZE));
  152. group.bench_with_input(
  153. BenchmarkId::new("ThingBuf", senders),
  154. &senders,
  155. |b, &senders| {
  156. b.to_async(rt()).iter(|| async {
  157. use thingbuf::{mpsc, ThingBuf};
  158. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  159. for i in 0..senders {
  160. let tx = tx.clone();
  161. task::spawn(async move {
  162. loop {
  163. match tx.send_ref().await {
  164. Ok(mut slot) => {
  165. *slot = i;
  166. }
  167. Err(_) => break,
  168. }
  169. }
  170. });
  171. }
  172. for _ in 0..SIZE {
  173. let val = rx.recv_ref().await.unwrap();
  174. criterion::black_box(&*val);
  175. }
  176. })
  177. },
  178. );
  179. #[cfg(feature = "futures")]
  180. group.bench_with_input(
  181. BenchmarkId::new("futures::channel::mpsc", senders),
  182. &senders,
  183. |b, &senders| {
  184. b.to_async(rt()).iter(|| async {
  185. use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
  186. let (tx, mut rx) = mpsc::channel(100);
  187. for i in 0..senders {
  188. let mut tx = tx.clone();
  189. task::spawn(async move {
  190. loop {
  191. match tx.send(i).await {
  192. Ok(_) => {}
  193. Err(_) => break,
  194. }
  195. }
  196. });
  197. }
  198. for _ in 0..SIZE {
  199. let val = rx.next().await.unwrap();
  200. criterion::black_box(&val);
  201. }
  202. })
  203. },
  204. );
  205. #[cfg(feature = "tokio-sync")]
  206. group.bench_with_input(
  207. BenchmarkId::new("tokio::sync::mpsc", senders),
  208. &senders,
  209. |b, &senders| {
  210. b.to_async(rt()).iter(|| {
  211. // turn off Tokio's automatic cooperative yielding for this
  212. // benchmark. in code with a large number of concurrent
  213. // tasks, this feature makes the MPSC channel (and other
  214. // Tokio synchronization primitives) better "team players"
  215. // than other implementations, since it prevents them from
  216. // using too much scheduler time.
  217. //
  218. // in this benchmark, though, there *are* no other tasks
  219. // running, so automatic yielding just means we spend more
  220. // time ping-ponging through the scheduler than every other
  221. // implementation.
  222. tokio::task::unconstrained(async {
  223. use tokio::sync::mpsc;
  224. let (tx, mut rx) = mpsc::channel(100);
  225. for i in 0..senders {
  226. let tx = tx.clone();
  227. task::spawn(tokio::task::unconstrained(async move {
  228. loop {
  229. // this actually brings Tokio's MPSC closer to what
  230. // `ThingBuf` can do than all the other impls --- we
  231. // only allocate if we _were_ able to reserve send
  232. // capacity. but, we will still allocate and
  233. // deallocate a string for every message...
  234. match tx.send(i).await {
  235. Ok(_) => {}
  236. Err(_) => break,
  237. }
  238. }
  239. }));
  240. }
  241. for _ in 0..SIZE {
  242. let val = rx.recv().await.unwrap();
  243. criterion::black_box(&val);
  244. }
  245. })
  246. })
  247. },
  248. );
  249. #[cfg(feature = "async-std")]
  250. group.bench_with_input(
  251. BenchmarkId::new("async_std::channel::bounded", senders),
  252. &senders,
  253. |b, &senders| {
  254. b.to_async(rt()).iter(|| async {
  255. use async_std::channel;
  256. let (tx, rx) = channel::bounded(100);
  257. for i in 0..senders {
  258. let tx = tx.clone();
  259. task::spawn(async move {
  260. loop {
  261. match tx.send(i).await {
  262. Ok(_) => {}
  263. Err(_) => break,
  264. }
  265. }
  266. });
  267. }
  268. for _ in 0..SIZE {
  269. let val = rx.recv().await.unwrap();
  270. criterion::black_box(&val);
  271. }
  272. })
  273. },
  274. );
  275. }
  276. group.finish();
  277. }
  278. fn rt() -> tokio::runtime::Runtime {
  279. runtime::Builder::new_multi_thread().build().unwrap()
  280. }
  281. criterion_group!(benches, bench_mpsc_reusable, bench_mpsc_integer,);
  282. criterion_main!(benches);