// async_mpsc.rs — criterion benchmarks comparing async bounded MPSC channel
// implementations (thingbuf, futures, tokio, async-std) in an SPSC workload.
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use tokio::{runtime, task};
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_spsc_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("async/spsc_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. for size in [100, 500, 1_000, 5_000, 10_000] {
  18. group.throughput(Throughput::Elements(size));
  19. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  20. let rt = runtime::Builder::new_current_thread().build().unwrap();
  21. b.to_async(rt).iter(|| async {
  22. use thingbuf::{
  23. mpsc::{self, TrySendError},
  24. ThingBuf,
  25. };
  26. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  27. task::spawn(async move {
  28. loop {
  29. match tx.try_send_ref() {
  30. Ok(mut r) => r.with_mut(|s: &mut String| {
  31. s.clear();
  32. s.push_str(THE_STRING)
  33. }),
  34. Err(TrySendError::Closed(_)) => break,
  35. _ => task::yield_now().await,
  36. }
  37. }
  38. });
  39. for _ in 0..i {
  40. let r = rx.recv_ref().await.unwrap();
  41. r.with(|val| {
  42. criterion::black_box(val);
  43. });
  44. }
  45. })
  46. });
  47. group.bench_with_input(
  48. BenchmarkId::new("futures::channel::mpsc", size),
  49. &size,
  50. |b, &i| {
  51. let rt = runtime::Builder::new_current_thread().build().unwrap();
  52. b.to_async(rt).iter(|| async {
  53. use futures::{channel::mpsc, stream::StreamExt};
  54. let (mut tx, mut rx) = mpsc::channel(100);
  55. task::spawn(async move {
  56. loop {
  57. match tx.try_send(String::from(THE_STRING)) {
  58. Ok(()) => {}
  59. Err(e) if e.is_disconnected() => break,
  60. _ => task::yield_now().await,
  61. }
  62. }
  63. });
  64. for _ in 0..i {
  65. let val = rx.next().await.unwrap();
  66. criterion::black_box(&val);
  67. }
  68. })
  69. },
  70. );
  71. group.bench_with_input(
  72. BenchmarkId::new("tokio::sync::mpsc", size),
  73. &size,
  74. |b, &i| {
  75. let rt = runtime::Builder::new_current_thread().build().unwrap();
  76. b.to_async(rt).iter(|| async {
  77. use tokio::sync::mpsc::{self, error::TrySendError};
  78. let (tx, mut rx) = mpsc::channel(100);
  79. task::spawn(async move {
  80. loop {
  81. // this actually brings Tokio's MPSC closer to what
  82. // `ThingBuf` can do than all the other impls --- we
  83. // only allocate if we _were_ able to reserve send
  84. // capacity. but, we will still allocate and
  85. // deallocate a string for every message...
  86. match tx.try_reserve() {
  87. Ok(permit) => permit.send(String::from(THE_STRING)),
  88. Err(TrySendError::Closed(_)) => break,
  89. _ => task::yield_now().await,
  90. }
  91. }
  92. });
  93. for _ in 0..i {
  94. let val = rx.recv().await.unwrap();
  95. criterion::black_box(&val);
  96. }
  97. })
  98. },
  99. );
  100. group.bench_with_input(
  101. BenchmarkId::new("async_std::channel::bounded", size),
  102. &size,
  103. |b, &i| {
  104. let rt = runtime::Builder::new_current_thread().build().unwrap();
  105. b.to_async(rt).iter(|| async {
  106. use async_std::channel::{self, TrySendError};
  107. let (tx, rx) = channel::bounded(100);
  108. task::spawn(async move {
  109. loop {
  110. match tx.try_send(String::from(THE_STRING)) {
  111. Ok(()) => {}
  112. Err(TrySendError::Closed(_)) => break,
  113. _ => task::yield_now().await,
  114. }
  115. }
  116. });
  117. for _ in 0..i {
  118. let val = rx.recv().await.unwrap();
  119. criterion::black_box(&val);
  120. }
  121. })
  122. },
  123. );
  124. }
  125. group.finish();
  126. }
  127. /// The same benchmark, but with integers. Messages are not heap allocated, so
  128. /// non-thingbuf channel impls are not burdened by allocator churn for messages.
  129. fn bench_spsc_integer(c: &mut Criterion) {
  130. let mut group = c.benchmark_group("async/spsc_integer");
  131. for size in [100, 500, 1_000, 5_000, 10_000] {
  132. group.throughput(Throughput::Elements(size));
  133. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  134. let rt = runtime::Builder::new_current_thread().build().unwrap();
  135. b.to_async(rt).iter(|| async {
  136. use thingbuf::{
  137. mpsc::{self, TrySendError},
  138. ThingBuf,
  139. };
  140. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  141. task::spawn(async move {
  142. let mut i = 0;
  143. loop {
  144. match tx.try_send(i) {
  145. Ok(()) => {
  146. i += 1;
  147. }
  148. Err(TrySendError::Closed(_)) => break,
  149. _ => task::yield_now().await,
  150. }
  151. }
  152. });
  153. for n in 0..i {
  154. let val = rx.recv().await.unwrap();
  155. assert_eq!(n, val);
  156. }
  157. })
  158. });
  159. group.bench_with_input(
  160. BenchmarkId::new("futures::channel::mpsc", size),
  161. &size,
  162. |b, &i| {
  163. let rt = runtime::Builder::new_current_thread().build().unwrap();
  164. b.to_async(rt).iter(|| async {
  165. use futures::{channel::mpsc, stream::StreamExt};
  166. let (mut tx, mut rx) = mpsc::channel(100);
  167. task::spawn(async move {
  168. let mut i = 0;
  169. loop {
  170. match tx.try_send(i) {
  171. Ok(()) => {
  172. i += 1;
  173. }
  174. Err(e) if e.is_disconnected() => break,
  175. _ => task::yield_now().await,
  176. }
  177. }
  178. });
  179. for n in 0..i {
  180. let val = rx.next().await.unwrap();
  181. assert_eq!(n, val);
  182. }
  183. })
  184. },
  185. );
  186. group.bench_with_input(
  187. BenchmarkId::new("tokio::sync::mpsc", size),
  188. &size,
  189. |b, &i| {
  190. let rt = runtime::Builder::new_current_thread().build().unwrap();
  191. b.to_async(rt).iter(|| async {
  192. use tokio::sync::mpsc::{self, error::TrySendError};
  193. let (tx, mut rx) = mpsc::channel(100);
  194. task::spawn(async move {
  195. let mut i = 0;
  196. loop {
  197. match tx.try_send(i) {
  198. Ok(()) => {
  199. i += 1;
  200. }
  201. Err(TrySendError::Closed(_)) => break,
  202. _ => task::yield_now().await,
  203. }
  204. }
  205. });
  206. for n in 0..i {
  207. let val = rx.recv().await.unwrap();
  208. assert_eq!(n, val);
  209. }
  210. })
  211. },
  212. );
  213. group.bench_with_input(
  214. BenchmarkId::new("async_std::channel::bounded", size),
  215. &size,
  216. |b, &i| {
  217. let rt = runtime::Builder::new_current_thread().build().unwrap();
  218. b.to_async(rt).iter(|| async {
  219. use async_std::channel::{self, TrySendError};
  220. let (tx, rx) = channel::bounded(100);
  221. task::spawn(async move {
  222. let mut i = 0;
  223. loop {
  224. match tx.try_send(i) {
  225. Ok(()) => {
  226. i += 1;
  227. }
  228. Err(TrySendError::Closed(_)) => break,
  229. _ => task::yield_now().await,
  230. }
  231. }
  232. });
  233. for n in 0..i {
  234. let val = rx.recv().await.unwrap();
  235. assert_eq!(n, val);
  236. }
  237. })
  238. },
  239. );
  240. }
  241. group.finish();
  242. }
  243. criterion_group!(benches, bench_spsc_reusable, bench_spsc_integer);
  244. criterion_main!(benches);