async_spsc.rs

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use tokio::{runtime, task};

/// This benchmark simulates sending a bunch of strings over a channel. It's
/// intended to simulate the sort of workload that `thingbuf` is designed for:
/// the elements in the buffer are expensive to allocate, copy, or drop, but
/// they can be re-used in place without allocating/deallocating.
///
/// So, this may not be strictly representative of performance in the case of,
/// say, sending a bunch of integers over the channel; instead, it simulates
/// the kind of scenario that `thingbuf` is optimized for.
fn bench_spsc_try_send_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc::{self, errors::TrySendError};
                let (tx, rx) = mpsc::channel::<String>(100);
                task::spawn(async move {
                    loop {
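                        // `try_send_ref` gives mutable access to the slot already
                        // in the buffer, so the existing `String`'s capacity is
                        // reused rather than allocating a new message; when the
                        // channel is full, yield so the receiver task gets to run
                        // on this single-threaded runtime.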
                        match tx.try_send_ref() {
                            Ok(mut slot) => {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
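                // receive `i` messages; `recv_ref` borrows each message in place,
                // and `black_box` keeps the read from being optimized away.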
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
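        // the remaining impls are comparison points; each is only compiled when
        // the corresponding Cargo feature ("futures", "tokio-sync", "async-std")
        // is enabled.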
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        loop {
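                            // unlike `thingbuf`, this channel must allocate a fresh
                            // `String` for every message sent.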
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            loop {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                match tx.try_reserve() {
                                    Ok(permit) => permit.send(String::from(THE_STRING)),
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        loop {
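                            // like the `futures` channel, this allocates a new
                            // `String` per message, yielding whenever the channel
                            // is full.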
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

fn bench_spsc_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc;
                let (tx, rx) = mpsc::channel::<String>(100);
                task::spawn(async move {
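                    // `send_ref().await` waits for a free slot rather than spinning
                    // with `yield_now`, then overwrites the reused `String` in place.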
                    while let Ok(mut slot) = tx.send_ref().await {
                        slot.clear();
                        slot.push_str(THE_STRING);
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
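                    // `SinkExt::send` waits for capacity, but a new `String` is
                    // still allocated for every message.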
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            // this actually brings Tokio's MPSC closer to what
                            // `ThingBuf` can do than all the other impls --- we
                            // only allocate if we _were_ able to reserve send
                            // capacity. but, we will still allocate and
                            // deallocate a string for every message...
                            while let Ok(permit) = tx.reserve().await {
                                permit.send(String::from(THE_STRING));
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(
                        async move { while tx.send(String::from(THE_STRING)).await.is_ok() {} },
                    );
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

/// The same benchmark, but with integers. Messages are not heap allocated, so
/// non-thingbuf channel impls are not burdened by allocator churn for messages.
fn bench_spsc_try_send_integer(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_integer");
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::mpsc::{self, errors::TrySendError};
                let (tx, rx) = mpsc::channel(100);
                task::spawn(async move {
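                    // send a monotonically increasing counter so the receiver can
                    // assert that messages arrive in order.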
                    let mut i = 0;
                    loop {
                        match tx.try_send(i) {
                            Ok(()) => {
                                i += 1;
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for n in 0..i {
                    let val = rx.recv().await.unwrap();
                    assert_eq!(n, val);
                }
            })
        });
        #[cfg(feature = "futures")]
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.next().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
        #[cfg(feature = "tokio-sync")]
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            let mut i = 0;
                            loop {
                                match tx.try_send(i) {
                                    Ok(()) => {
                                        i += 1;
                                    }
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for n in 0..i {
                            let val = rx.recv().await.unwrap();
                            assert_eq!(n, val);
                        }
                    })
                })
            },
        );
        #[cfg(feature = "async-std")]
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.recv().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
    }
    group.finish();
}
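
// register the three benchmark functions with criterion's harness.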
criterion_group!(
    benches,
    bench_spsc_try_send_reusable,
    bench_spsc_try_send_integer,
    bench_spsc_reusable,
);
criterion_main!(benches);