// async_spsc.rs

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use tokio::{runtime, task};

/// This benchmark simulates sending a bunch of strings over a channel. It's
/// intended to simulate the sort of workload that a `thingbuf` is designed
/// for, where the elements in the buffer are expensive to allocate, copy, or
/// drop, but can be re-used in place without allocating/deallocating.
///
/// So, this may not be strictly representative of performance in the case of,
/// say, sending a bunch of integers over the channel; instead, it simulates
/// the kind of scenario that `thingbuf` is optimized for.
fn bench_spsc_try_send_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";
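    // Each bench iteration spawns a producer task that keeps the channel as
    // full as it can with non-blocking sends, yielding whenever there is no
    // free capacity. The benchmarked future receives `i` messages and then
    // drops the receiver, which closes the channel and lets the producer task
    // observe a "closed"/"disconnected" error and exit.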
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{
                    mpsc::{self, TrySendError},
                    ThingBuf,
                };
                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
                task::spawn(async move {
                    loop {
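                        // `try_send_ref` hands back a reference to a slot in
                        // the buffer. Clearing the `String` that is already
                        // there and pushing the new contents reuses its
                        // existing heap allocation rather than allocating a
                        // fresh `String` for every message.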
                        match tx.try_send_ref() {
                            Ok(mut slot) => {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
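        // Unlike the `ThingBuf` version above, the remaining channels send
        // owned `String`s, so every message allocates on the send side
        // (`String::from`) and frees that allocation when the received value
        // is dropped.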
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            loop {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                match tx.try_reserve() {
                                    Ok(permit) => permit.send(String::from(THE_STRING)),
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
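        // `async_std`'s bounded channel follows the same pattern as the
        // `futures` channel: a full channel falls through to the wildcard arm
        // and yields to the receiver.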
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        loop {
                            match tx.try_send(String::from(THE_STRING)) {
                                Ok(()) => {}
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

fn bench_spsc_reusable(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/reusable");
    static THE_STRING: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\
        aaaaaaaaaaaaaa";
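    // Same workload as `bench_spsc_try_send_reusable`, but the producers use
    // the asynchronous `send`/`send_ref` methods, which wait for free capacity
    // instead of spinning on a non-blocking send and yielding manually.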
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{mpsc, ThingBuf};
                let (tx, rx) = mpsc::channel(ThingBuf::<String>::new(100));
                task::spawn(async move {
                    loop {
                        match tx.send_ref().await {
                            Ok(mut slot) => {
                                slot.clear();
                                slot.push_str(THE_STRING);
                            }
                            Err(_) => break,
                        }
                    }
                });
                for _ in 0..i {
                    let val = rx.recv_ref().await.unwrap();
                    criterion::black_box(&*val);
                }
            })
        });
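        // `SinkExt::send` waits for the channel to have capacity before the
        // owned `String` is sent.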
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        loop {
                            match tx.send(String::from(THE_STRING)).await {
                                Ok(_) => {}
                                Err(_) => break,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.next().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc;
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            loop {
                                // this actually brings Tokio's MPSC closer to what
                                // `ThingBuf` can do than all the other impls --- we
                                // only allocate if we _were_ able to reserve send
                                // capacity. but, we will still allocate and
                                // deallocate a string for every message...
                                match tx.reserve().await {
                                    Ok(permit) => permit.send(String::from(THE_STRING)),
                                    Err(_) => break,
                                }
                            }
                        }));
                        for _ in 0..i {
                            let val = rx.recv().await.unwrap();
                            criterion::black_box(&val);
                        }
                    })
                })
            },
        );
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel;
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        loop {
                            match tx.send(String::from(THE_STRING)).await {
                                Ok(_) => {}
                                Err(_) => break,
                            }
                        }
                    });
                    for _ in 0..i {
                        let val = rx.recv().await.unwrap();
                        criterion::black_box(&val);
                    }
                })
            },
        );
    }
    group.finish();
}

/// The same benchmark, but with integers. Messages are not heap allocated, so
/// non-thingbuf channel impls are not burdened by allocator churn for messages.
fn bench_spsc_try_send_integer(c: &mut Criterion) {
    let mut group = c.benchmark_group("async/spsc/try_send_integer");
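    // With plain integers there is nothing to reuse in place, so this mostly
    // measures raw channel overhead. The receive loop asserts that values
    // arrive in order, which also keeps them from being optimized away.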
    for size in [100, 500, 1_000, 5_000, 10_000] {
        group.throughput(Throughput::Elements(size));
        group.bench_with_input(BenchmarkId::new("ThingBuf", size), &size, |b, &i| {
            let rt = runtime::Builder::new_current_thread().build().unwrap();
            b.to_async(rt).iter(|| async {
                use thingbuf::{
                    mpsc::{self, TrySendError},
                    ThingBuf,
                };
                let (tx, rx) = mpsc::channel(ThingBuf::new(100));
                task::spawn(async move {
                    let mut i = 0;
                    loop {
                        match tx.try_send(i) {
                            Ok(()) => {
                                i += 1;
                            }
                            Err(TrySendError::Closed(_)) => break,
                            _ => task::yield_now().await,
                        }
                    }
                });
                for n in 0..i {
                    let val = rx.recv().await.unwrap();
                    assert_eq!(n, val);
                }
            })
        });
        group.bench_with_input(
            BenchmarkId::new("futures::channel::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use futures::{channel::mpsc, stream::StreamExt};
                    let (mut tx, mut rx) = mpsc::channel(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(e) if e.is_disconnected() => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.next().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
        group.bench_with_input(
            BenchmarkId::new("tokio::sync::mpsc", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| {
                    // turn off Tokio's automatic cooperative yielding for this
                    // benchmark. in code with a large number of concurrent
                    // tasks, this feature makes the MPSC channel (and other
                    // Tokio synchronization primitives) better "team players"
                    // than other implementations, since it prevents them from
                    // using too much scheduler time.
                    //
                    // in this benchmark, though, there *are* no other tasks
                    // running, so automatic yielding just means we spend more
                    // time ping-ponging through the scheduler than every other
                    // implementation.
                    tokio::task::unconstrained(async {
                        use tokio::sync::mpsc::{self, error::TrySendError};
                        let (tx, mut rx) = mpsc::channel(100);
                        task::spawn(tokio::task::unconstrained(async move {
                            let mut i = 0;
                            loop {
                                match tx.try_send(i) {
                                    Ok(()) => {
                                        i += 1;
                                    }
                                    Err(TrySendError::Closed(_)) => break,
                                    _ => task::yield_now().await,
                                }
                            }
                        }));
                        for n in 0..i {
                            let val = rx.recv().await.unwrap();
                            assert_eq!(n, val);
                        }
                    })
                })
            },
        );
        group.bench_with_input(
            BenchmarkId::new("async_std::channel::bounded", size),
            &size,
            |b, &i| {
                let rt = runtime::Builder::new_current_thread().build().unwrap();
                b.to_async(rt).iter(|| async {
                    use async_std::channel::{self, TrySendError};
                    let (tx, rx) = channel::bounded(100);
                    task::spawn(async move {
                        let mut i = 0;
                        loop {
                            match tx.try_send(i) {
                                Ok(()) => {
                                    i += 1;
                                }
                                Err(TrySendError::Closed(_)) => break,
                                _ => task::yield_now().await,
                            }
                        }
                    });
                    for n in 0..i {
                        let val = rx.recv().await.unwrap();
                        assert_eq!(n, val);
                    }
                })
            },
        );
    }
    group.finish();
}

criterion_group!(
    benches,
    bench_spsc_try_send_reusable,
    bench_spsc_try_send_integer,
    bench_spsc_reusable,
);
criterion_main!(benches);