123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- use core::cmp;
- use managed::{Managed, ManagedSlice};
- use {Error, Result};
- use super::Resettable;
- /// A ring buffer.
- ///
- /// This ring buffer implementation provides many ways to interact with it:
- ///
- /// * Enqueueing or dequeueing one element from corresponding side of the buffer;
- /// * Enqueueing or dequeueing a slice of elements from corresponding side of the buffer;
- /// * Accessing allocated and unallocated areas directly.
- ///
- /// It is also zero-copy; all methods provide references into the buffer's storage.
- /// Note that all references are mutable; it is considered more important to allow
- /// in-place processing than to protect from accidental mutation.
- ///
- /// This implementation is suitable for both simple uses such as a FIFO queue
- /// of UDP packets, and advanced ones such as a TCP reassembly buffer.
- #[derive(Debug)]
- pub struct RingBuffer<'a, T: 'a> {
- storage: ManagedSlice<'a, T>,
- read_at: usize,
- length: usize,
- }
- impl<'a, T: 'a> RingBuffer<'a, T> {
- /// Create a ring buffer with the given storage.
- ///
- /// During creation, every element in `storage` is reset.
- pub fn new<S>(storage: S) -> RingBuffer<'a, T>
- where S: Into<ManagedSlice<'a, T>>,
- {
- RingBuffer {
- storage: storage.into(),
- read_at: 0,
- length: 0,
- }
- }
- /// Clear the ring buffer.
- pub fn clear(&mut self) {
- self.read_at = 0;
- self.length = 0;
- }
- /// Return the maximum number of elements in the ring buffer.
- pub fn capacity(&self) -> usize {
- self.storage.len()
- }
- /// Clear the ring buffer, and reset every element.
- pub fn reset(&mut self)
- where T: Resettable {
- self.clear();
- for elem in self.storage.iter_mut() {
- elem.reset();
- }
- }
- /// Return the current number of elements in the ring buffer.
- pub fn len(&self) -> usize {
- self.length
- }
- /// Return the number of elements that can be added to the ring buffer.
- pub fn window(&self) -> usize {
- self.capacity() - self.len()
- }
- /// Query whether the buffer is empty.
- pub fn is_empty(&self) -> bool {
- self.len() == 0
- }
- /// Query whether the buffer is full.
- pub fn is_full(&self) -> bool {
- self.window() == 0
- }
- }
- /// This is the "discrete" ring buffer interface: it operates with single elements,
- /// and boundary conditions (empty/full) are errors.
- impl<'a, T: 'a> RingBuffer<'a, T> {
- /// Call `f` with a single buffer element, and enqueue the element if `f`
- /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
- pub fn enqueue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
- where F: FnOnce(&'b mut T) -> Result<R> {
- if self.is_full() { return Err(Error::Exhausted) }
- let index = (self.read_at + self.length) % self.capacity();
- match f(&mut self.storage[index]) {
- Ok(result) => {
- self.length += 1;
- Ok(result)
- }
- Err(error) => Err(error)
- }
- }
- /// Enqueue a single element into the buffer, and return a reference to it,
- /// or return `Err(Error::Exhausted)` if the buffer is full.
- ///
- /// This function is a shortcut for `ring_buf.enqueue_one_with(Ok)`.
- pub fn enqueue_one<'b>(&'b mut self) -> Result<&'b mut T> {
- self.enqueue_one_with(Ok)
- }
- /// Call `f` with a single buffer element, and dequeue the element if `f`
- /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is empty.
- pub fn dequeue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
- where F: FnOnce(&'b mut T) -> Result<R> {
- if self.is_empty() { return Err(Error::Exhausted) }
- let next_at = (self.read_at + 1) % self.capacity();
- match f(&mut self.storage[self.read_at]) {
- Ok(result) => {
- self.length -= 1;
- self.read_at = next_at;
- Ok(result)
- }
- Err(error) => Err(error)
- }
- }
- /// Dequeue an element from the buffer, and return a reference to it,
- /// or return `Err(Error::Exhausted)` if the buffer is empty.
- ///
- /// This function is a shortcut for `ring_buf.dequeue_one_with(Ok)`.
- pub fn dequeue_one(&mut self) -> Result<&mut T> {
- self.dequeue_one_with(Ok)
- }
- }
- /// This is the "continuous" ring buffer interface: it operates with element slices,
- /// and boundary conditions (empty/full) simply result in empty slices.
- impl<'a, T: 'a> RingBuffer<'a, T> {
- /// Call `f` with the largest contiguous slice of unallocated buffer elements,
- /// and enqueue the amount of elements returned by `f`.
- ///
- /// # Panics
- /// This function panics if the amount of elements returned by `f` is larger
- /// than the size of the slice passed into it.
- pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
- where F: FnOnce(&'b mut [T]) -> (usize, R) {
- let write_at = (self.read_at + self.length) % self.capacity();
- let max_size = cmp::min(self.window(), self.capacity() - write_at);
- let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
- assert!(size <= max_size);
- self.length += size;
- (size, result)
- }
- /// Enqueue a slice of elements up to the given size into the buffer,
- /// and return a reference to them.
- ///
- /// This function may return a slice smaller than the given size
- /// if the free space in the buffer is not contiguous.
- pub fn enqueue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
- self.enqueue_many_with(|buf| {
- let size = cmp::min(size, buf.len());
- (size, &mut buf[..size])
- }).1
- }
- /// Enqueue as many elements from the given slice into the buffer as possible,
- /// and return the amount of elements that could fit.
- pub fn enqueue_slice(&mut self, data: &[T]) -> usize
- where T: Copy {
- let (size_1, data) = self.enqueue_many_with(|buf| {
- let size = cmp::min(buf.len(), data.len());
- buf[..size].copy_from_slice(&data[..size]);
- (size, &data[size..])
- });
- let (size_2, ()) = self.enqueue_many_with(|buf| {
- let size = cmp::min(buf.len(), data.len());
- buf[..size].copy_from_slice(&data[..size]);
- (size, ())
- });
- size_1 + size_2
- }
- /// Call `f` with the largest contiguous slice of allocated buffer elements,
- /// and dequeue the amount of elements returned by `f`.
- ///
- /// # Panics
- /// This function panics if the amount of elements returned by `f` is larger
- /// than the size of the slice passed into it.
- pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
- where F: FnOnce(&'b mut [T]) -> (usize, R) {
- let capacity = self.capacity();
- let max_size = cmp::min(self.len(), capacity - self.read_at);
- let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
- assert!(size <= max_size);
- self.read_at = (self.read_at + size) % capacity;
- self.length -= size;
- (size, result)
- }
- /// Dequeue a slice of elements up to the given size from the buffer,
- /// and return a reference to them.
- ///
- /// This function may return a slice smaller than the given size
- /// if the allocated space in the buffer is not contiguous.
- pub fn dequeue_many<'b>(&'b mut self, size: usize) -> &'b mut [T] {
- self.dequeue_many_with(|buf| {
- let size = cmp::min(size, buf.len());
- (size, &mut buf[..size])
- }).1
- }
- /// Dequeue as many elements from the buffer into the given slice as possible,
- /// and return the amount of elements that could fit.
- pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
- where T: Copy {
- let (size_1, data) = self.dequeue_many_with(|buf| {
- let size = cmp::min(buf.len(), data.len());
- data[..size].copy_from_slice(&buf[..size]);
- (size, &mut data[size..])
- });
- let (size_2, ()) = self.dequeue_many_with(|buf| {
- let size = cmp::min(buf.len(), data.len());
- data[..size].copy_from_slice(&buf[..size]);
- (size, ())
- });
- size_1 + size_2
- }
- }
- /// This is the "random access" ring buffer interface: it operates with element slices,
- /// and allows to access elements of the buffer that are not adjacent to its head or tail.
- ///
- /// After calling these functions to inject or extract elements, one would normally
- /// use the `enqueue_many` or `dequeue_many` methods to adjust the head or tail.
- impl<'a, T: 'a> RingBuffer<'a, T> {
- /// Return the largest contiguous slice of unallocated buffer elements starting
- /// at the given offset past the last allocated element, and up to the given size.
- pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
- let start_at = (self.read_at + self.length + offset) % self.capacity();
- // We can't access past the end of unallocated data.
- if offset > self.window() { return &mut [] }
- // We can't enqueue more than there is free space.
- let clamped_window = self.window() - offset;
- if size > clamped_window { size = clamped_window }
- // We can't contiguously enqueue past the end of the storage.
- let until_end = self.capacity() - start_at;
- if size > until_end { size = until_end }
- &mut self.storage[start_at..start_at + size]
- }
- /// Return the largest contiguous slice of allocated buffer elements starting
- /// at the given offset past the first allocated element, and up to the given size.
- pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
- let start_at = (self.read_at + offset) % self.capacity();
- // We can't read past the end of the allocated data.
- if offset > self.length { return &mut [] }
- // We can't read more than we have allocated.
- let clamped_length = self.length - offset;
- if size > clamped_length { size = clamped_length }
- // We can't contiguously dequeue past the end of the storage.
- let until_end = self.capacity() - start_at;
- if size > until_end { size = until_end }
- &self.storage[start_at..start_at + size]
- }
- }
- impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
- fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
- RingBuffer::new(slice)
- }
- }
// Unit tests for `RingBuffer`. Several of them inspect `ring.storage` directly
// to check exactly where in the backing storage elements end up.
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_buffer_length_changes() {
        let mut ring = RingBuffer::new(vec![0; 2]);
        assert!(ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 2);

        // The length is poked directly; only the derived queries are under test here.
        ring.length = 1;
        assert!(!ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 1);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 1);

        ring.length = 2;
        assert!(!ring.is_empty());
        assert!(ring.is_full());
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.capacity(), 2);
        assert_eq!(ring.window(), 0);
    }

    #[test]
    fn test_buffer_enqueue_dequeue_one_with() {
        let mut ring = RingBuffer::new(vec![0; 5]);
        // On an empty buffer the callback must not be invoked at all.
        assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
                   Err(Error::Exhausted));

        ring.enqueue_one_with(|e| Ok(e)).unwrap();
        assert!(!ring.is_empty());
        assert!(!ring.is_full());

        for i in 1..5 {
            ring.enqueue_one_with(|e| Ok(*e = i)).unwrap();
            assert!(!ring.is_empty());
        }
        assert!(ring.is_full());
        // On a full buffer the callback must not be invoked at all.
        assert_eq!(ring.enqueue_one_with(|_| unreachable!()) as Result<()>,
                   Err(Error::Exhausted));

        for i in 0..5 {
            assert_eq!(ring.dequeue_one_with(|e| Ok(*e)).unwrap(), i);
            assert!(!ring.is_full());
        }
        assert_eq!(ring.dequeue_one_with(|_| unreachable!()) as Result<()>,
                   Err(Error::Exhausted));
        assert!(ring.is_empty());
    }

    #[test]
    fn test_buffer_enqueue_dequeue_one() {
        let mut ring = RingBuffer::new(vec![0; 5]);
        assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));

        ring.enqueue_one().unwrap();
        assert!(!ring.is_empty());
        assert!(!ring.is_full());

        for i in 1..5 {
            *ring.enqueue_one().unwrap() = i;
            assert!(!ring.is_empty());
        }
        assert!(ring.is_full());
        assert_eq!(ring.enqueue_one(), Err(Error::Exhausted));

        for i in 0..5 {
            assert_eq!(*ring.dequeue_one().unwrap(), i);
            assert!(!ring.is_full());
        }
        assert_eq!(ring.dequeue_one(), Err(Error::Exhausted));
        assert!(ring.is_empty());
    }

    #[test]
    fn test_buffer_enqueue_many_with() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12);
            buf[0..2].copy_from_slice(b"ab");
            (2, true)
        }), (2, true));
        assert_eq!(ring.len(), 2);
        assert_eq!(&ring.storage[..], b"ab..........");

        // Four elements are written, but only two of them are enqueued.
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 2);
            buf[0..4].copy_from_slice(b"cdXX");
            (2, ())
        });
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"abcdXX......");

        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 4);
            buf[0..4].copy_from_slice(b"efgh");
            (4, ())
        });
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"....efgh....");

        // The free space is now split in two; only the run up to the end
        // of the storage is offered.
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 8);
            buf[0..4].copy_from_slice(b"ijkl");
            (4, ())
        });
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"....efghijkl");

        // The write position has wrapped around to the start of the storage.
        ring.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 4);
            buf[0..4].copy_from_slice(b"abcd");
            (4, ())
        });
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcd....ijkl");
    }

    #[test]
    fn test_buffer_enqueue_many() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        ring.enqueue_many(8).copy_from_slice(b"abcdefgh");
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        // Eight elements are requested, but only four are free.
        ring.enqueue_many(8).copy_from_slice(b"ijkl");
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");
    }

    #[test]
    fn test_buffer_enqueue_slice() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefgh"), 8);
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdefgh....");

        for _ in 0..4 {
            *ring.dequeue_one().unwrap() = b'.';
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"....efgh....");

        // The slice is split across the end of the storage.
        assert_eq!(ring.enqueue_slice(b"ijklabcd"), 8);
        assert_eq!(ring.len(), 12);
        assert_eq!(&ring.storage[..], b"abcdefghijkl");
    }

    #[test]
    fn test_buffer_dequeue_many_with() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        assert_eq!(ring.dequeue_many_with(|buf| {
            assert_eq!(buf.len(), 12);
            assert_eq!(buf, b"abcdefghijkl");
            buf[..4].copy_from_slice(b"....");
            (4, true)
        }), (4, true));
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"....efghijkl");

        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"efghijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"........ijkl");

        assert_eq!(ring.enqueue_slice(b"abcd"), 4);
        assert_eq!(ring.len(), 8);

        // The allocated data is now split in two; each call sees one contiguous run.
        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"ijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        ring.dequeue_many_with(|buf| {
            assert_eq!(buf, b"abcd");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(ring.len(), 0);
        assert_eq!(&ring.storage[..], b"............");
    }

    #[test]
    fn test_buffer_dequeue_many() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        {
            let buf = ring.dequeue_many(8);
            assert_eq!(buf, b"abcdefgh");
            buf.copy_from_slice(b"........");
        }
        assert_eq!(ring.len(), 4);
        assert_eq!(&ring.storage[..], b"........ijkl");

        {
            // Eight elements are requested, but only four are left.
            let buf = ring.dequeue_many(8);
            assert_eq!(buf, b"ijkl");
            buf.copy_from_slice(b"....");
        }
        assert_eq!(ring.len(), 0);
        assert_eq!(&ring.storage[..], b"............");
    }

    #[test]
    fn test_buffer_dequeue_slice() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        assert_eq!(ring.enqueue_slice(b"abcdefghijkl"), 12);

        {
            let mut buf = [0; 8];
            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
            assert_eq!(&buf[..], b"abcdefgh");
            assert_eq!(ring.len(), 4);
        }

        assert_eq!(ring.enqueue_slice(b"abcd"), 4);

        {
            // The data is gathered from both sides of the end of the storage.
            let mut buf = [0; 8];
            assert_eq!(ring.dequeue_slice(&mut buf[..]), 8);
            assert_eq!(&buf[..], b"ijklabcd");
            assert_eq!(ring.len(), 0);
        }
    }

    #[test]
    fn test_buffer_get_unallocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        // An offset beyond the unallocated area yields an empty slice.
        assert_eq!(ring.get_unallocated(16, 4), b"");

        {
            let buf = ring.get_unallocated(0, 4);
            buf.copy_from_slice(b"abcd");
        }
        assert_eq!(&ring.storage[..], b"abcd........");

        ring.enqueue_many(4);
        assert_eq!(ring.len(), 4);

        {
            let buf = ring.get_unallocated(4, 8);
            buf.copy_from_slice(b"ijkl");
        }
        assert_eq!(&ring.storage[..], b"abcd....ijkl");

        ring.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
        ring.dequeue_many(4).copy_from_slice(b"abcd");
        assert_eq!(ring.len(), 8);
        assert_eq!(&ring.storage[..], b"abcdEFGHIJKL");

        {
            // The unallocated area has wrapped around to the start of the storage.
            let buf = ring.get_unallocated(0, 8);
            buf.copy_from_slice(b"ABCD");
        }
        assert_eq!(&ring.storage[..], b"ABCDEFGHIJKL");
    }

    #[test]
    fn test_buffer_get_allocated() {
        let mut ring = RingBuffer::new(vec![b'.'; 12]);

        // Nothing is allocated yet, whatever the offset.
        assert_eq!(ring.get_allocated(16, 4), b"");
        assert_eq!(ring.get_allocated(0, 4),  b"");

        ring.enqueue_slice(b"abcd");
        assert_eq!(ring.get_allocated(0, 8), b"abcd");

        ring.enqueue_slice(b"efghijkl");
        ring.dequeue_many(4).copy_from_slice(b"....");
        assert_eq!(ring.get_allocated(4, 8), b"ijkl");

        // The slice stops at the end of the storage even though more data
        // is allocated past the wrap-around.
        ring.enqueue_slice(b"abcd");
        assert_eq!(ring.get_allocated(4, 8), b"ijkl");
    }
}
|