ring_buffer.rs 7.7 KB


  1. use managed::{Managed, ManagedSlice};
  2. use {Error, Result};
  3. use super::Resettable;
  4. /// A ring buffer.
  5. #[derive(Debug)]
  6. pub struct RingBuffer<'a, T: 'a> {
  7. storage: ManagedSlice<'a, T>,
  8. read_at: usize,
  9. length: usize,
  10. }
  11. impl<'a, T: 'a> RingBuffer<'a, T> {
  12. /// Create a ring buffer with the given storage.
  13. ///
  14. /// During creation, every element in `storage` is reset.
  15. pub fn new<S>(storage: S) -> RingBuffer<'a, T>
  16. where S: Into<ManagedSlice<'a, T>>,
  17. {
  18. RingBuffer {
  19. storage: storage.into(),
  20. read_at: 0,
  21. length: 0,
  22. }
  23. }
  24. /// Clear the ring buffer.
  25. pub fn clear(&mut self) {
  26. self.read_at = 0;
  27. self.length = 0;
  28. }
  29. /// Clear the ring buffer, and reset every element.
  30. pub fn reset(&mut self)
  31. where T: Resettable {
  32. self.clear();
  33. for elem in self.storage.iter_mut() {
  34. elem.reset();
  35. }
  36. }
  37. /// Return the current number of elements in the ring buffer.
  38. pub fn len(&self) -> usize {
  39. self.length
  40. }
  41. /// Return the maximum number of elements in the ring buffer.
  42. pub fn capacity(&self) -> usize {
  43. self.storage.len()
  44. }
  45. /// Return the number of elements that can be added to the ring buffer.
  46. pub fn window(&self) -> usize {
  47. self.capacity() - self.len()
  48. }
  49. /// Query whether the buffer is empty.
  50. pub fn empty(&self) -> bool {
  51. self.len() == 0
  52. }
  53. /// Query whether the buffer is full.
  54. pub fn full(&self) -> bool {
  55. self.window() == 0
  56. }
  57. }
  58. // This is the "discrete" ring buffer interface: it operates with single elements,
  59. // and boundary conditions (empty/full) are errors.
  60. impl<'a, T: 'a> RingBuffer<'a, T> {
  61. /// Call `f` with a single buffer element, and enqueue the element if `f`
  62. /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
  63. pub fn try_enqueue<'b, R, F>(&'b mut self, f: F) -> Result<R>
  64. where F: FnOnce(&'b mut T) -> Result<R> {
  65. if self.full() { return Err(Error::Exhausted) }
  66. let index = (self.read_at + self.length) % self.capacity();
  67. match f(&mut self.storage[index]) {
  68. Ok(result) => {
  69. self.length += 1;
  70. Ok(result)
  71. }
  72. Err(error) => Err(error)
  73. }
  74. }
  75. /// Enqueue a single element into the buffer, and return a pointer to it,
  76. /// or return `Err(Error::Exhausted)` if the buffer is full.
  77. pub fn enqueue<'b>(&'b mut self) -> Result<&'b mut T> {
  78. self.try_enqueue(Ok)
  79. }
  80. /// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
  81. /// return `Err(Error::Exhausted)` if the buffer is empty.
  82. pub fn try_dequeue<'b, R, F>(&'b mut self, f: F) -> Result<R>
  83. where F: FnOnce(&'b mut T) -> Result<R> {
  84. if self.empty() { return Err(Error::Exhausted) }
  85. let next_at = (self.read_at + 1) % self.capacity();
  86. match f(&mut self.storage[self.read_at]) {
  87. Ok(result) => {
  88. self.length -= 1;
  89. self.read_at = next_at;
  90. Ok(result)
  91. }
  92. Err(error) => Err(error)
  93. }
  94. }
  95. /// Dequeue an element from the buffer, and return a mutable reference to it, or return
  96. /// `Err(Error::Exhausted)` if the buffer is empty.
  97. pub fn dequeue(&mut self) -> Result<&mut T> {
  98. self.try_dequeue(Ok)
  99. }
  100. }
  101. // This is the "continuous" ring buffer interface: it operates with element slices,
  102. // and boundary conditions (empty/full) simply result in empty slices.
  103. impl<'a, T: 'a> RingBuffer<'a, T> {
  104. fn clamp_writer(&self, mut size: usize) -> (usize, usize) {
  105. let write_at = (self.read_at + self.length) % self.capacity();
  106. // We can't enqueue more than there is free space.
  107. let free = self.capacity() - self.length;
  108. if size > free { size = free }
  109. // We can't contiguously enqueue past the beginning of the storage.
  110. let until_end = self.capacity() - write_at;
  111. if size > until_end { size = until_end }
  112. (write_at, size)
  113. }
  114. pub(crate) fn enqueue_slice<'b>(&'b mut self, size: usize) -> &'b mut [T] {
  115. let (write_at, size) = self.clamp_writer(size);
  116. self.length += size;
  117. &mut self.storage[write_at..write_at + size]
  118. }
  119. pub(crate) fn enqueue_slice_all(&mut self, data: &[T])
  120. where T: Copy {
  121. let data = {
  122. let mut dest = self.enqueue_slice(data.len());
  123. let (data, rest) = data.split_at(dest.len());
  124. dest.copy_from_slice(data);
  125. rest
  126. };
  127. // Retry, in case we had a wraparound.
  128. let mut dest = self.enqueue_slice(data.len());
  129. let (data, _) = data.split_at(dest.len());
  130. dest.copy_from_slice(data);
  131. }
  132. fn clamp_reader(&self, offset: usize, mut size: usize) -> (usize, usize) {
  133. let read_at = (self.read_at + offset) % self.capacity();
  134. // We can't read past the end of the queued data.
  135. if offset > self.length { return (read_at, 0) }
  136. // We can't dequeue more than was queued.
  137. let clamped_length = self.length - offset;
  138. if size > clamped_length { size = clamped_length }
  139. // We can't contiguously dequeue past the end of the storage.
  140. let until_end = self.capacity() - read_at;
  141. if size > until_end { size = until_end }
  142. (read_at, size)
  143. }
  144. pub(crate) fn dequeue_slice(&mut self, size: usize) -> &[T] {
  145. let (read_at, size) = self.clamp_reader(0, size);
  146. self.read_at = (self.read_at + size) % self.capacity();
  147. self.length -= size;
  148. &self.storage[read_at..read_at + size]
  149. }
  150. pub(crate) fn peek(&self, offset: usize, size: usize) -> &[T] {
  151. let (read_at, size) = self.clamp_reader(offset, size);
  152. &self.storage[read_at..read_at + size]
  153. }
  154. }
  155. impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
  156. fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
  157. RingBuffer::new(slice)
  158. }
  159. }
  160. #[cfg(test)]
  161. mod test {
  162. use super::*;
  163. const SIZE: usize = 5;
  164. #[test]
  165. pub fn test_buffer() {
  166. let mut buf = RingBuffer::new(vec![0; SIZE]);
  167. assert!(buf.empty());
  168. assert!(!buf.full());
  169. assert_eq!(buf.dequeue(), Err(Error::Exhausted));
  170. buf.enqueue().unwrap();
  171. assert!(!buf.empty());
  172. assert!(!buf.full());
  173. for i in 1..SIZE {
  174. *buf.enqueue().unwrap() = i;
  175. assert!(!buf.empty());
  176. }
  177. assert!(buf.full());
  178. assert_eq!(buf.enqueue(), Err(Error::Exhausted));
  179. for i in 0..SIZE {
  180. assert_eq!(*buf.dequeue().unwrap(), i);
  181. assert!(!buf.full());
  182. }
  183. assert_eq!(buf.dequeue(), Err(Error::Exhausted));
  184. assert!(buf.empty());
  185. }
  186. #[test]
  187. pub fn test_buffer_try() {
  188. let mut buf = RingBuffer::new(vec![0; SIZE]);
  189. assert!(buf.empty());
  190. assert!(!buf.full());
  191. assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
  192. Err(Error::Exhausted));
  193. buf.try_enqueue(|e| Ok(e)).unwrap();
  194. assert!(!buf.empty());
  195. assert!(!buf.full());
  196. for i in 1..SIZE {
  197. buf.try_enqueue(|e| Ok(*e = i)).unwrap();
  198. assert!(!buf.empty());
  199. }
  200. assert!(buf.full());
  201. assert_eq!(buf.try_enqueue(|_| unreachable!()) as Result<()>,
  202. Err(Error::Exhausted));
  203. for i in 0..SIZE {
  204. assert_eq!(buf.try_dequeue(|e| Ok(*e)).unwrap(), i);
  205. assert!(!buf.full());
  206. }
  207. assert_eq!(buf.try_dequeue(|_| unreachable!()) as Result<()>,
  208. Err(Error::Exhausted));
  209. assert!(buf.empty());
  210. }
  211. }