ring_buffer.rs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. use core::cmp;
  2. use managed::{Managed, ManagedSlice};
  3. use {Error, Result};
  4. use super::Resettable;
/// A ring buffer.
///
/// This ring buffer implementation provides many ways to interact with it:
///
/// * Enqueueing or dequeueing one element from corresponding side of the buffer;
/// * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer;
/// * Accessing allocated and unallocated areas directly.
///
/// It is also zero-copy; all methods provide references into the buffer's storage.
/// Note that all references are mutable; it is considered more important to allow
/// in-place processing than to protect from accidental mutation.
///
/// This implementation is suitable for both simple uses such as a FIFO queue
/// of UDP packets, and advanced ones such as a TCP reassembly buffer.
#[derive(Debug)]
pub struct RingBuffer<'a, T: 'a> {
    // Backing storage; either borrowed or owned (see `managed::ManagedSlice`).
    storage: ManagedSlice<'a, T>,
    // Index of the first allocated element — the "head" that dequeue
    // operations read from.
    read_at: usize,
    // Number of currently allocated elements. The write position ("tail")
    // is derived as `(read_at + length) % capacity`.
    length: usize,
}
  25. impl<'a, T: 'a> RingBuffer<'a, T> {
  26. /// Create a ring buffer with the given storage.
  27. ///
  28. /// During creation, every element in `storage` is reset.
  29. pub fn new<S>(storage: S) -> RingBuffer<'a, T>
  30. where S: Into<ManagedSlice<'a, T>>,
  31. {
  32. RingBuffer {
  33. storage: storage.into(),
  34. read_at: 0,
  35. length: 0,
  36. }
  37. }
  38. /// Clear the ring buffer.
  39. pub fn clear(&mut self) {
  40. self.read_at = 0;
  41. self.length = 0;
  42. }
  43. /// Return the maximum number of elements in the ring buffer.
  44. pub fn capacity(&self) -> usize {
  45. self.storage.len()
  46. }
  47. /// Clear the ring buffer, and reset every element.
  48. pub fn reset(&mut self)
  49. where T: Resettable {
  50. self.clear();
  51. for elem in self.storage.iter_mut() {
  52. elem.reset();
  53. }
  54. }
  55. /// Return the current number of elements in the ring buffer.
  56. pub fn len(&self) -> usize {
  57. self.length
  58. }
  59. /// Return the number of elements that can be added to the ring buffer.
  60. pub fn window(&self) -> usize {
  61. self.capacity() - self.len()
  62. }
  63. /// Query whether the buffer is empty.
  64. pub fn is_empty(&self) -> bool {
  65. self.len() == 0
  66. }
  67. /// Query whether the buffer is full.
  68. pub fn is_full(&self) -> bool {
  69. self.window() == 0
  70. }
  71. }
  72. /// This is the "discrete" ring buffer interface: it operates with single elements,
  73. /// and boundary conditions (empty/full) are errors.
  74. impl<'a, T: 'a> RingBuffer<'a, T> {
  75. /// Call `f` with a single buffer element, and enqueue the element if `f`
  76. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
  77. pub fn enqueue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
  78. where F: FnOnce(&'b mut T) -> Result<R> {
  79. if self.is_full() { return Err(Error::Exhausted) }
  80. let index = (self.read_at + self.length) % self.capacity();
  81. match f(&mut self.storage[index]) {
  82. Ok(result) => {
  83. self.length += 1;
  84. Ok(result)
  85. }
  86. Err(error) => Err(error)
  87. }
  88. }
  89. /// Enqueue a single element into the buffer, and return a reference to it,
  90. /// or return `Err(Error::Exhausted)` if the buffer is full.
  91. ///
  92. /// This function is a shortcut for `ring_buf.enqueue_one_with(Ok)`.
  93. pub fn enqueue_one<'b>(&'b mut self) -> Result<&'b mut T> {
  94. self.enqueue_one_with(Ok)
  95. }
  96. /// Call `f` with a single buffer element, and dequeue the element if `f`
  97. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
  98. pub fn dequeue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
  99. where F: FnOnce(&'b mut T) -> Result<R> {
  100. if self.is_empty() { return Err(Error::Exhausted) }
  101. let next_at = (self.read_at + 1) % self.capacity();
  102. match f(&mut self.storage[self.read_at]) {
  103. Ok(result) => {
  104. self.length -= 1;
  105. self.read_at = next_at;
  106. Ok(result)
  107. }
  108. Err(error) => Err(error)
  109. }
  110. }
  111. /// Dequeue an element from the buffer, and return a reference to it,
  112. /// or return `Err(Error::Exhausted)` if the buffer is empty.
  113. ///
  114. /// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`.
  115. pub fn dequeue_one(&mut self) -> Result<&mut T> {
  116. self.dequeue_one_with(Ok)
  117. }
  118. }
/// This is the "continuous" ring buffer interface: it operates with element slices,
/// and boundary conditions (empty/full) simply result in empty slices.
impl<'a, T: 'a> RingBuffer<'a, T> {
    /// Call `f` with the largest contiguous slice of unallocated buffer elements,
    /// and enqueue the amount of elements returned by `f`.
    ///
    /// # Panics
    /// This function panics if the amount of elements returned by `f` is larger
    /// than the size of the slice passed into it.
    pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
        where F: FnOnce(&'b mut [T]) -> (usize, R) {
        // Tail position: one past the last allocated element, wrapped around.
        let write_at = (self.read_at + self.length) % self.capacity();
        // The slice handed to `f` is limited both by the free space and by the
        // distance to the physical end of the storage; if the free space wraps
        // around, only its first (contiguous) part is passed in.
        let max_size = cmp::min(self.window(), self.capacity() - write_at);
        let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
        // `f` must not claim to have written more than it was given.
        assert!(size <= max_size);
        self.length += size;
        (size, result)
    }

    /// Enqueue a slice of elements up to the given size into the buffer,
    /// and return a reference to them.
    ///
    /// This function may return a slice smaller than the given size
    /// if the free space in the buffer is not contiguous.
    pub fn enqueue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
        self.enqueue_many_with(|buf| {
            // Clamp the request to the contiguous free space actually available.
            let size = cmp::min(size, buf.len());
            (size, &mut buf[..size])
        }).1
    }

    /// Enqueue as many elements from the given slice into the buffer as possible,
    /// and return the amount of elements that could fit.
    pub fn enqueue_slice(&mut self, data: &[T]) -> usize
        where T: Copy {
        // Two phases, because the free space may wrap around the end of the
        // storage: the first call fills the part up to the physical end, the
        // second call fills the part starting at the physical beginning.
        let (size_1, data) = self.enqueue_many_with(|buf| {
            let size = cmp::min(buf.len(), data.len());
            buf[..size].copy_from_slice(&data[..size]);
            // Pass the not-yet-copied remainder of `data` on to phase two.
            (size, &data[size..])
        });
        let (size_2, ()) = self.enqueue_many_with(|buf| {
            let size = cmp::min(buf.len(), data.len());
            buf[..size].copy_from_slice(&data[..size]);
            (size, ())
        });
        size_1 + size_2
    }

    /// Call `f` with the largest contiguous slice of allocated buffer elements,
    /// and dequeue the amount of elements returned by `f`.
    ///
    /// # Panics
    /// This function panics if the amount of elements returned by `f` is larger
    /// than the size of the slice passed into it.
    pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
        where F: FnOnce(&'b mut [T]) -> (usize, R) {
        let capacity = self.capacity();
        // The slice handed to `f` is limited both by the allocated length and
        // by the distance from the head to the physical end of the storage.
        let max_size = cmp::min(self.len(), capacity - self.read_at);
        let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
        // `f` must not claim to have consumed more than it was given.
        assert!(size <= max_size);
        self.read_at = (self.read_at + size) % capacity;
        self.length -= size;
        (size, result)
    }

    /// Dequeue a slice of elements up to the given size from the buffer,
    /// and return a reference to them.
    ///
    /// This function may return a slice smaller than the given size
    /// if the allocated space in the buffer is not contiguous.
    pub fn dequeue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
        self.dequeue_many_with(|buf| {
            // Clamp the request to the contiguous allocated data available.
            let size = cmp::min(size, buf.len());
            (size, &mut buf[..size])
        }).1
    }

    /// Dequeue as many elements from the buffer into the given slice as possible,
    /// and return the amount of elements that could fit.
    pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
        where T: Copy {
        // Two phases, mirroring `enqueue_slice`: the allocated data may wrap
        // around the physical end of the storage.
        let (size_1, data) = self.dequeue_many_with(|buf| {
            let size = cmp::min(buf.len(), data.len());
            data[..size].copy_from_slice(&buf[..size]);
            // Pass the not-yet-filled remainder of `data` on to phase two.
            (size, &mut data[size..])
        });
        let (size_2, ()) = self.dequeue_many_with(|buf| {
            let size = cmp::min(buf.len(), data.len());
            data[..size].copy_from_slice(&buf[..size]);
            (size, ())
        });
        size_1 + size_2
    }
}
  208. /// This is the "random access" ring buffer interface: it operates with element slices,
  209. /// and allows to access elements of the buffer that are not adjacent to its head or tail.
  210. ///
  211. /// After calling these functions to inject or extract elements, one would normally
  212. /// use the `enqueue_many` or `dequeue_many` methods to adjust the head or tail.
  213. impl<'a, T: 'a> RingBuffer<'a, T> {
  214. /// Return the largest contiguous slice of unallocated buffer elements starting
  215. /// at the given offset past the last allocated element, and up to the given size.
  216. pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
  217. let start_at = (self.read_at + self.length + offset) % self.capacity();
  218. // We can't access past the end of unallocated data.
  219. if offset > self.window() { return &mut [] }
  220. // We can't enqueue more than there is free space.
  221. let clamped_window = self.window() - offset;
  222. if size > clamped_window { size = clamped_window }
  223. // We can't contiguously enqueue past the end of the storage.
  224. let until_end = self.capacity() - start_at;
  225. if size > until_end { size = until_end }
  226. &mut self.storage[start_at..start_at + size]
  227. }
  228. /// Return the largest contiguous slice of allocated buffer elements starting
  229. /// at the given offset past the first allocated element, and up to the given size.
  230. pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
  231. let start_at = (self.read_at + offset) % self.capacity();
  232. // We can't read past the end of the allocated data.
  233. if offset > self.length { return &mut [] }
  234. // We can't read more than we have allocated.
  235. let clamped_length = self.length - offset;
  236. if size > clamped_length { size = clamped_length }
  237. // We can't contiguously dequeue past the end of the storage.
  238. let until_end = self.capacity() - start_at;
  239. if size > until_end { size = until_end }
  240. &self.storage[start_at..start_at + size]
  241. }
  242. }
  243. impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
  244. fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
  245. RingBuffer::new(slice)
  246. }
  247. }
  248. #[cfg(test)]
  249. mod test {
  250. use super::*;
  251. #[test]
  252. fn test_buffer_length_changes() {
  253. let mut ring = RingBuffer::new(vec![0; 2]);
  254. assert!(ring.is_empty());
  255. assert!(!ring.is_full());
  256. assert_eq!(ring.len(), 0);
  257. assert_eq!(ring.capacity(), 2);
  258. assert_eq!(ring.window(), 2);
  259. ring.length = 1;
  260. assert!(!ring.is_empty());
  261. assert!(!ring.is_full());
  262. assert_eq!(ring.len(), 1);
  263. assert_eq!(ring.capacity(), 2);
  264. assert_eq!(ring.window(), 1);
  265. ring.length = 2;
  266. assert!(!ring.is_empty());
  267. assert!(ring.is_full());
  268. assert_eq!(ring.len(), 2);
  269. assert_eq!(ring.capacity(), 2);
  270. assert_eq!(ring.window(), 0);
  271. }
  272. #[test]
  273. fn test_buffer_enqueue_dequeue_one_with() {
  274. let mut ring = RingBuffer::new(vec![0; 5]);
  275. assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
  276. Err(Error::Exhausted));
  277. ring.enqueue_one_with(|e| Ok(e)).unwrap();
  278. assert!(!ring.is_empty());
  279. assert!(!ring.is_full());
  280. for i in 1..5 {
  281. ring.enqueue_one_with(|e| Ok(*e = i)).unwrap();
  282. assert!(!ring.is_empty());
  283. }
  284. assert!(ring.is_full());
  285. assert_eq!(ring.enqueue_one_with(|_| unreachable!()) as Result<()>,
  286. Err(Error::Exhausted));
  287. for i in 0..5 {
  288. assert_eq!(ring.dequeue_one_with(|e| Ok(*e)).unwrap(), i);
  289. assert!(!ring.is_full());
  290. }
  291. assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
  292. Err(Error::Exhausted));
  293. assert!(ring.is_empty());
  294. }
  295. #[test]
  296. fn test_buffer_enqueue_dequeue_one() {
  297. let mut ring = RingBuffer::new(vec![0; 5]);
  298. assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
  299. ring.enqueue_one().unwrap();
  300. assert!(!ring.is_empty());
  301. assert!(!ring.is_full());
  302. for i in 1..5 {
  303. *ring.enqueue_one().unwrap() = i;
  304. assert!(!ring.is_empty());
  305. }
  306. assert!(ring.is_full());
  307. assert_eq!(ring.enqueue_one(), Err(Error::Exhausted));
  308. for i in 0..5 {
  309. assert_eq!(*ring.dequeue_one().unwrap(), i);
  310. assert!(!ring.is_full());
  311. }
  312. assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
  313. assert!(ring.is_empty());
  314. }
  315. #[test]
  316. fn test_buffer_enqueue_many_with() {
  317. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  318. assert_eq!(ring.enqueue_many_with(|buf| {
  319. assert_eq!(buf.len(), 12);
  320. buf[0..2].copy_from_slice(b"ab");
  321. (2, true)
  322. }), (2, true));
  323. assert_eq!(ring.len(), 2);
  324. assert_eq!(&ring.storage[..], b"ab..........");
  325. ring.enqueue_many_with(|buf| {
  326. assert_eq!(buf.len(), 12 - 2);
  327. buf[0..4].copy_from_slice(b"cdXX");
  328. (2, ())
  329. });
  330. assert_eq!(ring.len(), 4);
  331. assert_eq!(&ring.storage[..], b"abcdXX......");
  332. ring.enqueue_many_with(|buf| {
  333. assert_eq!(buf.len(), 12 - 4);
  334. buf[0..4].copy_from_slice(b"efgh");
  335. (4, ())
  336. });
  337. assert_eq!(ring.len(), 8);
  338. assert_eq!(&ring.storage[..], b"abcdefgh....");
  339. for i in 0..4 {
  340. *ring.dequeue_one().unwrap() = b'.';
  341. }
  342. assert_eq!(ring.len(), 4);
  343. assert_eq!(&ring.storage[..], b"....efgh....");
  344. ring.enqueue_many_with(|buf| {
  345. assert_eq!(buf.len(), 12 - 8);
  346. buf[0..4].copy_from_slice(b"ijkl");
  347. (4, ())
  348. });
  349. assert_eq!(ring.len(), 8);
  350. assert_eq!(&ring.storage[..], b"....efghijkl");
  351. ring.enqueue_many_with(|buf| {
  352. assert_eq!(buf.len(), 4);
  353. buf[0..4].copy_from_slice(b"abcd");
  354. (4, ())
  355. });
  356. assert_eq!(ring.len(), 12);
  357. assert_eq!(&ring.storage[..], b"abcdefghijkl");
  358. for i in 0..4 {
  359. *ring.dequeue_one().unwrap() = b'.';
  360. }
  361. assert_eq!(ring.len(), 8);
  362. assert_eq!(&ring.storage[..], b"abcd....ijkl");
  363. }
  364. #[test]
  365. fn test_buffer_enqueue_many() {
  366. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  367. ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
  368. assert_eq!(ring.len(), 8);
  369. assert_eq!(&ring.storage[..], b"abcdefgh....");
  370. ring.enqueue_many(8).copy_from_slice(b"ijkl");
  371. assert_eq!(ring.len(), 12);
  372. assert_eq!(&ring.storage[..], b"abcdefghijkl");
  373. }
  374. #[test]
  375. fn test_buffer_enqueue_slice() {
  376. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  377. assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
  378. assert_eq!(ring.len(), 8);
  379. assert_eq!(&ring.storage[..], b"abcdefgh....");
  380. for i in 0..4 {
  381. *ring.dequeue_one().unwrap() = b'.';
  382. }
  383. assert_eq!(ring.len(), 4);
  384. assert_eq!(&ring.storage[..], b"....efgh....");
  385. assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8);
  386. assert_eq!(ring.len(), 12);
  387. assert_eq!(&ring.storage[..], b"abcdefghijkl");
  388. }
  389. #[test]
  390. fn test_buffer_dequeue_many_with() {
  391. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  392. assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
  393. assert_eq!(ring.dequeue_many_with(|buf| {
  394. assert_eq!(buf.len(), 12);
  395. assert_eq!(buf, b"abcdefghijkl");
  396. buf[..4].copy_from_slice(b"....");
  397. (4, true)
  398. }), (4, true));
  399. assert_eq!(ring.len(), 8);
  400. assert_eq!(&ring.storage[..], b"....efghijkl");
  401. ring.dequeue_many_with(|buf| {
  402. assert_eq!(buf, b"efghijkl");
  403. buf[..4].copy_from_slice(b"....");
  404. (4, ())
  405. });
  406. assert_eq!(ring.len(), 4);
  407. assert_eq!(&ring.storage[..], b"........ijkl");
  408. assert_eq!(ring.enqueue_slice(b"abcd"), 4);
  409. assert_eq!(ring.len(), 8);
  410. ring.dequeue_many_with(|buf| {
  411. assert_eq!(buf, b"ijkl");
  412. buf[..4].copy_from_slice(b"....");
  413. (4, ())
  414. });
  415. ring.dequeue_many_with(|buf| {
  416. assert_eq!(buf, b"abcd");
  417. buf[..4].copy_from_slice(b"....");
  418. (4, ())
  419. });
  420. assert_eq!(ring.len(), 0);
  421. assert_eq!(&ring.storage[..], b"............");
  422. }
  423. #[test]
  424. fn test_buffer_dequeue_many() {
  425. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  426. assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
  427. {
  428. let mut buf = ring.dequeue_many(8);
  429. assert_eq!(buf, b"abcdefgh");
  430. buf.copy_from_slice(b"........");
  431. }
  432. assert_eq!(ring.len(), 4);
  433. assert_eq!(&ring.storage[..], b"........ijkl");
  434. {
  435. let mut buf = ring.dequeue_many(8);
  436. assert_eq!(buf, b"ijkl");
  437. buf.copy_from_slice(b"....");
  438. }
  439. assert_eq!(ring.len(), 0);
  440. assert_eq!(&ring.storage[..], b"............");
  441. }
  442. #[test]
  443. fn test_buffer_dequeue_slice() {
  444. let mut ring = RingBuffer::new(vec![b'.'; 12]);
  445. assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);
  446. {
  447. let mut buf = [0; 8];
  448. assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
  449. assert_eq!(&buf[..], b"abcdefgh");
  450. assert_eq!(ring.len(), 4);
  451. }
  452. assert_eq!(ring.enqueue_slice(b"abcd"), 4);
  453. {
  454. let mut buf = [0; 8];
  455. assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
  456. assert_eq!(&buf[..], b"ijklabcd");
  457. assert_eq!(ring.len(), 0);
  458. }
  459. }
  460. #[test]
  461. fn test_buffer_get_unallocated() {
  462. let mut ring = RingBuffer::new(vec![b'.'; 12]);;
  463. assert_eq!(ring.get_unallocated(16, 4), b"");
  464. {
  465. let buf = ring.get_unallocated(0, 4);
  466. buf.copy_from_slice(b"abcd");
  467. }
  468. assert_eq!(&ring.storage[..], b"abcd........");
  469. ring.enqueue_many(4);
  470. assert_eq!(ring.len(), 4);
  471. {
  472. let buf = ring.get_unallocated(4, 8);
  473. buf.copy_from_slice(b"ijkl");
  474. }
  475. assert_eq!(&ring.storage[..], b"abcd....ijkl");
  476. ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
  477. ring.dequeue_many(4).copy_from_slice(b"abcd");
  478. assert_eq!(ring.len(), 8);
  479. assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");
  480. {
  481. let buf = ring.get_unallocated(0, 8);
  482. buf.copy_from_slice(b"ABCD");
  483. }
  484. assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
  485. }
  486. #[test]
  487. fn test_buffer_get_allocated() {
  488. let mut ring = RingBuffer::new(vec![b'.'; 12]);;
  489. assert_eq!(ring.get_allocated(16, 4), b"");
  490. assert_eq!(ring.get_allocated(0, 4), b"");
  491. ring.enqueue_slice(b"abcd");
  492. assert_eq!(ring.get_allocated(0, 8), b"abcd");
  493. ring.enqueue_slice(b"efghijkl");
  494. ring.dequeue_many(4).copy_from_slice(b"....");
  495. assert_eq!(ring.get_allocated(4, 8), b"ijkl");
  496. ring.enqueue_slice(b"abcd");
  497. assert_eq!(ring.get_allocated(4, 8), b"ijkl");
  498. }
  499. }