// async_spsc.rs

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use tokio::{runtime, task};

/// This benchmark simulates sending a bunch of strings over a channel. It's
/// intended to simulate the sort of workload that a `thingbuf` is designed
/// for, where the type of element in the buffer is expensive to allocate,
/// copy, or drop, but can be re-used in place without
/// allocating/deallocating.
///
/// So, this may not be strictly representative of performance in the case of,
/// say, sending a bunch of integers over the channel; instead, it simulates
/// the kind of scenario that `thingbuf` is optimized for.
fn bench_spsc_try_send_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));

        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{
                    mpsc::{self, TrySendError},
                    ThingBuf,
                };
                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
                task::spawn(async move {
                    loop {
                        match tx.try_send_ref() {
                            Ok(mut slot) => {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });

        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );

        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            loop {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                match tx.try_reserve() {
                                    Ok(permit) => permit.send(String::from(THE_STRING)),
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );

        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

fn bench_spsc_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));

        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{mpsc, ThingBuf};
                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
                task::spawn(async move {
                    while let Ok(mut slot) = tx.send_ref().await {
                        slot.clear();
                        slot.push_str(THE_STRING);
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });

        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );

        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            // this actually brings Tokio's MPSC closer to what
                            // `ThingBuf` can do than all the other impls --- we
                            // only allocate if we _were_ able to reserve send
                            // capacity. but, we will still allocate and
                            // deallocate a string for every message...
                            while let Ok(permit) = tx.reserve().await {
                                permit.send(String::from(THE_STRING));
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );

        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

/// The same benchmark, but with integers. Messages are not heap allocated, so
/// non-thingbuf channel impls are not burdened by allocator churn for messages.
fn bench_spsc_try_send_integer(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_integer");

    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));

        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{
                    mpsc::{self, TrySendError},
                    ThingBuf,
                };
                let (tx, rx) = mpsc::channel(ThingBuf::new(100));
                task::spawn(async move {
                    let mut i = 0;
                    loop {
                        match tx.try_send(i) {
                            Ok(()) => {
                                i += 1;
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for n in 0..i {
                    let val = rx.recv().await.unwrap();
                    assert_eq!(n, val);
                }
            })
        });

        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.next().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );

        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            let mut i = 0;
                            loop {
                                match tx.try_send(i) {
                                    Ok(()) => {
                                        i += 1;
                                    }
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for n in 0..i {
                            let val = rx.recv().await.unwrap();
                            assert_eq!(n, val);
                        }
                    })
                })
            },
        );

        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.recv().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
    }
    group.finish();
}

criterion_group!(
    benches,
    bench_spsc_try_send_reusable,
    bench_spsc_try_send_integer,
    bench_spsc_reusable,
);
criterion_main!(benches);