// async_mpsc.rs — criterion benchmarks comparing async SPSC channel
// implementations (thingbuf, futures, tokio, async-std).
  1. use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
  2. use tokio::{runtime, task};
  3. /// This benchmark simulates sending a bunch of strings over a channel. It's
  4. /// intended to simulate the sort of workload that a `thingbuf` is intended
  5. /// for, where the type of element in the buffer is expensive to allocate,
  6. /// copy, or drop, but they can be re-used in place without
  7. /// allocating/deallocating.
  8. ///
  9. /// So, this may not be strictly representative of performance in the case of,
  10. /// say, sending a bunch of integers over the channel; instead it simulates
  11. /// the kind of scenario that `thingbuf` is optimized for.
  12. fn bench_spsc_reusable(c: &mut Criterion) {
  13. let mut group = c.benchmark_group("async/spsc_reusable");
  14. static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  15. aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
  16. aaaaaaaaaaaaaa";
  17. for size in [100, 500, 1_000, 5_000, 10_000] {
  18. group.throughput(Throughput::Elements(size));
  19. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  20. let rt = runtime::Builder::new_current_thread().build().unwrap();
  21. b.to_async(rt).iter(|| async {
  22. use thingbuf::{
  23. mpsc::{self, TrySendError},
  24. ThingBuf,
  25. };
  26. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  27. task::spawn(async move {
  28. loop {
  29. match tx.try_send_ref() {
  30. Ok(mut r) => r.with_mut(|s: &mut String| {
  31. s.clear();
  32. s.push_str(THE_STRING)
  33. }),
  34. Err(TrySendError::Closed(_)) => break,
  35. _ => task::yield_now().await,
  36. }
  37. }
  38. });
  39. for _ in 0..i {
  40. let r = rx.recv_ref().await.unwrap();
  41. r.with(|val| {
  42. criterion::black_box(val);
  43. });
  44. }
  45. })
  46. });
  47. group.bench_with_input(
  48. BenchmarkId::new("futures::channel::mpsc", size),
  49. &size,
  50. |b, &i| {
  51. let rt = runtime::Builder::new_current_thread().build().unwrap();
  52. b.to_async(rt).iter(|| async {
  53. use futures::{channel::mpsc, stream::StreamExt};
  54. let (mut tx, mut rx) = mpsc::channel(100);
  55. task::spawn(async move {
  56. loop {
  57. match tx.try_send(String::from(THE_STRING)) {
  58. Ok(()) => {}
  59. Err(e) if e.is_disconnected() => break,
  60. _ => task::yield_now().await,
  61. }
  62. }
  63. });
  64. for _ in 0..i {
  65. let val = rx.next().await.unwrap();
  66. criterion::black_box(&val);
  67. }
  68. })
  69. },
  70. );
  71. group.bench_with_input(
  72. BenchmarkId::new("tokio::sync::mpsc", size),
  73. &size,
  74. |b, &i| {
  75. let rt = runtime::Builder::new_current_thread().build().unwrap();
  76. b.to_async(rt).iter(|| {
  77. // turn off Tokio's automatic cooperative yielding for this
  78. // benchmark. in code with a large number of concurrent
  79. // tasks, this feature makes the MPSC channel (and other
  80. // Tokio synchronization primitives) better "team players"
  81. // than other implementations, since it prevents them from
  82. // using too much scheduler time.
  83. //
  84. // in this benchmark, though, there *are* no other tasks
  85. // running, so automatic yielding just means we spend more
  86. // time ping-ponging through the scheduler than every other
  87. // implementation.
  88. tokio::task::unconstrained(async {
  89. use tokio::sync::mpsc::{self, error::TrySendError};
  90. let (tx, mut rx) = mpsc::channel(100);
  91. task::spawn(tokio::task::unconstrained(async move {
  92. loop {
  93. // this actually brings Tokio's MPSC closer to what
  94. // `ThingBuf` can do than all the other impls --- we
  95. // only allocate if we _were_ able to reserve send
  96. // capacity. but, we will still allocate and
  97. // deallocate a string for every message...
  98. match tx.try_reserve() {
  99. Ok(permit) => permit.send(String::from(THE_STRING)),
  100. Err(TrySendError::Closed(_)) => break,
  101. _ => task::yield_now().await,
  102. }
  103. }
  104. }));
  105. for _ in 0..i {
  106. let val = rx.recv().await.unwrap();
  107. criterion::black_box(&val);
  108. }
  109. })
  110. })
  111. },
  112. );
  113. group.bench_with_input(
  114. BenchmarkId::new("async_std::channel::bounded", size),
  115. &size,
  116. |b, &i| {
  117. let rt = runtime::Builder::new_current_thread().build().unwrap();
  118. b.to_async(rt).iter(|| async {
  119. use async_std::channel::{self, TrySendError};
  120. let (tx, rx) = channel::bounded(100);
  121. task::spawn(async move {
  122. loop {
  123. match tx.try_send(String::from(THE_STRING)) {
  124. Ok(()) => {}
  125. Err(TrySendError::Closed(_)) => break,
  126. _ => task::yield_now().await,
  127. }
  128. }
  129. });
  130. for _ in 0..i {
  131. let val = rx.recv().await.unwrap();
  132. criterion::black_box(&val);
  133. }
  134. })
  135. },
  136. );
  137. }
  138. group.finish();
  139. }
  140. /// The same benchmark, but with integers. Messages are not heap allocated, so
  141. /// non-thingbuf channel impls are not burdened by allocator churn for messages.
  142. fn bench_spsc_integer(c: &mut Criterion) {
  143. let mut group = c.benchmark_group("async/spsc_integer");
  144. for size in [100, 500, 1_000, 5_000, 10_000] {
  145. group.throughput(Throughput::Elements(size));
  146. group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
  147. let rt = runtime::Builder::new_current_thread().build().unwrap();
  148. b.to_async(rt).iter(|| async {
  149. use thingbuf::{
  150. mpsc::{self, TrySendError},
  151. ThingBuf,
  152. };
  153. let (tx, rx) = mpsc::channel(ThingBuf::new(100));
  154. task::spawn(async move {
  155. let mut i = 0;
  156. loop {
  157. match tx.try_send(i) {
  158. Ok(()) => {
  159. i += 1;
  160. }
  161. Err(TrySendError::Closed(_)) => break,
  162. _ => task::yield_now().await,
  163. }
  164. }
  165. });
  166. for n in 0..i {
  167. let val = rx.recv().await.unwrap();
  168. assert_eq!(n, val);
  169. }
  170. })
  171. });
  172. group.bench_with_input(
  173. BenchmarkId::new("futures::channel::mpsc", size),
  174. &size,
  175. |b, &i| {
  176. let rt = runtime::Builder::new_current_thread().build().unwrap();
  177. b.to_async(rt).iter(|| async {
  178. use futures::{channel::mpsc, stream::StreamExt};
  179. let (mut tx, mut rx) = mpsc::channel(100);
  180. task::spawn(async move {
  181. let mut i = 0;
  182. loop {
  183. match tx.try_send(i) {
  184. Ok(()) => {
  185. i += 1;
  186. }
  187. Err(e) if e.is_disconnected() => break,
  188. _ => task::yield_now().await,
  189. }
  190. }
  191. });
  192. for n in 0..i {
  193. let val = rx.next().await.unwrap();
  194. assert_eq!(n, val);
  195. }
  196. })
  197. },
  198. );
  199. group.bench_with_input(
  200. BenchmarkId::new("tokio::sync::mpsc", size),
  201. &size,
  202. |b, &i| {
  203. let rt = runtime::Builder::new_current_thread().build().unwrap();
  204. b.to_async(rt).iter(|| {
  205. // turn off Tokio's automatic cooperative yielding for this
  206. // benchmark. in code with a large number of concurrent
  207. // tasks, this feature makes the MPSC channel (and other
  208. // Tokio synchronization primitives) better "team players"
  209. // than other implementations, since it prevents them from
  210. // using too much scheduler time.
  211. //
  212. // in this benchmark, though, there *are* no other tasks
  213. // running, so automatic yielding just means we spend more
  214. // time ping-ponging through the scheduler than every other
  215. // implementation.
  216. tokio::task::unconstrained(async {
  217. use tokio::sync::mpsc::{self, error::TrySendError};
  218. let (tx, mut rx) = mpsc::channel(100);
  219. task::spawn(tokio::task::unconstrained(async move {
  220. let mut i = 0;
  221. loop {
  222. match tx.try_send(i) {
  223. Ok(()) => {
  224. i += 1;
  225. }
  226. Err(TrySendError::Closed(_)) => break,
  227. _ => task::yield_now().await,
  228. }
  229. }
  230. }));
  231. for n in 0..i {
  232. let val = rx.recv().await.unwrap();
  233. assert_eq!(n, val);
  234. }
  235. })
  236. })
  237. },
  238. );
  239. group.bench_with_input(
  240. BenchmarkId::new("async_std::channel::bounded", size),
  241. &size,
  242. |b, &i| {
  243. let rt = runtime::Builder::new_current_thread().build().unwrap();
  244. b.to_async(rt).iter(|| async {
  245. use async_std::channel::{self, TrySendError};
  246. let (tx, rx) = channel::bounded(100);
  247. task::spawn(async move {
  248. let mut i = 0;
  249. loop {
  250. match tx.try_send(i) {
  251. Ok(()) => {
  252. i += 1;
  253. }
  254. Err(TrySendError::Closed(_)) => break,
  255. _ => task::yield_now().await,
  256. }
  257. }
  258. });
  259. for n in 0..i {
  260. let val = rx.recv().await.unwrap();
  261. assert_eq!(n, val);
  262. }
  263. })
  264. },
  265. );
  266. }
  267. group.finish();
  268. }
// Register both benchmark functions with criterion and generate the
// harness `main` entry point.
criterion_group!(benches, bench_spsc_reusable, bench_spsc_integer);
criterion_main!(benches);