Status: Closed (sammyne closed this 4 years ago)
Port rust-threadpool
diff -Naur '--exclude=.git' '--exclude=sgx-tests' /root/code/crates-sgx/crates/rust-threadpool/Cargo.lock /root/code/crates-sgx-upstream/rust-threadpool/Cargo.lock --- /root/code/crates-sgx/crates/rust-threadpool/Cargo.lock 2020-04-15 01:22:27.617927909 +0000 +++ /root/code/crates-sgx-upstream/rust-threadpool/Cargo.lock 1970-01-01 00:00:00.000000000 +0000 @@ -1,33 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -[[package]] -name = "hermit-abi" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" -dependencies = [ - "libc", -] - -[[package]] -name = "libc" -version = "0.2.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e85c08494b21a9054e7fe1374a732aeadaff3980b6990b94bfd3a70f690005" - -[[package]] -name = "num_cpus" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "threadpool" -version = "1.7.1" -dependencies = [ - "num_cpus", -] diff -Naur '--exclude=.git' '--exclude=sgx-tests' /root/code/crates-sgx/crates/rust-threadpool/Cargo.toml /root/code/crates-sgx-upstream/rust-threadpool/Cargo.toml --- /root/code/crates-sgx/crates/rust-threadpool/Cargo.toml 2020-04-15 01:32:06.637116628 +0000 +++ /root/code/crates-sgx-upstream/rust-threadpool/Cargo.toml 2020-04-15 01:19:03.324644762 +0000 @@ -13,23 +13,6 @@ """ keywords = ["threadpool", "thread", "pool", "threading", "parallelism"] categories = ["concurrency", "os"] -edition = "2018" [dependencies] -#num_cpus = "1.6" - -[dependencies.sgx_tstd] -features = ["thread"] -git = "https://github.com/apache/incubator-teaclave-sgx-sdk.git" -optional = true -tag = "v1.1.1" - -[dependencies.sgx_tunittest] -git = 
"https://github.com/apache/incubator-teaclave-sgx-sdk.git" -optional = true -tag = "v1.1.1" - -[features] -default = ["mesalock_sgx"] -enclave_unit_test = ["sgx_tunittest"] -mesalock_sgx = ["sgx_tstd"] +num_cpus = "1.6" diff -Naur '--exclude=.git' '--exclude=sgx-tests' /root/code/crates-sgx/crates/rust-threadpool/src/lib.rs /root/code/crates-sgx-upstream/rust-threadpool/src/lib.rs --- /root/code/crates-sgx/crates/rust-threadpool/src/lib.rs 2020-04-15 01:36:53.773483335 +0000 +++ /root/code/crates-sgx-upstream/rust-threadpool/src/lib.rs 2020-04-15 01:19:03.325644824 +0000 @@ -78,21 +78,14 @@ //! assert_eq!(an_atomic.load(Ordering::SeqCst), 23); //! ``` -#![no_std] -#[macro_use] -extern crate sgx_tstd as std; -use std::prelude::v1::*; - -//extern crate num_cpus; +extern crate num_cpus; use std::fmt; +use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, SgxCondvar as Condvar, SgxMutex as Mutex}; use std::thread; -pub const NUM_CPUS: usize = 1; - trait FnBox { fn call_box(self: Box<Self>); } @@ -267,10 +260,10 @@ /// }) /// } /// ``` - // pub fn thread_stack_size(mut self, size: usize) -> Builder { - // self.thread_stack_size = Some(size); - // self - // } + pub fn thread_stack_size(mut self, size: usize) -> Builder { + self.thread_stack_size = Some(size); + self + } /// Finalize the [`Builder`] and build the [`ThreadPool`]. 
/// @@ -288,7 +281,7 @@ pub fn build(self) -> ThreadPool { let (tx, rx) = channel::<Thunk<'static>>(); - let num_threads = self.num_threads.unwrap_or_else(|| NUM_CPUS); + let num_threads = self.num_threads.unwrap_or_else(num_cpus::get); let shared_data = Arc::new(ThreadPoolSharedData { name: self.thread_name, @@ -300,7 +293,7 @@ active_count: AtomicUsize::new(0), max_thread_count: AtomicUsize::new(num_threads), panic_count: AtomicUsize::new(0), - // stack_size: self.thread_stack_size, + stack_size: self.thread_stack_size, }); // Threadpool threads @@ -325,7 +318,7 @@ active_count: AtomicUsize, max_thread_count: AtomicUsize, panic_count: AtomicUsize, - // stack_size: Option<usize>, + stack_size: Option<usize>, } impl ThreadPoolSharedData { @@ -336,10 +329,9 @@ /// Notify all observers joining this pool if there is no more work to do. fn no_work_notify_all(&self) { if !self.has_work() { - *self - .empty_trigger - .lock() - .expect("Unable to notify all joining threads"); + *self.empty_trigger.lock().expect( + "Unable to notify all joining threads", + ); self.empty_condvar.notify_all(); } } @@ -436,9 +428,9 @@ F: FnOnce() + Send + 'static, { self.shared_data.queued_count.fetch_add(1, Ordering::SeqCst); - self.jobs - .send(Box::new(job)) - .expect("ThreadPool::execute unable to send job into queue."); + self.jobs.send(Box::new(job)).expect( + "ThreadPool::execute unable to send job into queue.", + ); } /// Returns the number of jobs waiting to executed in the pool. 
@@ -576,10 +568,10 @@ /// ``` pub fn set_num_threads(&mut self, num_threads: usize) { assert!(num_threads >= 1); - let prev_num_threads = self - .shared_data - .max_thread_count - .swap(num_threads, Ordering::Release); + let prev_num_threads = self.shared_data.max_thread_count.swap( + num_threads, + Ordering::Release, + ); if let Some(num_spawn) = num_threads.checked_sub(prev_num_threads) { // Spawn new threads for _ in 0..num_spawn { @@ -629,17 +621,12 @@ let mut lock = self.shared_data.empty_trigger.lock().unwrap(); while generation == self.shared_data.join_generation.load(Ordering::Relaxed) - && self.shared_data.has_work() - { + && self.shared_data.has_work() { lock = self.shared_data.empty_condvar.wait(lock).unwrap(); } // increase generation if we are the first thread to come out of the loop - self.shared_data.join_generation.compare_and_swap( - generation, - generation.wrapping_add(1), - Ordering::SeqCst, - ); + self.shared_data.join_generation.compare_and_swap(generation, generation.wrapping_add(1), Ordering::SeqCst); } } @@ -688,15 +675,17 @@ } } + /// Create a thread pool with one thread per CPU. /// On machines with hyperthreading, /// this will create one thread per hyperthread. 
impl Default for ThreadPool { fn default() -> Self { - ThreadPool::new(NUM_CPUS) + ThreadPool::new(num_cpus::get()) } } + impl fmt::Debug for ThreadPool { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("ThreadPool") @@ -734,17 +723,17 @@ } impl Eq for ThreadPool {} + + + fn spawn_in_pool(shared_data: Arc<ThreadPoolSharedData>) { let mut builder = thread::Builder::new(); if let Some(ref name) = shared_data.name { builder = builder.name(name.clone()); } - // Yu Ding: - // Specifying stack size for SGX thread is meaningless - // Commented out - // if let Some(ref stack_size) = shared_data.stack_size { - // builder = builder.stack_size(stack_size.to_owned()); - // } + if let Some(ref stack_size) = shared_data.stack_size { + builder = builder.stack_size(stack_size.to_owned()); + } builder .spawn(move || { // Will spawn a new thread on panic unless it is cancelled. @@ -760,10 +749,9 @@ let message = { // Only lock jobs for the time it takes // to get a job, not run it. - let lock = shared_data - .job_receiver - .lock() - .expect("Worker thread unable to lock job_receiver"); + let lock = shared_data.job_receiver.lock().expect( + "Worker thread unable to lock job_receiver", + ); lock.recv() }; @@ -787,55 +775,18 @@ .unwrap(); } -// #[cfg(test)] -#[cfg(feature = "enclave_unit_test")] -pub mod test { - use std::prelude::v1::*; - - use sgx_tunittest::*; - - // @dev TEST_TASKS should be smaller than TCSNum in enclave.config.xml - // @dev HeapMaxSize should be large enough - pub fn run_tests() -> usize { - rsgx_unit_tests!( - test_active_count, - test_clone, - test_cloned_eq, - test_debug, - test_empty_pool, - test_join_wavesurfer, - test_massive_task_creation, - test_multi_join, - test_name, - test_no_fun_or_joy, - test_recovery_from_subtask_panic, - test_repeate_join, - test_send, - test_send_shared_data, - test_set_num_threads_decreasing, - test_set_num_threads_increasing, - test_shrink, - test_should_not_panic_on_drop_if_subtasks_panic_after_drop, - 
test_sync_shared_data, - test_works, - should_panic_test_zero_tasks_panic - ) - } - - fn should_panic_test_zero_tasks_panic() { - should_panic!(test_zero_tasks_panic()); - } - - use super::{Builder, ThreadPool}; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::mpsc::{channel, sync_channel}; +#[cfg(test)] +mod test { + use super::{ThreadPool, Builder}; use std::sync::{Arc, Barrier}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::{sync_channel, channel}; use std::thread::{self, sleep}; use std::time::Duration; const TEST_TASKS: usize = 4; - //#[test] + #[test] fn test_set_num_threads_increasing() { let new_thread_amount = TEST_TASKS + 8; let mut pool = ThreadPool::new(TEST_TASKS); @@ -856,14 +807,12 @@ pool.join(); } - //#[test] + #[test] fn test_set_num_threads_decreasing() { let new_thread_amount = 2; let mut pool = ThreadPool::new(TEST_TASKS); for _ in 0..TEST_TASKS { - pool.execute(move || { - 1 + 1; - }); + pool.execute(move || { 1 + 1; }); } pool.set_num_threads(new_thread_amount); for _ in 0..new_thread_amount { @@ -875,7 +824,7 @@ pool.join(); } - //#[test] + #[test] fn test_active_count() { let pool = ThreadPool::new(TEST_TASKS); for _ in 0..2 * TEST_TASKS { @@ -890,28 +839,26 @@ assert_eq!(initialized_count, TEST_TASKS); } - //#[test] + #[test] fn test_works() { let pool = ThreadPool::new(TEST_TASKS); let (tx, rx) = channel(); for _ in 0..TEST_TASKS { let tx = tx.clone(); - pool.execute(move || { - tx.send(1).unwrap(); - }); + pool.execute(move || { tx.send(1).unwrap(); }); } assert_eq!(rx.iter().take(TEST_TASKS).fold(0, |a, b| a + b), TEST_TASKS); } - //#[test] + #[test] #[should_panic] fn test_zero_tasks_panic() { ThreadPool::new(0); } - //#[test] + #[test] fn test_recovery_from_subtask_panic() { let pool = ThreadPool::new(TEST_TASKS); @@ -927,16 +874,15 @@ let (tx, rx) = channel(); for _ in 0..TEST_TASKS { let tx = tx.clone(); - pool.execute(move || { - tx.send(1).unwrap(); - }); + pool.execute(move || { 
tx.send(1).unwrap(); }); } assert_eq!(rx.iter().take(TEST_TASKS).fold(0, |a, b| a + b), TEST_TASKS); } - //#[test] + #[test] fn test_should_not_panic_on_drop_if_subtasks_panic_after_drop() { + let pool = ThreadPool::new(TEST_TASKS); let waiter = Arc::new(Barrier::new(TEST_TASKS + 1)); @@ -955,7 +901,7 @@ waiter.wait(); } - //#[test] + #[test] fn test_massive_task_creation() { let test_tasks = 4_200_000; @@ -970,6 +916,7 @@ let (b0, b1) = (b0.clone(), b1.clone()); pool.execute(move || { + // Wait until the pool has been filled once. if i < TEST_TASKS { b0.wait(); @@ -996,7 +943,7 @@ ); } - //#[test] + #[test] fn test_shrink() { let test_tasks_begin = TEST_TASKS + 2; @@ -1029,12 +976,13 @@ assert_eq!(pool.active_count(), test_tasks_begin); b1.wait(); + b2.wait(); assert_eq!(pool.active_count(), TEST_TASKS); b3.wait(); } - //#[test] + #[test] fn test_name() { let name = "test"; let mut pool = ThreadPool::with_name(name.to_owned(), 2); @@ -1069,7 +1017,7 @@ } } - //#[test] + #[test] fn test_debug() { let pool = ThreadPool::new(4); let debug = format!("{:?}", pool); @@ -1095,7 +1043,7 @@ ); } - //#[test] + #[test] fn test_repeate_join() { let pool = ThreadPool::with_name("repeate join test".into(), 8); let test_count = Arc::new(AtomicUsize::new(0)); @@ -1123,7 +1071,7 @@ assert_eq!(84, test_count.load(Ordering::Relaxed)); } - //#[test] + #[test] fn test_multi_join() { use std::sync::mpsc::TryRecvError::*; @@ -1167,7 +1115,7 @@ ); } - //#[test] + #[test] fn test_empty_pool() { // Joining an empty pool must return imminently let pool = ThreadPool::new(4); @@ -1177,7 +1125,7 @@ assert!(true); } - //#[test] + #[test] fn test_no_fun_or_joy() { // What happens when you keep adding jobs after a join @@ -1197,15 +1145,13 @@ pool.join(); } - //#[test] + #[test] fn test_clone() { let pool = ThreadPool::with_name("clone example".into(), 2); // This batch of jobs will occupy the pool for some time for _ in 0..6 { - pool.execute(move || { - sleep(Duration::from_secs(2)); - }); + 
pool.execute(move || { sleep(Duration::from_secs(2)); }); } // The following jobs will be inserted into the pool in a random fashion @@ -1218,13 +1164,13 @@ let (tx, rx) = channel(); for i in 0..42 { let tx = tx.clone(); - pool.execute(move || { - tx.send(i).expect("channel will be waiting"); - }); + pool.execute(move || { tx.send(i).expect("channel will be waiting"); }); } drop(tx); - rx.iter() - .fold(0, |accumulator, element| accumulator + element) + rx.iter().fold( + 0, + |accumulator, element| accumulator + element, + ) }) }; let t1 = { @@ -1236,54 +1182,56 @@ let (tx, rx) = channel(); for i in 1..12 { let tx = tx.clone(); - pool.execute(move || { - tx.send(i).expect("channel will be waiting"); - }); + pool.execute(move || { tx.send(i).expect("channel will be waiting"); }); } drop(tx); - rx.iter() - .fold(1, |accumulator, element| accumulator * element) + rx.iter().fold( + 1, + |accumulator, element| accumulator * element, + ) }) }; assert_eq!( 861, - t0.join() - .expect("thread 0 will return after calculating additions",) + t0.join().expect( + "thread 0 will return after calculating additions", + ) ); assert_eq!( 39916800, - t1.join() - .expect("thread 1 will return after calculating multiplications",) + t1.join().expect( + "thread 1 will return after calculating multiplications", + ) ); } - //#[test] + #[test] fn test_sync_shared_data() { fn assert_sync<T: Sync>() {} assert_sync::<super::ThreadPoolSharedData>(); } - //#[test] + #[test] fn test_send_shared_data() { fn assert_send<T: Send>() {} assert_send::<super::ThreadPoolSharedData>(); } - //#[test] + #[test] fn test_send() { fn assert_send<T: Send>() {} assert_send::<ThreadPool>(); } - //#[test] + #[test] fn test_cloned_eq() { let a = ThreadPool::new(2); assert_eq!(a, a.clone()); } - //#[test] + #[test] /// The scenario is joining threads should not be stuck once their wave /// of joins has completed. 
So once one thread joining on a pool has /// succeded other threads joining on the same pool must get out even if @@ -1296,9 +1244,8 @@ let n_cycles = 4; let n_workers = 4; let (tx, rx) = channel(); - let builder = Builder::new() - .num_threads(n_workers) - .thread_name("join wavesurfer".into()); + let builder = Builder::new().num_threads(n_workers) + .thread_name("join wavesurfer".into()); let p_waiter = builder.clone().build(); let p_clock = builder.build(); @@ -1326,7 +1273,7 @@ } // prepare three waves of jobs - for i in 0..3 * n_workers { + for i in 0..3*n_workers { let p_clock = p_clock.clone(); let tx = tx.clone(); let wave_clock = wave_clock.clone(); @@ -1350,28 +1297,24 @@ let mut data = vec![]; for (now, after, i) in rx.iter() { let mut dur = after - now; - if dur >= n_cycles - 1 { - dur = n_cycles - 1; + if dur >= n_cycles -1 { + dur = n_cycles -1; } hist[dur] += 1; data.push((now, after, i)); } for (i, n) in hist.iter().enumerate() { - println!( - "\t{}: {} {}", - i, - n, - &*(0..*n).fold("".to_owned(), |s, _| s + "*") - ); + println!("\t{}: {} {}", i, n, &*(0..*n).fold("".to_owned(), |s, _| s + "*")); } - assert!(data.iter().all(|&(cycle, stop, i)| { - if i < n_workers { - cycle == stop - } else { - cycle < stop - } - })); + assert!(data.iter() + .all(|&(cycle, stop, i)| { + if i < n_workers { + cycle == stop + } else { + cycle < stop + } + })); clock_thread.join().unwrap(); }
AddCrates
Description
Port rust-threadpool
Crate Meta
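Diff of Changes (generated with `diff -Naur <sgx-port> <upstream>`: lines prefixed `-` are the SGX port's version, lines prefixed `+` are the upstream version)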