japaric / steed

[INACTIVE] Rust's standard library, free of C dependencies, for Linux systems
519 stars 22 forks source link

Copy a working mutex implementation from `parking_lot` #149

Closed tbu- closed 7 years ago

tbu- commented 7 years ago

The mutex is stubbed out on platforms that do not have support for threads yet.

tbu- commented 7 years ago

Diff to parking_lot:

Only in parking_lot/core/src/: lib.rs
Only in src/sys/linux/parking_lot/core/: mod.rs
diff -ur parking_lot/core/src/parking_lot.rs src/sys/linux/parking_lot/core/parking_lot.rs
--- parking_lot/core/src/parking_lot.rs 2017-04-23 12:10:21.202664782 +0200
+++ src/sys/linux/parking_lot/core/parking_lot.rs   2017-05-07 04:35:56.059836037 +0200
@@ -5,23 +5,22 @@
 // http://opensource.org/licenses/MIT>, at your option. This file may not be
 // copied, modified, or distributed except according to those terms.

-#[cfg(feature = "nightly")]
-use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
-#[cfg(not(feature = "nightly"))]
-use stable::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
-use std::time::{Instant, Duration};
-use std::cell::{Cell, UnsafeCell};
-use std::ptr;
-use std::mem;
+use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
+use time::{Instant, Duration};
+use cell::{Cell, UnsafeCell};
+use libc;
+use ptr;
+use mem;
+/*
 use smallvec::SmallVec;
 use rand::{self, XorShiftRng, Rng};
-use thread_parker::ThreadParker;
-use word_lock::WordLock;
-use util::UncheckedOptionExt;
+*/
+use super::thread_parker::ThreadParker;
+use super::word_lock::WordLock;
+use super::util::UncheckedOptionExt;

 static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
 static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;
-thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());

 // Even with 3x more buckets than threads, the memory overhead per thread is
 // still only a few hundred bytes per thread.
@@ -46,7 +45,9 @@
             mutex: WordLock::new(),
             queue_head: Cell::new(ptr::null()),
             queue_tail: Cell::new(ptr::null()),
+            /*
             fair_timeout: UnsafeCell::new(FairTimeout::new()),
+            */
             _padding: unsafe { mem::uninitialized() },
         };
         Box::new(HashTable {
@@ -65,8 +66,10 @@
     queue_head: Cell<*const ThreadData>,
     queue_tail: Cell<*const ThreadData>,

+    /*
     // Next time at which point be_fair should be set
     fair_timeout: UnsafeCell<FairTimeout>,
+    */

     // Padding to avoid false sharing between buckets. Ideally we would just
     // align the bucket structure to 64 bytes, but Rust doesn't support that
@@ -81,12 +84,15 @@
             mutex: WordLock::new(),
             queue_head: Cell::new(ptr::null()),
             queue_tail: Cell::new(ptr::null()),
+            /*
             fair_timeout: UnsafeCell::new(FairTimeout::new()),
+            */
             _padding: unsafe { mem::uninitialized() },
         }
     }
 }

+/*
 struct FairTimeout {
     // Next time at which point be_fair should be set
     timeout: Instant,
@@ -114,8 +120,9 @@
         }
     }
 }
+*/

-struct ThreadData {
+pub struct ThreadData {
     parker: ThreadParker,

     // Key that this thread is sleeping on. This may change if the thread is
@@ -133,7 +140,7 @@
 }

 impl ThreadData {
-    fn new() -> ThreadData {
+    pub fn new() -> ThreadData {
         // Keep track of the total number of live ThreadData objects and resize
         // the hash table accordingly.
         let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
@@ -512,7 +519,7 @@
                         timeout: Option<Instant>)
                         -> ParkResult {
     // Grab our thread data, this also ensures that the hash table exists
-    let thread_data = &*THREAD_DATA.with(|x| x as *const ThreadData);
+    let thread_data = &*libc::tls_parking_lot_data();

     // Lock the bucket for the given key
     let bucket = lock_bucket(key);
@@ -675,7 +682,7 @@

             // Invoke the callback before waking up the thread
             result.unparked_threads = 1;
-            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
+            result.be_fair = false/*(*bucket.fair_timeout.get()).should_timeout()*/;
             let token = callback(result);

             // Set the token for the target thread
@@ -723,7 +730,10 @@
     let mut link = &bucket.queue_head;
     let mut current = bucket.queue_head.get();
     let mut previous = ptr::null();
+    let mut threads = Vec::new();
+    /*
     let mut threads = SmallVec::<[_; 8]>::new();
+    */
     while !current.is_null() {
         if (*current).key.load(Ordering::Relaxed) == key {
             // Remove the thread from the queue
@@ -878,7 +888,7 @@

     // Invoke the callback before waking up the thread
     if result.unparked_threads != 0 {
-        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
+        result.be_fair = false/*(*bucket_from.fair_timeout.get()).should_timeout()*/;
     }
     let token = callback(op, result);

@@ -942,7 +952,10 @@
     let mut link = &bucket.queue_head;
     let mut current = bucket.queue_head.get();
     let mut previous = ptr::null();
+    /*
     let mut threads = SmallVec::<[_; 8]>::new();
+    */
+    let mut threads = Vec::new();
     let mut result = UnparkResult {
         unparked_threads: 0,
         have_more_threads: false,
@@ -986,7 +999,7 @@
     // Invoke the callback before waking up the threads
     result.unparked_threads = threads.len();
     if result.unparked_threads != 0 {
-        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
+        result.be_fair = false/*(*bucket.fair_timeout.get()).should_timeout()*/;
     }
     let token = callback(result);

diff -ur parking_lot/core/src/spinwait.rs src/sys/linux/parking_lot/core/spinwait.rs
--- parking_lot/core/src/spinwait.rs    2017-04-23 12:10:21.202664782 +0200
+++ src/sys/linux/parking_lot/core/spinwait.rs  2017-05-07 04:35:56.059836037 +0200
@@ -5,42 +5,15 @@
 // http://opensource.org/licenses/MIT>, at your option. This file may not be
 // copied, modified, or distributed except according to those terms.

-#[cfg(windows)]
-use kernel32;
-#[cfg(unix)]
 use libc;
-#[cfg(not(any(windows, unix)))]
-use std::thread;
-#[cfg(not(feature = "nightly"))]
-use std::sync::atomic::{Ordering, fence};

-// Yields the rest of the current timeslice to the OS
-#[cfg(windows)]
-#[inline]
-fn thread_yield() {
-    unsafe {
-        // We don't use SwitchToThread here because it doesn't consider all
-        // threads in the system and the thread we are waiting for may not get
-        // selected.
-        kernel32::Sleep(0);
-    }
-}
-#[cfg(unix)]
 #[inline]
 fn thread_yield() {
     unsafe {
         libc::sched_yield();
     }
 }
-#[cfg(not(any(windows, unix)))]
-#[inline]
-fn thread_yield() {
-    thread::yield_now();
-}
-
-// Wastes some CPU time for the given number of iterations, preferably also
-// using a hint to indicate to the CPU that we are spinning.
-#[cfg(all(feature = "nightly", any(target_arch = "x86", target_arch = "x86_64")))]
+#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
 #[inline]
 fn cpu_relax(iterations: u32) {
     for _ in 0..iterations {
@@ -49,7 +22,7 @@
         }
     }
 }
-#[cfg(all(feature = "nightly", target_arch = "aarch64"))]
+#[cfg(target_arch = "aarch64")]
 #[inline]
 fn cpu_relax(iterations: u32) {
     for _ in 0..iterations {
@@ -58,9 +31,9 @@
         }
     }
 }
-#[cfg(all(feature = "nightly", not(any(target_arch = "x86",
-                                       target_arch = "x86_64",
-                                       target_arch = "aarch64"))))]
+#[cfg(not(any(target_arch = "x86",
+              target_arch = "x86_64",
+              target_arch = "aarch64")))]
 #[inline]
 fn cpu_relax(iterations: u32) {
     for _ in 0..iterations {
@@ -69,15 +42,6 @@
         }
     }
 }
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn cpu_relax(iterations: u32) {
-    // This is a bit tricky: we rely on the fact that LLVM doesn't optimize
-    // atomic operations and effectively treats them as volatile.
-    for _ in 0..iterations {
-        fence(Ordering::SeqCst);
-    }
-}

 /// A counter used to perform exponential backoff in spin loops.
 pub struct SpinWait {
@@ -86,19 +50,11 @@

 impl SpinWait {
     /// Creates a new `SpinWait`.
-    #[cfg(feature = "nightly")]
     #[inline]
     pub const fn new() -> SpinWait {
         SpinWait { counter: 0 }
     }

-    /// Creates a new `SpinWait`.
-    #[cfg(not(feature = "nightly"))]
-    #[inline]
-    pub fn new() -> SpinWait {
-        SpinWait { counter: 0 }
-    }
-
     /// Resets a `SpinWait` to its initial state.
     #[inline]
     pub fn reset(&mut self) {
Only in parking_lot/core/src/: stable.rs
Only in parking_lot/core/src/thread_parker: generic.rs
diff -ur parking_lot/core/src/thread_parker/linux.rs src/sys/linux/parking_lot/core/thread_parker/linux.rs
--- parking_lot/core/src/thread_parker/linux.rs 2017-04-23 12:10:21.202664782 +0200
+++ src/sys/linux/parking_lot/core/thread_parker/linux.rs   2017-05-07 04:35:56.059836037 +0200
@@ -5,22 +5,21 @@
 // http://opensource.org/licenses/MIT>, at your option. This file may not be
 // copied, modified, or distributed except according to those terms.

-use std::sync::atomic::{AtomicI32, Ordering};
-use std::time::Instant;
 use libc;
-
-#[cfg(target_arch = "x86")]
-const SYS_FUTEX: libc::c_long = 240;
-#[cfg(target_arch = "x86_64")]
-const SYS_FUTEX: libc::c_long = 202;
-#[cfg(target_arch = "arm")]
-const SYS_FUTEX: libc::c_long = 240;
-#[cfg(target_arch = "aarch64")]
-const SYS_FUTEX: libc::c_long = 98;
-
-const FUTEX_WAIT: i32 = 0;
-const FUTEX_WAKE: i32 = 1;
-const FUTEX_PRIVATE: i32 = 128;
+use linux::{self, FUTEX_PRIVATE, FUTEX_WAIT, FUTEX_WAKE};
+use ptr;
+use sync::atomic::{AtomicI32, Ordering};
+use time::Instant;
+
+#[inline(always)]
+pub unsafe fn futex(uaddr: *const AtomicI32,
+                    op: libc::c_int,
+                    val: i32,
+                    utime: *const libc::timespec)
+    -> libc::c_int
+{
+    linux::futex(uaddr as *mut _, op, val as u32, utime, ptr::null_mut(), 0)
+}

 // Helper type for putting a thread to sleep until some other thread wakes it up
 pub struct ThreadParker {
@@ -47,12 +46,10 @@
     // been added to the queue, after unlocking the queue.
     pub unsafe fn park(&self) {
         while self.futex.load(Ordering::Acquire) != 0 {
-            let r = libc::syscall(SYS_FUTEX, &self.futex, FUTEX_WAIT | FUTEX_PRIVATE, 1, 0);
-            debug_assert!(r == 0 || r == -1);
-            if r == -1 {
-                debug_assert!(*libc::__errno_location() == libc::EINTR ||
-                              *libc::__errno_location() == libc::EAGAIN);
-            }
+            let r = futex(&self.futex, FUTEX_WAIT | FUTEX_PRIVATE, 1, ptr::null());
+            debug_assert!(r == 0 ||
+                          r == -libc::EINTR ||
+                          r == -libc::EAGAIN);
         }
     }

@@ -75,13 +72,11 @@
                 tv_sec: diff.as_secs() as libc::time_t,
                 tv_nsec: diff.subsec_nanos() as libc::c_long,
             };
-            let r = libc::syscall(SYS_FUTEX, &self.futex, FUTEX_WAIT | FUTEX_PRIVATE, 1, &ts);
-            debug_assert!(r == 0 || r == -1);
-            if r == -1 {
-                debug_assert!(*libc::__errno_location() == libc::EINTR ||
-                              *libc::__errno_location() == libc::EAGAIN ||
-                              *libc::__errno_location() == libc::ETIMEDOUT);
-            }
+            let r = futex(&self.futex, FUTEX_WAIT | FUTEX_PRIVATE, 1, &ts);
+            debug_assert!(r == 0 ||
+                          r == -libc::EINTR ||
+                          r == -libc::EAGAIN ||
+                          r == -libc::ETIMEDOUT);
         }
         true
     }
@@ -110,10 +105,9 @@
     pub unsafe fn unpark(self) {
         // The thread data may have been freed at this point, but it doesn't
         // matter since the syscall will just return EFAULT in that case.
-        let r = libc::syscall(SYS_FUTEX, self.futex, FUTEX_WAKE | FUTEX_PRIVATE, 1);
-        debug_assert!(r == 0 || r == 1 || r == -1);
-        if r == -1 {
-            debug_assert_eq!(*libc::__errno_location(), libc::EFAULT);
-        }
+        let r = futex(self.futex, FUTEX_WAKE | FUTEX_PRIVATE, 1, ptr::null());
+        debug_assert!(r == 0 ||
+                      r == 1 ||
+                      r == -libc::EFAULT);
     }
 }
Only in parking_lot/core/src/thread_parker: unix.rs
Only in parking_lot/core/src/thread_parker: windows
diff -ur parking_lot/core/src/word_lock.rs src/sys/linux/parking_lot/core/word_lock.rs
--- parking_lot/core/src/word_lock.rs   2017-04-23 12:10:21.202664782 +0200
+++ src/sys/linux/parking_lot/core/word_lock.rs 2017-05-07 04:35:56.059836037 +0200
@@ -5,17 +5,15 @@
 // http://opensource.org/licenses/MIT>, at your option. This file may not be
 // copied, modified, or distributed except according to those terms.

-#[cfg(feature = "nightly")]
-use std::sync::atomic::{AtomicUsize, Ordering, fence};
-#[cfg(not(feature = "nightly"))]
-use stable::{AtomicUsize, Ordering, fence};
-use std::ptr;
-use std::mem;
-use std::cell::Cell;
-use spinwait::SpinWait;
-use thread_parker::ThreadParker;
+use cell::Cell;
+use libc;
+use mem;
+use ptr;
+use super::spinwait::SpinWait;
+use super::thread_parker::ThreadParker;
+use sync::atomic::{AtomicUsize, Ordering, fence};

-struct ThreadData {
+pub struct ThreadData {
     parker: ThreadParker,

     // Linked list of threads in the queue. The queue is split into two parts:
@@ -36,7 +34,7 @@
 }

 impl ThreadData {
-    fn new() -> ThreadData {
+    pub fn new() -> ThreadData {
         ThreadData {
             parker: ThreadParker::new(),
             queue_tail: Cell::new(ptr::null()),
@@ -46,8 +44,6 @@
     }
 }

-thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
-
 const LOCKED_BIT: usize = 1;
 const QUEUE_LOCKED_BIT: usize = 2;
 const QUEUE_MASK: usize = !3;
@@ -109,7 +105,7 @@
             }

             // Get our thread data and prepare it for parking
-            let thread_data = &*THREAD_DATA.with(|x| x as *const ThreadData);
+            let thread_data = &*libc::tls_parking_lot_word_lock_data();
             assert!(mem::align_of_val(thread_data) > !QUEUE_MASK);
             thread_data.parker.prepare_park();
Only in src/sys/linux/parking_lot/: LICENSE-APACHE
Only in src/sys/linux/parking_lot/: LICENSE-MIT
Only in parking_lot/src/: condvar.rs
Only in src/sys/linux/parking_lot/: core
Only in parking_lot/src/: elision.rs
Only in parking_lot/src/: lib.rs
Only in src/sys/linux/parking_lot/: mod.rs
Only in parking_lot/src/: mutex.rs
Only in parking_lot/src/: once.rs
diff -ur parking_lot/src/raw_mutex.rs src/sys/linux/parking_lot/raw_mutex.rs
--- parking_lot/src/raw_mutex.rs    2017-04-23 12:25:38.120264290 +0200
+++ src/sys/linux/parking_lot/raw_mutex.rs  2017-05-07 04:35:56.059836037 +0200
@@ -5,16 +5,10 @@
 // http://opensource.org/licenses/MIT>, at your option. This file may not be
 // copied, modified, or distributed except according to those terms.

-#[cfg(feature = "nightly")]
-use std::sync::atomic::{AtomicU8, Ordering};
-#[cfg(feature = "nightly")]
+use sync::atomic::{AtomicU8, Ordering};
 type U8 = u8;
-#[cfg(not(feature = "nightly"))]
-use stable::{AtomicU8, Ordering};
-#[cfg(not(feature = "nightly"))]
-type U8 = usize;
-use std::time::{Duration, Instant};
-use parking_lot_core::{self, ParkResult, UnparkResult, SpinWait, UnparkToken, DEFAULT_PARK_TOKEN};
+use time::{Duration, Instant};
+use super::parking_lot_core::{self, ParkResult, UnparkResult, SpinWait, UnparkToken, DEFAULT_PARK_TOKEN};

 // UnparkToken used to indicate that that the target thread should attempt to
 // lock the mutex again as soon as it is unparked.
@@ -32,16 +26,10 @@
 }

 impl RawMutex {
-    #[cfg(feature = "nightly")]
     #[inline]
     pub const fn new() -> RawMutex {
         RawMutex { state: AtomicU8::new(0) }
     }
-    #[cfg(not(feature = "nightly"))]
-    #[inline]
-    pub fn new() -> RawMutex {
-        RawMutex { state: AtomicU8::new(0) }
-    }

     #[inline]
     pub fn lock(&self) {
Only in parking_lot/src/: raw_remutex.rs
Only in parking_lot/src/: raw_rwlock.rs
Only in parking_lot/src/: remutex.rs
Only in parking_lot/src/: rwlock.rs
Only in parking_lot/src/: stable.rs
Only in parking_lot/src/: util.rs
tbu- commented 7 years ago

@homunkulus try

homunkulus commented 7 years ago

:hourglass: Trying commit 52c87b7 with merge 52c87b7...

homunkulus commented 7 years ago

:broken_heart: Test failed - status-travis

tbu- commented 7 years ago

Hmm...

tbu- commented 7 years ago

@homunkulus retry

homunkulus commented 7 years ago

:hourglass: Trying commit 52c87b7 with merge 52c87b7...

tbu- commented 7 years ago

@homunkulus try

homunkulus commented 7 years ago

:hourglass: Trying commit 7c59c37 with merge 7c59c37...

homunkulus commented 7 years ago

:broken_heart: Test failed - status-travis

homunkulus commented 7 years ago

:umbrella: The latest upstream changes (presumably #150) made this pull request unmergeable. Please resolve the merge conflicts.

tbu- commented 7 years ago

@homunkulus try

homunkulus commented 7 years ago

:hourglass: Trying commit a615fda with merge a615fda...

homunkulus commented 7 years ago

:broken_heart: Test failed - status-travis

tbu- commented 7 years ago

@homunkulus try

homunkulus commented 7 years ago

:hourglass: Trying commit 61e1806 with merge 61e1806...

homunkulus commented 7 years ago

:sunny: Test successful - status-travis State: approved= try=True

tbu- commented 7 years ago

@japaric Is the general approach OK? Should I port the rest of the synchronization primitives from parking_lot?

japaric commented 7 years ago

@tbu- Sorry for taking so long to get to this. The approach seems good to me; the diff to `parking_lot` looks small and maintainable -- though some cfgs like `cfg(windows)` could have been kept to make it even smaller, but that's a nit.

@homunkulus r+

homunkulus commented 7 years ago

:pushpin: Commit f1f03d7 has been approved by japaric

homunkulus commented 7 years ago

:hourglass: Testing commit f1f03d7 with merge f1f03d7...

homunkulus commented 7 years ago

:sunny: Test successful - status-travis Approved by: japaric Pushing f1f03d7aad6b2ef72c97054cb28f1ffde2f13c5a to master...