
Implement memtrimming for the local allocator.

Memtrimming (releasing memory to a higher allocator or the OS) is
important for avoiding accumulation of unused memory. This commit
introduces memtrimming for local allocators, so that fragmentation and
small stubs do not accumulate in them.

By keeping track of how many bytes are currently in the bookkeeper, we
can estimate the level of fragmentation and thus decide whether we
should memtrim (i.e. free memory back to the global allocator).
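
As a rough standalone sketch of that decision (the free function and the
example numbers below are only illustrative; the real check is the
`should_memtrim` method added to `LocalAllocator` in src/allocator.rs,
using the same constants):

    /// Illustrative sketch of the memtrim decision: memtrim when the free
    /// blocks are, on average, tiny (heavy fragmentation) or when too many
    /// free bytes have piled up locally.
    fn should_memtrim(total_bytes: usize, len: usize) -> bool {
        // Minimum average block size before the pool counts as fragmented.
        const FRAGMENTATION_SCALE: usize = 10;
        // Free-byte threshold above which the local allocator is trimmed.
        const LOCAL_MEMTRIM_LIMIT: usize = 16384;

        total_bytes < FRAGMENTATION_SCALE * len || total_bytes > LOCAL_MEMTRIM_LIMIT
    }

    // For example, 200 free blocks totalling 1500 bytes (average 7.5 bytes
    // per block) trigger a memtrim because 1500 < 10 * 200, and a pool
    // holding 20000 free bytes triggers one because it exceeds
    // LOCAL_MEMTRIM_LIMIT.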

The next thing is to implement global memtrimming.

We make a few minor changes as well:

- Refactor `for_each` to use a new method, `pop_iter`, which returns an
  iterator popping elements off the vector (a standalone sketch of the
  pattern follows this list).
- Introduce a new field `total_bytes` keeping track of the number of
  bytes in the bookkeeper.
- Move the initializers of the allocators to `init` methods defined on
  the respective types.
- Introduce an `on_new_memory()` method, which is called whenever new
  memory is introduced.
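
For illustration, here is a minimal standalone sketch of the `pop_iter`
pattern, written against the standard library's `Vec`; the commit's
version lives on ralloc's internal, leak-only `Vec` in src/vec.rs:

    /// An iterator that pops elements off the back of a vector, draining it
    /// back to front.
    struct PopIter<'a, T: 'a> {
        vec: &'a mut Vec<T>,
    }

    impl<'a, T: 'a> Iterator for PopIter<'a, T> {
        type Item = T;

        fn next(&mut self) -> Option<T> {
            self.vec.pop()
        }
    }

    /// Yield an iterator popping from the vector.
    fn pop_iter<T>(vec: &mut Vec<T>) -> PopIter<'_, T> {
        PopIter { vec }
    }

    fn main() {
        let mut v = vec![1, 2, 3];
        // Yields 3, 2, 1 and leaves the vector empty.
        let popped: Vec<_> = pop_iter(&mut v).collect();
        assert_eq!(popped, [3, 2, 1]);
        assert!(v.is_empty());
    }
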
ticki, 8 years ago
parent
commit
f679d9ced0
4 changed files with 213 additions and 87 deletions
  1. src/allocator.rs (+104 -64)
  2. src/block.rs (+1 -1)
  3. src/bookkeeper.rs (+87 -21)
  4. src/vec.rs (+21 -1)

+ 104 - 64
src/allocator.rs

@@ -19,73 +19,11 @@ type ThreadLocalAllocator = MoveCell<Option<LazyInit<fn() -> LocalAllocator, Loc
 /// The global default allocator.
 // TODO: Remove these filthy function pointers.
 static GLOBAL_ALLOCATOR: sync::Mutex<LazyInit<fn() -> GlobalAllocator, GlobalAllocator>> =
-    sync::Mutex::new(LazyInit::new(global_init));
+    sync::Mutex::new(LazyInit::new(GlobalAllocator::init));
 #[cfg(feature = "tls")]
 tls! {
     /// The thread-local allocator.
-    static THREAD_ALLOCATOR: ThreadLocalAllocator = MoveCell::new(Some(LazyInit::new(local_init)));
-}
-
-/// Initialize the global allocator.
-fn global_init() -> GlobalAllocator {
-    // The initial acquired segment.
-    let (aligner, initial_segment, excessive) =
-        brk::get(4 * bookkeeper::EXTRA_ELEMENTS * mem::size_of::<Block>(), mem::align_of::<Block>());
-
-    // Initialize the new allocator.
-    let mut res = GlobalAllocator {
-        inner: Bookkeeper::new(unsafe {
-            Vec::from_raw_parts(initial_segment, 0)
-        }),
-    };
-
-    // Free the secondary space.
-    res.push(aligner);
-    res.push(excessive);
-
-    res
-}
-
-/// Initialize the local allocator.
-#[cfg(feature = "tls")]
-fn local_init() -> LocalAllocator {
-    /// The destructor of the local allocator.
-    ///
-    /// This will simply free everything to the global allocator.
-    extern fn dtor(alloc: &ThreadLocalAllocator) {
-        // This is important! The thread destructors guarantee no other, and thus one could use the
-        // allocator _after_ this destructor have been finished. In fact, this is a real problem,
-        // and happens when using `Arc` and terminating the main thread, for this reason we place
-        // `None` as a permanent marker indicating that the allocator is deinitialized. After such
-        // a state is in place, all allocation calls will be redirected to the global allocator,
-        // which is of course still usable at this moment.
-        let alloc = alloc.replace(None).expect("Thread-local allocator is already freed.");
-
-        // Lock the global allocator.
-        let mut global_alloc = GLOBAL_ALLOCATOR.lock();
-        let global_alloc = global_alloc.get();
-
-        // TODO: we know this is sorted, so we could abuse that fact to faster insertion in the
-        // global allocator.
-
-        alloc.into_inner().inner.for_each(move |block| global_alloc.free(block));
-    }
-
-    // The initial acquired segment.
-    let initial_segment = GLOBAL_ALLOCATOR
-        .lock()
-        .get()
-        .alloc(4 * bookkeeper::EXTRA_ELEMENTS * mem::size_of::<Block>(), mem::align_of::<Block>());
-
-    unsafe {
-        // Register the thread destructor on the current thread.
-        THREAD_ALLOCATOR.register_thread_destructor(dtor)
-            .expect("Unable to register a thread destructor.");
-
-        LocalAllocator {
-            inner: Bookkeeper::new(Vec::from_raw_parts(initial_segment, 0)),
-        }
-    }
+    static THREAD_ALLOCATOR: ThreadLocalAllocator = MoveCell::new(Some(LazyInit::new(LocalAllocator::init)));
 }
 
 /// Temporarily get the allocator.
@@ -175,6 +113,28 @@ struct GlobalAllocator {
     inner: Bookkeeper,
 }
 
+impl GlobalAllocator {
+    /// Initialize the global allocator.
+    fn init() -> GlobalAllocator {
+        // The initial acquired segment.
+        let (aligner, initial_segment, excessive) =
+            brk::get(4 * bookkeeper::EXTRA_ELEMENTS * mem::size_of::<Block>(), mem::align_of::<Block>());
+
+        // Initialize the new allocator.
+        let mut res = GlobalAllocator {
+            inner: Bookkeeper::new(unsafe {
+                Vec::from_raw_parts(initial_segment, 0)
+            }),
+        };
+
+        // Free the secondary space.
+        res.push(aligner);
+        res.push(excessive);
+
+        res
+    }
+}
+
 derive_deref!(GlobalAllocator, Bookkeeper);
 
 impl Allocator for GlobalAllocator {
@@ -202,6 +162,69 @@ pub struct LocalAllocator {
     inner: Bookkeeper,
 }
 
+impl LocalAllocator {
+    /// Initialize the local allocator.
+    #[cfg(feature = "tls")]
+    fn init() -> LocalAllocator {
+        /// The destructor of the local allocator.
+        ///
+        /// This will simply free everything to the global allocator.
+        extern fn dtor(alloc: &ThreadLocalAllocator) {
+            // This is important! The thread destructors guarantee no particular ordering, so one
+            // could use the allocator _after_ this destructor has finished. In fact, this is a real
+            // problem that happens when using `Arc` and terminating the main thread. For this reason,
+            // we place `None` as a permanent marker indicating that the allocator is deinitialized.
+            // After such a state is in place, all allocation calls will be redirected to the global
+            // allocator, which is of course still usable at this moment.
+            let alloc = alloc.replace(None).expect("Thread-local allocator is already freed.");
+
+            // Lock the global allocator.
+            let mut global_alloc = GLOBAL_ALLOCATOR.lock();
+            let global_alloc = global_alloc.get();
+
+            // TODO: we know this is sorted, so we could abuse that fact for faster insertion in the
+            // global allocator.
+
+            alloc.into_inner().inner.for_each(move |block| global_alloc.free(block));
+        }
+
+        // The initial acquired segment.
+        let initial_segment = GLOBAL_ALLOCATOR
+            .lock()
+            .get()
+            .alloc(4 * bookkeeper::EXTRA_ELEMENTS * mem::size_of::<Block>(), mem::align_of::<Block>());
+
+        unsafe {
+            // Register the thread destructor on the current thread.
+            THREAD_ALLOCATOR.register_thread_destructor(dtor)
+                .expect("Unable to register a thread destructor.");
+
+            LocalAllocator {
+                inner: Bookkeeper::new(Vec::from_raw_parts(initial_segment, 0)),
+            }
+        }
+    }
+
+    /// Should we memtrim this allocator?
+    ///
+    /// The idea is to free memory to the global allocator to unify small stubs and avoid
+    /// fragmentation and thread accumulation.
+    fn should_memtrim(&self) -> bool {
+        // TODO: Tweak this.
+
+        /// The fragmentation scale constant.
+        ///
+        /// This is used for determining the minimum average block size before memtrimming.
+        const FRAGMENTATION_SCALE: usize = 10;
+        /// The local memtrim limit.
+        ///
+        /// Whenever an allocator has more free bytes than this value, it will be memtrimmed.
+        const LOCAL_MEMTRIM_LIMIT: usize = 16384;
+
+        self.total_bytes() < FRAGMENTATION_SCALE * self.len() || self.total_bytes() > LOCAL_MEMTRIM_LIMIT
+    }
+}
+
 #[cfg(feature = "tls")]
 derive_deref!(LocalAllocator, Bookkeeper);
 
@@ -213,6 +236,23 @@ impl Allocator for LocalAllocator {
         // due to freeing excessive blocks would change the order.
         GLOBAL_ALLOCATOR.lock().get().alloc(size, align)
     }
+
+    #[inline]
+    fn on_new_memory(&mut self) {
+        if self.should_memtrim() {
+            // Lock the global allocator.
+            let mut global_alloc = GLOBAL_ALLOCATOR.lock();
+            let global_alloc = global_alloc.get();
+
+            while let Some(block) = self.pop() {
+                // Pop'n'free.
+                global_alloc.free(block);
+
+                // Memtrim 'till we can't memtrim anymore.
+                if !self.should_memtrim() { break; }
+            }
+        }
+    }
 }
 
 /// Allocate a block of memory.

+ 1 - 1
src/block.rs

@@ -54,7 +54,7 @@ impl Block {
             ptr: unsafe {
                 Pointer::new(sys::sbrk(size as isize).unwrap_or_else(|()| fail::oom()))
             },
-        }
+        }.mark_uninitialized()
     }
 
     /// Create an empty block starting at `ptr`.

+ 87 - 21
src/bookkeeper.rs

@@ -52,6 +52,8 @@ pub struct Bookkeeper {
     /// These are **not** invariants: If these assumptions are not held, it will simply act strange
     /// (e.g. logic bugs), but not memory unsafety.
     pool: Vec<Block>,
+    /// The total number of bytes in the pool.
+    total_bytes: usize,
     /// Is this bookkeeper currently reserving?
     ///
     /// This is used to avoid unbounded metacircular reallocation (reservation).
@@ -76,6 +78,7 @@ impl Bookkeeper {
         #[cfg(feature = "alloc_id")]
         let res = Bookkeeper {
             pool: vec,
+            total_bytes: 0,
             reserving: false,
             // Increment the ID counter to get a brand new ID.
             id: BOOKKEEPER_ID_COUNTER.fetch_add(1, atomic::Ordering::SeqCst),
@@ -83,6 +86,7 @@ impl Bookkeeper {
         #[cfg(not(feature = "alloc_id"))]
         let res = Bookkeeper {
             pool: vec,
+            total_bytes: 0,
             reserving: false,
         };
 
@@ -158,7 +162,7 @@ impl Bookkeeper {
         log!(self, "Iterating over the blocks of the bookkeeper...");
 
         // Run over all the blocks in the pool.
-        while let Some(i) = self.pool.pop() {
+        for i in self.pool.pop_iter() {
             f(i);
         }
 
@@ -166,6 +170,21 @@ impl Bookkeeper {
         f(Block::from(self.pool));
     }
 
+    /// Pop the top block from the pool.
+    pub fn pop(&mut self) -> Option<Block> {
+        self.pool.pop()
+    }
+
+    /// Get the length of the pool.
+    pub fn len(&self) -> usize {
+        self.pool.len()
+    }
+
+    /// Get the total bytes of memory in the pool.
+    pub fn total_bytes(&self) -> usize {
+        self.total_bytes
+    }
+
     /// Perform consistency checks.
     ///
     /// This will check for the following conditions:
@@ -179,6 +198,8 @@ impl Bookkeeper {
             // Logging.
             log!(self, "Checking...");
 
+            // The total number of bytes.
+            let mut total_bytes = 0;
             // Reverse iterator over the blocks.
             let mut it = self.pool.iter().enumerate().rev();
 
@@ -191,8 +212,12 @@ impl Bookkeeper {
                 // Make sure there are no leading empty blocks.
                 assert!(!x.is_empty(), "The leading block is empty.");
 
+                total_bytes += x.size();
+
                 let mut next = x;
                 for (n, i) in it {
+                    total_bytes += i.size();
+
                     // Check if sorted.
                     assert!(next >= i, "The block pool is not sorted at index, {} ({:?} < {:?}).",
                             n, next, i);
@@ -210,6 +235,10 @@ impl Bookkeeper {
                 // Check for trailing empty blocks.
                 assert!(!self.pool.last().unwrap().is_empty(), "Trailing empty blocks.");
             }
+
+            // Make sure the sum is maintained properly.
+            assert!(total_bytes == self.total_bytes, "The sum is not equal to the `total_bytes` \
+                    field. ({} ≠ {}).", total_bytes, self.total_bytes);
         }
     }
 }
@@ -243,6 +272,9 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
     /// prior to call of this function, it should be too after it.
     fn alloc_fresh(&mut self, size: usize, align: usize) -> Block;
 
+    /// Called right before new memory is added to the pool.
+    fn on_new_memory(&mut self) {}
+
     /// Allocate a chunk of memory.
     ///
     /// This function takes a size and an alignment. From these a fitting block is found, to which
@@ -313,13 +345,15 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
                 let _ = self.remove_at(n);
             }
 
-            let (res, excessive) = b.split(size);
+            // Update the pool byte count.
+            self.total_bytes -= b.size();
+
+            // Split and mark the block uninitialized to the debugger.
+            let (res, excessive) = b.mark_uninitialized().split(size);
 
-            // Mark the excessive space as free.
             // There are many corner cases that make knowing where to insert it difficult
             // so we search instead.
-            let bound = self.find_bound(&excessive);
-            self.free_bound(bound, excessive);
+            self.free(excessive);
 
             // Check consistency.
             self.check();
@@ -327,8 +361,7 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
             debug_assert!(res.size() == size, "Requested space does not match with the returned \
                           block.");
 
-            // Mark the block uninitialized to the debugger.
-            res.mark_uninitialized()
+            res
         } else {
             // No fitting block found. Allocate a new block.
             self.alloc_external(size, align)
@@ -441,8 +474,7 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
 
                 // Free the old block.
                 // Allocation may have moved insertion so we search again.
-                let bound = self.find_bound(&block);
-                self.free_bound(bound, block);
+                self.free(block);
 
                 // Check consistency.
                 self.check();
@@ -583,7 +615,9 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
 
             // The merging succeeded. We proceed to try to close in the possible gap.
             if ind.start != 0 && self.pool[ind.start - 1].merge_right(&mut block).is_ok() {
+                // Check consistency.
                 self.check();
+
                 return;
             }
         // Dammit, let's try to merge left.
@@ -616,17 +650,26 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
         // Check consistency.
         self.check();
 
-        // Mark the block uninitialized to the debugger.
-        res.mark_uninitialized()
+        res
     }
 
     /// Push an element without reserving.
-    fn push(&mut self, mut block: Block) {
+    // TODO: Make `push` and `free` one.
+    fn push(&mut self, block: Block) {
         // Logging.
         log!(self;self.pool.len(), "Pushing {:?}.", block);
 
+        // Mark the block free.
+        let mut block = block.mark_free();
+
         // Short-circuit in case of an empty block.
         if !block.is_empty() {
+            // Trigger the new memory event handler.
+            self.on_new_memory();
+
+            // Update the pool byte count.
+            self.total_bytes += block.size();
+
             // Some assertions...
             debug_assert!(self.pool.is_empty() || &block > self.pool.last().unwrap(), "Pushing will \
                           make the list unsorted.");
@@ -640,6 +683,10 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
 
             // Reserve space and free the old buffer.
             if let Some(x) = unborrow!(self.reserve(self.pool.len() + 1)) {
+                // `free` handles the count, so we set it back.
+                // TODO: Find a better way to do so.
+                self.total_bytes -= block.size();
+
                 self.free(x);
             }
 
@@ -663,6 +710,10 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
                 // Make some assertions.
                 debug_assert!(res.is_ok(), "Push failed (buffer full).");
             } else {
+                // `free` handles the count, so we set it back.
+                // TODO: Find a better way to do so.
+                self.total_bytes -= block.size();
+
                 // Can't push because reserve changed the end of the pool.
                 self.free(block);
             }
@@ -698,6 +749,9 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
             // Go back to the original state.
             self.reserving = false;
 
+            // Check consistency.
+            self.check();
+
             Some(self.pool.refill(new_buf))
         } else {
             None
@@ -782,6 +836,9 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
         debug_assert!(self.find(&block) == ind, "Block is not inserted at the appropriate index.");
         debug_assert!(!block.is_empty(), "Inserting an empty block.");
 
+        // Trigger the new memory event handler.
+        self.on_new_memory();
+
         // Find the next gap, where a used block was.
         let gap = self.pool
             .iter()
@@ -829,6 +886,8 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
                       // `if` block above with the conditional that `gap` is `None`, which is the
                       // case where the closure is evaluated.
 
+            // Update the pool byte count.
+            self.total_bytes += block.size();
             // Mark it free and set the element.
             ptr::write(self.pool.get_unchecked_mut(ind), block.mark_free());
         }
@@ -845,25 +904,24 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
     /// Remove a block.
     fn remove_at(&mut self, ind: usize) -> Block {
         // Logging.
-        log!(self;ind, "Removing block.");
+        log!(self;ind, "Removing block at {}.", ind);
 
-        if ind + 1 == self.pool.len() {
-            let res = self.pool[ind].pop();
+        let res = if ind + 1 == self.pool.len() {
+            let block = self.pool[ind].pop();
             // Make sure there are no trailing empty blocks.
             let new_len = self.pool.len() - self.pool.iter().rev().take_while(|x| x.is_empty()).count();
 
             // Truncate the vector.
             self.pool.truncate(new_len);
 
-            // Mark the block uninitialized to the debugger.
-            res.mark_uninitialized()
+            block
         } else {
             // Calculate the upper and lower bound
             let empty = self.pool[ind + 1].empty_left();
             let empty2 = empty.empty_left();
 
             // Replace the block at `ind` with the left empty block from `ind + 1`.
-            let res = mem::replace(&mut self.pool[ind], empty);
+            let block = mem::replace(&mut self.pool[ind], empty);
 
             // Iterate over the pool from `ind` and down.
             let skip = self.pool.len() - ind;
@@ -872,8 +930,16 @@ pub trait Allocator: ops::DerefMut<Target = Bookkeeper> {
                 *place = empty2.empty_left();
             }
 
-            // Mark the block uninitialized to the debugger.
-            res.mark_uninitialized()
-        }
+            block
+        };
+
+        // Update the pool byte count.
+        self.total_bytes -= res.size();
+
+        // Check consistency.
+        self.check();
+
+        // Mark the block uninitialized to the debugger.
+        res.mark_uninitialized()
     }
 }

+ 21 - 1
src/vec.rs

@@ -10,7 +10,6 @@ use leak::Leak;
 ///
 /// This does not perform allocation nor reallocation, thus these have to be done manually.
 /// Moreover, no destructors are called, making it possible to leak memory.
-// NOTE  ^^^^^^^  This derivation should be carefully reviewed when this struct is changed.
 pub struct Vec<T: Leak> {
     /// A pointer to the start of the buffer.
     ptr: Pointer<T>,
@@ -129,6 +128,27 @@ impl<T: Leak> Vec<T> {
 
         self.len = len;
     }
+
+    /// Yield an iterator popping from the vector.
+    pub fn pop_iter(&mut self) -> PopIter<T> {
+        PopIter {
+            vec: self,
+        }
+    }
+}
+
+/// An iterator popping blocks from the bookkeeper.
+pub struct PopIter<'a, T: 'a + Leak> {
+    vec: &'a mut Vec<T>,
+}
+
+impl<'a, T: Leak> Iterator for PopIter<'a, T> {
+    type Item = T;
+
+    #[inline]
+    fn next(&mut self) -> Option<T> {
+        self.vec.pop()
+    }
 }
 
 // TODO: Remove this in favour of `derive` when rust-lang/rust#35263 is fixed.