//! Threads that can borrow variables from the stack.
//!
//! Create a scope when spawned threads need to access variables on the stack:
//!
//! ```
//! use crossbeam_utils::thread;
//!
//! let people = vec![
//!     "Alice".to_string(),
//!     "Bob".to_string(),
//!     "Carol".to_string(),
//! ];
//!
//! thread::scope(|scope| {
//!     for person in &people {
//!         scope.spawn(move |_| {
//!             println!("Hello, {}!", person);
//!         });
//!     }
//! }).unwrap();
//! ```
//!
//! # Why scoped threads?
//!
//! Suppose we wanted to re-write the previous example using plain threads:
//!
//! ```ignore
//! use std::thread;
//!
//! let people = vec![
//!     "Alice".to_string(),
//!     "Bob".to_string(),
//!     "Carol".to_string(),
//! ];
//!
//! let mut threads = Vec::new();
//!
//! for person in &people {
//!     threads.push(thread::spawn(move || {
//!         println!("Hello, {}!", person);
//!     }));
//! }
//!
//! for thread in threads {
//!     thread.join().unwrap();
//! }
//! ```
//!
//! This doesn't work because the borrow checker complains about `people` not living long enough:
//!
//! ```text
//! error[E0597]: `people` does not live long enough
//!   --> src/main.rs:12:20
//!    |
//! 12 |     for person in &people {
//!    |                    ^^^^^^ borrowed value does not live long enough
//! ...
//! 21 | }
//!    | - borrowed value only lives until here
//!    |
//!    = note: borrowed value must be valid for the static lifetime...
//! ```
//!
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
//!
//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
//! before the scope ends.
//!
//! # How scoped threads work
//!
//! If a variable is borrowed by a thread, the thread must complete before the variable is
//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
//!
//! A scope creates a clear boundary between variables outside the scope and threads inside the
//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
//! can safely access variables outside it.
//!
//! [`std::thread::spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html

use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;

/// A growable list shared between threads behind a mutex.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;

/// An optional value shared between threads behind a mutex.
type SharedOption<T> = Arc<Mutex<Option<T>>>;

/// Creates a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
///
/// No matter what happens, before the `Scope` is dropped, it is guaranteed that all the unjoined
/// spawned scoped threads are joined.
///
/// `thread::scope()` returns `Ok(r)`, where `r` is the value returned by `f`, if none of the
/// unjoined spawned threads panicked. Otherwise it returns `Err(e)`, where `e` is a boxed `Vec`
/// containing the panic payload of every unjoined thread that panicked. Even if many threads
/// panic, it is still guaranteed that all of them are joined before this function returns.
///
/// Threads that were joined manually through [`ScopedJoinHandle::join`] are not joined again
/// here; their panics are reported by that `join` call instead.
///
/// [`ScopedJoinHandle::join`]: struct.ScopedJoinHandle.html#method.join
///
/// # Panics
///
/// If `f` itself panics, the panic is caught, all remaining threads are joined, and then the
/// panic is resumed.
///
/// # Examples
///
/// Creating and using a scope:
///
/// ```
/// crossbeam_utils::thread::scope(|scope| {
///     scope.spawn(|_| println!("Exiting scope"));
///     scope.spawn(|_| println!("Running child thread in scope"));
/// }).unwrap();
/// ```
pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
where
    F: FnOnce(&Scope<'env>) -> R,
{
    let (tx, rx) = mpsc::channel();
    let scope = Scope::<'env> {
        handles: SharedVec::default(),
        chan: tx,
        _marker: PhantomData,
    };

    // Execute the scoped function, but catch any panics.
    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));

    // Wait until all nested scopes are dropped. Nothing is ever sent on the channel: `recv`
    // only returns (with an error) once every `Sender` clone held by a nested scope is gone.
    drop(scope.chan);
    let _ = rx.recv();

    // Join all remaining spawned threads.
    let panics: Vec<_> = {
        let mut handles = scope.handles.lock().unwrap();

        // Filter handles that haven't been joined, join them, and collect errors.
        // The result is bound to a local first so that the iterator borrowing `handles` is
        // finished before the lock guard goes out of scope.
        let panics = handles
            .drain(..)
            .filter_map(|handle| handle.lock().unwrap().take())
            .filter_map(|handle| handle.join().err())
            .collect();

        panics
    };

    // If `f` has panicked, resume unwinding.
    // If any of the child threads have panicked, return the panic errors.
    // Otherwise, everything is OK and return the result of `f`.
    match result {
        Err(err) => panic::resume_unwind(err),
        Ok(res) => {
            if panics.is_empty() {
                Ok(res)
            } else {
                Err(Box::new(panics))
            }
        }
    }
}

/// A scope for spawning threads.
pub struct Scope<'env> {
    /// The list of the thread join handles.
    handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,

    /// Used to wait until all subscopes are dropped.
    chan: mpsc::Sender<()>,

    /// Borrows data with invariant lifetime `'env`.
    _marker: PhantomData<&'env mut &'env ()>,
}

// SAFETY: `handles` is an `Arc<Mutex<_>>` and `_marker` is zero-sized. The only operation this
// file performs on `chan` through a shared `&Scope` is `clone()`.
// NOTE(review): on toolchains where `mpsc::Sender` is not itself `Sync`, soundness relies on
// concurrent `Sender::clone` being safe — confirm for the minimum supported Rust version.
unsafe impl<'env> Sync for Scope<'env> {}

impl<'env> Scope<'env> {
    /// Create a scoped thread.
    ///
    /// `spawn` is similar to the [`spawn`] function in Rust's standard library. The difference is
    /// that this thread is scoped, meaning that it's guaranteed to terminate before the current
    /// stack frame goes away, allowing you to reference the parent stack frame directly. This is
    /// ensured by having the parent thread join on the child thread before the scope exits.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to create the thread. Use [`builder`] and
    /// [`ScopedThreadBuilder::spawn`] to handle that error instead.
    ///
    /// [`spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
    /// [`builder`]: struct.Scope.html#method.builder
    /// [`ScopedThreadBuilder::spawn`]: struct.ScopedThreadBuilder.html#method.spawn
    pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce(&Scope<'env>) -> T,
        F: Send + 'env,
        T: Send + 'env,
    {
        self.builder().spawn(f).unwrap()
    }

    /// Generates the base configuration for spawning a scoped thread, from which configuration
    /// methods can be chained.
    pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
        ScopedThreadBuilder {
            scope: self,
            builder: thread::Builder::new(),
        }
    }
}

impl<'env> fmt::Debug for Scope<'env> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Scope {{ ... }}")
    }
}

/// Scoped thread configuration. Provides detailed control over the properties and behavior of new
/// scoped threads.
#[derive(Debug)]
pub struct ScopedThreadBuilder<'scope, 'env: 'scope> {
    /// The scope the new thread will belong to.
    scope: &'scope Scope<'env>,

    /// The underlying standard-library thread builder carrying name and stack size.
    builder: thread::Builder,
}

impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
    /// Names the thread-to-be. Currently the name is used for identification only in panic
    /// messages.
    pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
        self.builder = self.builder.name(name);
        self
    }

    /// Sets the size of the stack for the new thread.
    pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
        self.builder = self.builder.stack_size(size);
        self
    }

    /// Spawns a new thread, and returns a join handle for it.
    ///
    /// The closure `f` receives a `Scope` that shares the parent's handle list, so threads it
    /// spawns are joined by the same root scope.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by `std::thread::Builder::spawn` if the thread could not
    /// be created.
    pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
    where
        F: FnOnce(&Scope<'env>) -> T,
        F: Send + 'env,
        T: Send + 'env,
    {
        // The result of `f` will be stored here.
        let result = SharedOption::default();

        // Spawn the thread and grab its join handle and thread handle.
        let (handle, thread) = {
            let result = Arc::clone(&result);

            // A clone of the scope that will be moved into the new thread.
            let scope = Scope::<'env> {
                handles: Arc::clone(&self.scope.handles),
                chan: self.scope.chan.clone(),
                _marker: PhantomData,
            };

            // Spawn the thread.
            let handle = {
                let closure = move || {
                    // Make sure the scope is inside the closure with the proper `'env` lifetime.
                    let scope: Scope<'env> = scope;

                    // Run the closure.
                    let res = f(&scope);

                    // Store the result if the closure didn't panic.
                    *result.lock().unwrap() = Some(res);
                };

                // Change the type of `closure` from `FnOnce()` to `FnMut()` so that it can be
                // called through a `Box` (presumably because a boxed `FnOnce` is not callable on
                // every toolchain this file targets).
                let mut closure = Some(closure);
                let closure = move || closure.take().unwrap()();

                // Allocate `closure` on the heap and erase the `'env` bound.
                let closure: Box<dyn FnMut() + Send + 'env> = Box::new(closure);
                // SAFETY: only the lifetime bound of the trait object changes. `scope()` joins
                // every thread whose handle is in the shared list before it returns, so the
                // closure never runs past the end of `'env`.
                let closure: Box<dyn FnMut() + Send + 'static> =
                    unsafe { mem::transmute(closure) };

                // Finally, spawn the closure.
                let mut closure = closure;
                self.builder.spawn(move || closure())?
            };

            let thread = handle.thread().clone();
            let handle = Arc::new(Mutex::new(Some(handle)));
            (handle, thread)
        };

        // Add the handle to the shared list of join handles.
        self.scope.handles.lock().unwrap().push(Arc::clone(&handle));

        Ok(ScopedJoinHandle {
            handle,
            result,
            thread,
            _marker: PhantomData,
        })
    }
}

// SAFETY: a `ScopedJoinHandle<T>` is only constructed by `ScopedThreadBuilder::spawn`, which
// requires `T: Send`, and the `T` is only reachable through an `Arc<Mutex<Option<T>>>`.
unsafe impl<'scope, T> Send for ScopedJoinHandle<'scope, T> {}
unsafe impl<'scope, T> Sync for ScopedJoinHandle<'scope, T> {}

/// A handle to a scoped thread.
pub struct ScopedJoinHandle<'scope, T> {
    /// A join handle to the spawned thread.
    handle: SharedOption<thread::JoinHandle<()>>,

    /// Holds the result of the inner closure.
    result: SharedOption<T>,

    /// A handle to the spawned thread.
    thread: thread::Thread,

    /// Borrows the parent scope with lifetime `'scope`.
    _marker: PhantomData<&'scope ()>,
}

impl<'scope, T> ScopedJoinHandle<'scope, T> {
    /// Waits for the associated thread to finish.
    ///
    /// If the child thread panics, [`Err`] is returned with the parameter given to [`panic`].
    ///
    /// [`Err`]: https://doc.rust-lang.org/std/result/enum.Result.html#variant.Err
    /// [`panic`]: https://doc.rust-lang.org/std/macro.panic.html
    ///
    /// # Panics
    ///
    /// This function may panic on some platforms if a thread attempts to join itself or otherwise
    /// may create a deadlock with joining threads.
    pub fn join(self) -> thread::Result<T> {
        // Take out the handle. The handle will surely be available because the root scope waits
        // for nested scopes before joining remaining threads.
        let handle = self.handle.lock().unwrap().take().unwrap();

        // Join the thread and then take the result out of its inner closure. The result is
        // present whenever the join succeeds, because the thread stores it as its last step.
        handle
            .join()
            .map(|()| self.result.lock().unwrap().take().unwrap())
    }

    /// Gets the underlying [`std::thread::Thread`] handle.
    ///
    /// [`std::thread::Thread`]: https://doc.rust-lang.org/std/thread/struct.Thread.html
    pub fn thread(&self) -> &thread::Thread {
        &self.thread
    }
}

impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ScopedJoinHandle {{ ... }}")
    }
}