// This module provides a relatively simple thread-safe pool of reusable
// objects. For the most part, it's implemented by a stack represented by a
// Mutex<Vec<T>>. It has one small trick: because unlocking a mutex is somewhat
// costly, in the case where a pool is accessed by the first thread that tried
// to get a value, we bypass the mutex. Here are some benchmarks showing the
// difference.
//
// 1) misc::anchored_literal_long_non_match    21 (18571 MB/s)
// 2) misc::anchored_literal_long_non_match   107 (3644 MB/s)
// 3) misc::anchored_literal_long_non_match    45 (8666 MB/s)
// 4) misc::anchored_literal_long_non_match    19 (20526 MB/s)
//
// (1) represents our baseline: the master branch at the time of writing when
// using the 'thread_local' crate to implement the pool below.
//
// (2) represents a naive pool implemented completely via Mutex<Vec<T>>. There
// is no special trick for bypassing the mutex.
//
// (3) is the same as (2), except it uses Mutex<Vec<Box<T>>>. It is twice as
// fast because a Box<T> is much smaller than the T we use with a Pool in this
// crate. So pushing and popping a Box<T> from a Vec is quite a bit faster
// than for T.
//
// (4) is the same as (3), but with the trick for bypassing the mutex in the
// case of the first-to-get thread.
//
// Why move off of thread_local? Even though (4) is a hair faster than (1)
// above, this was not the main goal. The main goal was to move off of
// thread_local and find a way to *simply* re-capture some of its speed for
// regex's specific case. So again, why move off of it? The *primary* reason is
// because of memory leaks. See https://github.com/rust-lang/regex/issues/362
// for example. (Why do I want it to be simple?
// Well, I suppose what I mean is,
// "use as much safe code as possible to minimize risk and be as sure as I can
// be that it is correct.")
//
// My guess is that the thread_local design is probably not appropriate for
// regex since its memory usage scales with the number of active threads that
// have used a regex, whereas the pool below scales with the number of threads
// that simultaneously use a regex. While neither case permits contraction,
// since we own the pool data structure below, we can add contraction if a
// clear use case pops up in the wild. More pressingly though, it seems that
// there are at least some use case patterns where one might have many threads
// sitting around that might have used a regex at one point. While thread_local
// does try to reuse space previously used by a thread that has since stopped,
// its maximal memory usage still scales with the total number of active
// threads. In contrast, the pool below scales with the total number of threads
// *simultaneously* using the pool. The hope is that this uses less memory
// overall. And if it doesn't, we can hopefully tune it somehow.
//
// It seems that these sorts of conditions happen frequently
// in FFI inside of other more "managed" languages. This was
// mentioned in the issue linked above, and also mentioned here:
// https://github.com/BurntSushi/rure-go/issues/3. And in particular, users
// confirm that disabling the use of thread_local resolves the leak.
//
// There were other weaker reasons for moving off of thread_local as well.
// Namely, at the time, I was looking to reduce dependencies. And for something
// like regex, maintenance can be simpler when we own the full dependency tree.

use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// An atomic counter used to allocate thread IDs.
static COUNTER: AtomicUsize = AtomicUsize::new(1);

thread_local!(
    /// A thread local used to assign an ID to a thread.
    static THREAD_ID: usize = {
        let next = COUNTER.fetch_add(1, Ordering::Relaxed);
        // SAFETY: We cannot permit the reuse of thread IDs since reusing a
        // thread ID might result in more than one thread "owning" a pool,
        // and thus, permit accessing a mutable value from multiple threads
        // simultaneously without synchronization. The intent of this panic is
        // to be a sanity check. It is not expected that the thread ID space
        // will actually be exhausted in practice.
        //
        // This checks that the counter never wraps around, since atomic
        // addition wraps around on overflow.
        if next == 0 {
            panic!("regex: thread ID allocation space exhausted");
        }
        next
    };
);

/// The type of the function used to create values in a pool when the pool is
/// empty and the caller requests one.
type CreateFn<T> =
    Box<dyn Fn() -> T + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;

/// A simple thread-safe pool for reusing values.
///
/// Getting a value out comes with a guard. When that guard is dropped, the
/// value is automatically put back in the pool.
///
/// A Pool<T> implements Sync when T is Send (even if it's not Sync). This
/// means that T can use interior mutability. This is possible because a pool
/// is guaranteed to provide a value to exactly one thread at any time.
///
/// Currently, a pool never contracts in size. Its size is proportional to the
/// number of simultaneous uses.
pub struct Pool<T> {
    /// A stack of T values to hand out. These are used when a Pool is
    /// accessed by a thread that does not own it.
    stack: Mutex<Vec<Box<T>>>,
    /// A function to create more T values when stack is empty and a caller
    /// has requested a T.
    create: CreateFn<T>,
    /// The ID of the thread that owns this pool. The owner is the thread
    /// that makes the first call to 'get'. When the owner calls 'get', it
    /// gets 'owner_val' directly instead of returning a T from 'stack'.
    /// See comments elsewhere for details, but this is intended to be an
    /// optimization for the common case that makes getting a T faster.
    ///
    /// It is initialized to a value of zero (an impossible thread ID) as a
    /// sentinel to indicate that it is unowned.
    owner: AtomicUsize,
    /// A value to return when the caller is in the same thread that owns
    /// the Pool, i.e. the first thread to call 'get'.
    owner_val: T,
}

// SAFETY: Since we want to use a Pool from multiple threads simultaneously
// behind an Arc, we need for it to be Sync. In cases where T is Sync, Pool<T>
// would be Sync. However, since we use a Pool to store mutable scratch space,
// we wind up using a T that has interior mutability and is thus itself not
// Sync. So what we *really* want is for our Pool<T> to be Sync even when T is
// not Sync (but is at least Send).
//
// The only non-sync aspect of a Pool is its 'owner_val' field, which is used
// to implement faster access to a pool value in the common case of a pool
// being accessed by the thread that owns it, i.e. the first thread to call
// 'get'. The 'stack' field is also shared, but a Mutex<T> where T: Send is
// already Sync. So we only need to worry about 'owner_val'.
//
// The key is to guarantee that 'owner_val' can only ever be accessed from one
// thread. In our implementation below, we guarantee this by only returning the
// 'owner_val' when the ID of the current thread matches the ID of the thread
// that owns the Pool. Since this can only ever be one thread, it follows
// that only one thread can access 'owner_val' at any point in time. Thus, it
// is safe to declare that Pool<T> is Sync when T is Send.
//
// NOTE: The owner is the first thread that calls 'get', which is not
// necessarily the thread that created the Pool. Ownership is claimed at most
// once, by atomically swapping the sentinel value 0 for the caller's thread
// ID, and it is never released or transferred afterwards.
//
// If there is a way to achieve our performance goals using safe code, then
// I would very much welcome a patch. As it stands, the implementation below
// tries to balance safety with performance. The case where a Regex is used
// from multiple threads simultaneously will suffer a bit since getting a cache
// will require unlocking a mutex.
unsafe impl<T: Send> Sync for Pool<T> {}

impl<T: ::std::fmt::Debug> ::std::fmt::Debug for Pool<T> {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.debug_struct("Pool")
            .field("stack", &self.stack)
            .field("owner", &self.owner)
            .field("owner_val", &self.owner_val)
            .finish()
    }
}

/// A guard that is returned when a caller requests a value from the pool.
///
/// The purpose of the guard is to use RAII to automatically put the value back
/// in the pool once it's dropped.
#[derive(Debug)]
pub struct PoolGuard<'a, T: 'a + Send> {
    /// The pool that this guard is attached to.
    pool: &'a Pool<T>,
    /// This is None when the guard represents the special "owned" value. In
    /// which case, the value is retrieved from 'pool.owner_val'.
    value: Option<Box<T>>,
}

impl<T: Send> Pool<T> {
    /// Create a new pool. The given closure is used to create values in the
    /// pool when necessary.
    pub fn new(create: CreateFn<T>) -> Pool<T> {
        let owner = AtomicUsize::new(0);
        let owner_val = create();
        Pool { stack: Mutex::new(vec![]), create, owner, owner_val }
    }

    /// Get a value from the pool. The caller is guaranteed to have exclusive
    /// access to the given value.
    ///
    /// Note that there is no guarantee provided about which value in the
    /// pool is returned. That is, calling get, dropping the guard (causing
    /// the value to go back into the pool) and then calling get again is NOT
    /// guaranteed to return the same value received in the first get call.
    #[cfg_attr(feature = "perf-inline", inline(always))]
    pub fn get(&self) -> PoolGuard<T> {
        // Our fast path checks if the caller is the thread that "owns" this
        // pool.
        // Or stated differently, whether it is the first thread that
        // tried to extract a value from the pool. If it is, then we can return
        // a T to the caller without going through a mutex.
        //
        // SAFETY: We must guarantee that only one thread gets access to this
        // value. Since a thread is uniquely identified by the THREAD_ID thread
        // local, it follows that if the caller's thread ID is equal to the
        // owner, then only one thread may receive this value.
        let caller = THREAD_ID.with(|id| *id);
        let owner = self.owner.load(Ordering::Relaxed);
        if caller == owner {
            return self.guard_owned();
        }
        self.get_slow(caller, owner)
    }

    /// This is the "slow" version that goes through a mutex to pop an
    /// allocated value off a stack to return to the caller. (Or, if the stack
    /// is empty, a new value is created.)
    ///
    /// If the pool has no owner, then this will set the owner.
    #[cold]
    fn get_slow(&self, caller: usize, owner: usize) -> PoolGuard<T> {
        use std::sync::atomic::Ordering::Relaxed;

        if owner == 0 {
            // The sentinel 0 value means this pool is not yet owned. We
            // try to atomically set the owner. If we do, then this thread
            // becomes the owner and we can return a guard that represents
            // the special T for the owner.
            let res = self.owner.compare_exchange(0, caller, Relaxed, Relaxed);
            if res.is_ok() {
                return self.guard_owned();
            }
        }
        let mut stack = self.stack.lock().unwrap();
        let value = match stack.pop() {
            None => Box::new((self.create)()),
            Some(value) => value,
        };
        self.guard_stack(value)
    }

    /// Puts a value back into the pool. Callers don't need to call this. Once
    /// the guard that's returned by 'get' is dropped, its value is put back
    /// into the pool automatically.
    fn put(&self, value: Box<T>) {
        let mut stack = self.stack.lock().unwrap();
        stack.push(value);
    }

    /// Create a guard that represents the special owned T.
    fn guard_owned(&self) -> PoolGuard<'_, T> {
        PoolGuard { pool: self, value: None }
    }

    /// Create a guard that contains a value from the pool's stack.
    fn guard_stack(&self, value: Box<T>) -> PoolGuard<'_, T> {
        PoolGuard { pool: self, value: Some(value) }
    }
}

impl<'a, T: Send> PoolGuard<'a, T> {
    /// Return the underlying value.
    pub fn value(&self) -> &T {
        match self.value {
            None => &self.pool.owner_val,
            Some(ref v) => &**v,
        }
    }
}

impl<'a, T: Send> Drop for PoolGuard<'a, T> {
    #[cfg_attr(feature = "perf-inline", inline(always))]
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            self.pool.put(value);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::panic::{RefUnwindSafe, UnwindSafe};

    use super::*;

    #[test]
    fn oibits() {
        use exec::ProgramCache;

        fn has_oibits<T: Send + Sync + UnwindSafe + RefUnwindSafe>() {}
        has_oibits::<Pool<ProgramCache>>();
    }

    // Tests that Pool implements the "single owner" optimization. That is, the
    // thread that first accesses the pool gets its own copy, while all other
    // threads get distinct copies.
    #[test]
    fn thread_owner_optimization() {
        use std::cell::RefCell;
        use std::sync::Arc;

        let pool: Arc<Pool<RefCell<Vec<char>>>> =
            Arc::new(Pool::new(Box::new(|| RefCell::new(vec!['a']))));
        pool.get().value().borrow_mut().push('x');

        let pool1 = pool.clone();
        let t1 = std::thread::spawn(move || {
            let guard = pool1.get();
            let v = guard.value();
            v.borrow_mut().push('y');
        });

        let pool2 = pool.clone();
        let t2 = std::thread::spawn(move || {
            let guard = pool2.get();
            let v = guard.value();
            v.borrow_mut().push('z');
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // If we didn't implement the single owner optimization, then one of
        // the threads above is likely to have mutated the [a, x] vec that
        // we stuffed in the pool before spawning the threads. But since
        // neither thread was first to access the pool, and because of the
        // optimization, we should be guaranteed that neither thread mutates
        // the special owned pool value.
        //
        // (Technically this is an implementation detail and not a contract of
        // Pool's API.)
        assert_eq!(vec!['a', 'x'], *pool.get().value().borrow());
    }
}