diff options
| author | Mica White <botahamec@outlook.com> | 2025-12-08 20:14:03 -0500 |
|---|---|---|
| committer | Mica White <botahamec@outlook.com> | 2025-12-08 20:14:03 -0500 |
| commit | c31f4ce84c3c8b3f89a05890df775d4e766aaadb (patch) | |
| tree | 40169c1240717002197c85985f9bb652dd4b0af8 /src | |
Diffstat (limited to 'src')
| -rwxr-xr-x | src/futex.rs | 535 | ||||
| -rwxr-xr-x | src/futex/linux.rs | 61 | ||||
| -rwxr-xr-x | src/futex/windows.rs | 114 | ||||
| -rwxr-xr-x | src/lazy_box.rs | 97 | ||||
| -rwxr-xr-x | src/lib.rs | 187 | ||||
| -rwxr-xr-x | src/no_threads.rs | 122 | ||||
| -rwxr-xr-x | src/pthread.rs | 118 | ||||
| -rwxr-xr-x | src/spin.rs | 68 | ||||
| -rwxr-xr-x | src/windows7.rs | 119 |
9 files changed, 1421 insertions, 0 deletions
diff --git a/src/futex.rs b/src/futex.rs new file mode 100755 index 0000000..ad74f11 --- /dev/null +++ b/src/futex.rs @@ -0,0 +1,535 @@ +use core::sync::atomic::Ordering; + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(any( + target_os = "linux", + target_os = "android", + //all(target_os = "emscripten", target_feature = "atomics"), + //target_os = "freebsd", + //target_os = "openbsd", + //target_os = "dragonfly", + //target_os = "fuchsia", + ))] { + mod linux; + use linux::{Futex, Primitive, SmallFutex, SmallPrimitive}; + } else if #[cfg(all(target_os = "windows", not(target_vendor = "win7")))] { + mod windows; + use windows::Futex; + } +} + +const SPIN_COUNT: u32 = 100; + +const UNLOCKED: SmallPrimitive = 0; +const LOCKED: SmallPrimitive = 1; // locked, no other threads waiting +const CONTENDED: SmallPrimitive = 2; // locked, and other threads waiting (contended) + +pub struct Mutex(SmallFutex); + +impl Mutex { + #[inline] + pub const fn new() -> Self { + Self(Futex::new(UNLOCKED)) + } + + /// Locks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already locked by the current thread and the + /// `unsafe_lock` feature is enabled. + #[inline] + pub unsafe fn lock(&self) { + if self + .0 + .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + self.lock_contended(); + } + } + + /// If the mutex is unlocked, it is locked, and this function returns + /// `true'. Otherwise, `false` is returned. + #[inline] + pub unsafe fn try_lock(&self) -> bool { + self.0 + .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } + + /// Unlocks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already unlocked or if it has been locked on + /// a different thread. + #[inline] + pub unsafe fn unlock(&self) { + if self.0.swap(UNLOCKED, Ordering::Release) == CONTENDED { + // We only wake up one thread. When that thread locks the mutex, it + // will mark the mutex as CONTENDED (see lock_contended above), + // which makes sure that any other waiting threads will also be + // woken up eventually. + self.0.wake(); + } + } + + #[inline] + pub unsafe fn is_locked(&self) -> bool { + self.0.load(Ordering::Relaxed) == UNLOCKED + } + + #[cold] + fn lock_contended(&self) { + // Spin first to speed things up if the lock is released quickly. + let mut state = self.spin(); + + // If it's unlocked now, attempt to take the lock + // without marking it as contended. + if state == UNLOCKED { + match self + .0 + .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return, // Locked! + Err(s) => state = s, + } + } + + loop { + // Put the lock in contended state. + // We avoid an unnecessary write if it as already set to CONTENDED, + // to be friendlier for the caches. + if state != CONTENDED && self.0.swap(CONTENDED, Ordering::Acquire) == UNLOCKED { + // We changed it from UNLOCKED to CONTENDED, so we just successfully locked it. + return; + } + + // Wait for the futex to change state, assuming it is still CONTENDED. + self.0.wait(CONTENDED); + + // Spin again after waking up. + state = self.spin(); + } + } + + fn spin(&self) -> SmallPrimitive { + let mut spin = SPIN_COUNT; + loop { + // We only use `load` (and not `swap` or `compare_exchange`) + // while spinning, to be easier on the caches. + let state = self.0.load(Ordering::Acquire); + + // We stop spinning when the mutex is UNLOCKED, + // but also when it's CONTENDED. + if state != LOCKED || spin == 0 { + return state; + } + + core::hint::spin_loop(); + spin -= 1; + } + } +} + +pub struct RwLock { + // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag. + // Bits 0.30: + // 0: Unlocked + // 1.=0x3FFF_FFFE: Locked by N readers + // 0x3FFF_FFFF: Write locked + // Bit 30: Readers are waiting on this futex. + // Bit 31: Writers are waiting on the writer_notify futex. + state: Futex, + // The 'condition variable' to notify writers through. + // Incremented on every signal. + writer_notify: Futex, +} + +const READ_LOCKED: Primitive = 1; +const MASK: Primitive = (1 << 30) - 1; +const WRITE_LOCKED: Primitive = MASK; +const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED +const MAX_READERS: Primitive = MASK - 1; +const READERS_WAITING: Primitive = 1 << 30; +const WRITERS_WAITING: Primitive = 1 << 31; + +#[inline] +fn is_unlocked(state: Primitive) -> bool { + state & MASK == 0 +} + +#[inline] +fn is_write_locked(state: Primitive) -> bool { + state & MASK == WRITE_LOCKED +} + +#[inline] +fn has_readers_waiting(state: Primitive) -> bool { + state & READERS_WAITING != 0 +} + +#[inline] +fn has_writers_waiting(state: Primitive) -> bool { + state & WRITERS_WAITING != 0 +} + +#[inline] +fn is_read_lockable(state: Primitive) -> bool { + // This also returns false if the counter could overflow if we tried to read lock it. + // + // We don't allow read-locking if there's readers waiting, even if the lock is unlocked + // and there's no writers waiting. The only situation when this happens is after unlocking, + // at which point the unlocking thread might be waking up writers, which have priority over readers. + // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary. + state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state) +} + +#[inline] +fn is_read_lockable_after_wakeup(state: Primitive) -> bool { + // We make a special case for checking if we can read-lock _after_ a reader thread that went to + // sleep has been woken up by a call to `downgrade`. + // + // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be + // no readers waiting and the lock should be read-locked (not write-locked or unlocked). + // + // Note that we do not check if any writers are waiting. This is because a call to `downgrade` + // implies that the caller wants other readers to read the value protected by the lock. If we + // did not allow readers to acquire the lock before writers after a `downgrade`, then only the + // original writer would be able to read the value, thus defeating the purpose of `downgrade`. + state & MASK < MAX_READERS + && !has_readers_waiting(state) + && !is_write_locked(state) + && !is_unlocked(state) +} + +#[inline] +fn has_reached_max_readers(state: Primitive) -> bool { + state & MASK == MAX_READERS +} + +impl RwLock { + #[inline] + pub const fn new() -> Self { + Self { + state: Futex::new(0), + writer_notify: Futex::new(0), + } + } + + #[inline] + pub fn try_read(&self) -> bool { + self.state + .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| { + is_read_lockable(s).then(|| s + READ_LOCKED) + }) + .is_ok() + } + + #[inline] + pub fn read(&self) { + let state = self.state.load(Ordering::Relaxed); + if !is_read_lockable(state) + || self + .state + .compare_exchange_weak( + state, + state + READ_LOCKED, + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_err() + { + self.read_contended(); + } + } + + /// # Safety + /// + /// The `RwLock` must be read-locked (N readers) in order to call this. + #[inline] + pub unsafe fn read_unlock(&self) { + let state = self.state.fetch_sub(READ_LOCKED, Ordering::Release) - READ_LOCKED; + + // It's impossible for a reader to be waiting on a read-locked RwLock, + // except if there is also a writer waiting. + debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state)); + + // Wake up a writer if we were the last reader and there's a writer waiting. + if is_unlocked(state) && has_writers_waiting(state) { + self.wake_writer_or_readers(state); + } + } + + #[cold] + fn read_contended(&self) { + let mut has_slept = false; + let mut state = self.spin_read(); + + loop { + // If we have just been woken up, first check for a `downgrade` call. + // Otherwise, if we can read-lock it, lock it. + if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) { + match self.state.compare_exchange_weak( + state, + state + READ_LOCKED, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + // Check for overflow. + assert!( + !has_reached_max_readers(state), + "too many active read locks on RwLock" + ); + + // Make sure the readers waiting bit is set before we go to sleep. + if !has_readers_waiting(state) { + if let Err(s) = self.state.compare_exchange( + state, + state | READERS_WAITING, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = s; + continue; + } + } + + // Wait for the state to change. + self.state.wait(state | READERS_WAITING); + has_slept = true; + + // Spin again after waking up. + state = self.spin_read(); + } + } + + #[inline] + pub fn try_write(&self) -> bool { + self.state + .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| { + is_unlocked(s).then(|| s + WRITE_LOCKED) + }) + .is_ok() + } + + #[inline] + pub fn write(&self) { + if self + .state + .compare_exchange_weak(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + self.write_contended(); + } + } + + /// # Safety + /// + /// The `RwLock` must be write-locked (single writer) in order to call this. + #[inline] + pub unsafe fn write_unlock(&self) { + let state = self.state.fetch_sub(WRITE_LOCKED, Ordering::Release) - WRITE_LOCKED; + + debug_assert!(is_unlocked(state)); + + if has_writers_waiting(state) || has_readers_waiting(state) { + self.wake_writer_or_readers(state); + } + } + + /// # Safety + /// + /// The `RwLock` must be write-locked (single writer) in order to call this. + #[inline] + pub unsafe fn downgrade(&self) { + // Removes all write bits and adds a single read bit. + let state = self.state.fetch_add(DOWNGRADE, Ordering::Release); + debug_assert!( + is_write_locked(state), + "RwLock must be write locked to call `downgrade`" + ); + + if has_readers_waiting(state) { + // Since we had the exclusive lock, nobody else can unset this bit. + self.state.fetch_sub(READERS_WAITING, Ordering::Relaxed); + self.state.wake_all(); + } + } + + #[cold] + fn write_contended(&self) { + let mut state = self.spin_write(); + + let mut other_writers_waiting = 0; + + loop { + // If it's unlocked, we try to lock it. + if is_unlocked(state) { + match self.state.compare_exchange_weak( + state, + state | WRITE_LOCKED | other_writers_waiting, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + // Set the waiting bit indicating that we're waiting on it. + if !has_writers_waiting(state) { + if let Err(s) = self.state.compare_exchange( + state, + state | WRITERS_WAITING, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = s; + continue; + } + } + + // Other writers might be waiting now too, so we should make sure + // we keep that bit on once we manage lock it. + other_writers_waiting = WRITERS_WAITING; + + // Examine the notification counter before we check if `state` has changed, + // to make sure we don't miss any notifications. + let seq = self.writer_notify.load(Ordering::Acquire); + + // Don't go to sleep if the lock has become available, + // or if the writers waiting bit is no longer set. + state = self.state.load(Ordering::Relaxed); + if is_unlocked(state) || !has_writers_waiting(state) { + continue; + } + + // Wait for the state to change. + self.writer_notify.wait(seq); + + // Spin again after waking up. + state = self.spin_write(); + } + } + + /// Wakes up waiting threads after unlocking. + /// + /// If both are waiting, this will wake up only one writer, but will fall + /// back to waking up readers if there was no writer to wake up. + #[cold] + fn wake_writer_or_readers(&self, mut state: Primitive) { + assert!(is_unlocked(state)); + + // The readers waiting bit might be turned on at any point now, + // since readers will block when there's anything waiting. + // Writers will just lock the lock though, regardless of the waiting bits, + // so we don't have to worry about the writer waiting bit. + // + // If the lock gets locked in the meantime, we don't have to do + // anything, because then the thread that locked the lock will take + // care of waking up waiters when it unlocks. + + // If only writers are waiting, wake one of them up. + if state == WRITERS_WAITING { + match self + .state + .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => { + self.wake_writer(); + return; + } + Err(s) => { + // Maybe some readers are now waiting too. So, continue to the next `if`. + state = s; + } + } + } + + // If both writers and readers are waiting, leave the readers waiting + // and only wake up one writer. + if state == READERS_WAITING + WRITERS_WAITING { + if self + .state + .compare_exchange(state, READERS_WAITING, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + // The lock got locked. Not our problem anymore. + return; + } + if self.wake_writer() { + return; + } + // No writers were actually blocked on futex_wait, so we continue + // to wake up readers instead, since we can't be sure if we notified a writer. + state = READERS_WAITING; + } + + // If readers are waiting, wake them all up. + if state == READERS_WAITING + && self + .state + .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.state.wake_all(); + } + } + + /// This wakes one writer and returns true if we woke up a writer that was + /// blocked on futex_wait. + /// + /// If this returns false, it might still be the case that we notified a + /// writer that was about to go to sleep. + fn wake_writer(&self) -> bool { + self.writer_notify.fetch_add(1, Ordering::Release); + self.writer_notify.wake() + // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke + // up any threads or not, and always return `false` here. That still + // results in correct behavior: it just means readers get woken up as + // well in case both readers and writers were waiting. + } + + /// Spin for a while, but stop directly at the given condition. + #[inline] + fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive { + let mut spin = SPIN_COUNT; // Chosen by fair dice roll. + loop { + let state = self.state.load(Ordering::Relaxed); + if f(state) || spin == 0 { + return state; + } + core::hint::spin_loop(); + spin -= 1; + } + } + + #[inline] + fn spin_write(&self) -> Primitive { + // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair. + self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state)) + } + + #[inline] + fn spin_read(&self) -> Primitive { + // Stop spinning when it's unlocked or read locked, or when there's waiting threads. + self.spin_until(|state| { + !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state) + }) + } +} diff --git a/src/futex/linux.rs b/src/futex/linux.rs new file mode 100755 index 0000000..80ca5a6 --- /dev/null +++ b/src/futex/linux.rs @@ -0,0 +1,61 @@ +use core::ops::Deref; +use core::sync::atomic::{AtomicU32, Ordering}; + +use libc::{syscall, SYS_futex}; + +#[repr(C)] +pub struct Futex(AtomicU32); + +pub type Atomic = AtomicU32; +pub type Primitive = u32; + +pub type SmallFutex = Futex; +pub type SmallAtomic = Atomic; +pub type SmallPrimitive = Primitive; + +impl Futex { + #[inline] + pub const fn new(initial_value: u32) -> Self { + Self(AtomicU32::new(initial_value)) + } + + #[inline] + pub fn wait(&self, expected_start_value: u32) { + unsafe { + syscall( + SYS_futex, + core::ptr::from_ref(&self.0), + libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG, + expected_start_value, + core::ptr::null::<()>(), + core::ptr::null::<u32>(), // This argument is unused for FUTEX_WAIT_BITSET. + !0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT. + ); + } + } + + #[inline] + pub fn wake(&self) -> bool { + let ptr = &self.0 as *const AtomicU32; + const OP: libc::c_int = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; + unsafe { libc::syscall(libc::SYS_futex, ptr, OP, 1) > 0 } + } + + #[inline] + pub fn wake_all(&self) { + let ptr = &raw const self.0; + let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG; + unsafe { + syscall(libc::SYS_futex, ptr, op, i32::MAX); + } + } +} + +impl Deref for Futex { + type Target = Atomic; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/src/futex/windows.rs b/src/futex/windows.rs new file mode 100755 index 0000000..153d9ef --- /dev/null +++ b/src/futex/windows.rs @@ -0,0 +1,114 @@ +use core::ffi::c_void; + +#[cfg(not(target_vendor = "win7"))] +// Use raw-dylib to import synchronization functions to workaround issues with the older mingw import library. +#[cfg_attr( + target_arch = "x86", + link( + name = "api-ms-win-core-synch-l1-2-0", + kind = "raw-dylib", + import_name_type = "undecorated" + ) +)] +#[cfg_attr( + not(target_arch = "x86"), + link(name = "api-ms-win-core-synch-l1-2-0", kind = "raw-dylib") +)] +extern "system" { + type BOOL = i32; + + pub const TRUE: BOOL = 1; + pub const INFINITE: u32 = 4294967295; + + pub fn WaitOnAddress( + address: *const c_void, + compareaddress: *const c_void, + addresssize: usize, + dwmilliseconds: u32, + ) -> BOOL; + pub fn WakeByAddressSingle(address: *const c_void); + pub fn WakeByAddressAll(address: *const c_void); +} + +pub struct Futex(AtomicU32); +pub type Primitive = u32; + +impl Futex { + #[inline] + pub const fn new(initial_value: Primitive) -> Self { + Self(AtomicU32::new(initial_value)) + } + + #[inline] + pub fn wait(&self, expected_start_value: Primitive) { + const SIZE: u32 = core::mem::size_of::<Atomic>(); + let addr = core::ptr::from_ref(&self.0).cast::<c_void>(); + let compare_addr = ptr::addr_of!(compare).cast::<c_void>(); + WaitOnAddress(addr, expected_start_value, SIZE, INFINITE); + } + + #[inline] + pub fn wake(&self) -> bool { + WakeByAddressSingle(core::ptr::from_ref(&self.0).cast::<c_void>()); + false + } + + #[inline] + pub fn wake_all(&self) { + unsafe { + let addr = core::ptr::from_ref(address).cast::<c_void>(); + WakeByAddressAll(addr); + } + } +} + +impl Deref for Futex { + type Target = Atomic; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub struct SmallFutex(AtomicU8); +pub type SmallAtomic = AtomicU8; +pub type SmallPrimitive = u8; + +impl SmallFutex { + #[inline] + pub const fn new(initial_value: SmallPrimitive) -> Self { + Self(AtomicU32::new(initial_value)) + } + + #[inline] + pub fn wait(&self, expected_start_value: SmallPrimitive) { + const SIZE: u32 = core::mem::size_of::<SmallAtomic>(); + let addr = core::ptr::from_ref(&self.0).cast::<c_void>(); + let compare_addr = ptr::addr_of!(compare).cast::<c_void>(); + WaitOnAddress(addr, expected_start_value, SIZE, INFINITE); + } + + #[inline] + pub fn wake(&self) -> bool { + WakeByAddressSingle(core::ptr::from_ref(&self.0).cast::<c_void>()); + false + } + + #[inline] + pub fn wake_all(&self) { + unsafe { + let addr = core::ptr::from_ref(address).cast::<c_void>(); + WakeByAddressAll(addr); + } + } +} + +impl Deref for SmallFutex { + type Target = SmallAtomic; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/src/lazy_box.rs b/src/lazy_box.rs new file mode 100755 index 0000000..0fd7fd9 --- /dev/null +++ b/src/lazy_box.rs @@ -0,0 +1,97 @@ +// This is used to wrap pthread {Mutex, Condvar, RwLock} in. + +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::ptr::null_mut; +use core::sync::atomic::{AtomicPtr, Ordering}; + +use alloc::boxed::Box; + +pub struct LazyBox<T: LazyInit> { + ptr: AtomicPtr<T>, + _phantom: PhantomData<T>, +} + +pub trait LazyInit { + /// This is called before the box is allocated, to provide the value to + /// move into the new box. + /// + /// It might be called more than once per LazyBox, as multiple threads + /// might race to initialize it concurrently, each constructing and initializing + /// their own box. All but one of them will be passed to `cancel_init` right after. + fn init() -> Box<Self>; + + /// Any surplus boxes from `init()` that lost the initialization race + /// are passed to this function for disposal. + /// + /// The default implementation calls destroy(). + fn cancel_init(x: Box<Self>) { + Self::destroy(x); + } + + /// This is called to destroy a used box. + /// + /// The default implementation just drops it. + fn destroy(_: Box<Self>) {} +} + +impl<T: LazyInit> LazyBox<T> { + #[inline] + pub const fn new() -> Self { + Self { + ptr: AtomicPtr::new(null_mut()), + _phantom: PhantomData, + } + } + + #[inline] + fn get_pointer(&self) -> *mut T { + let ptr = self.ptr.load(Ordering::Acquire); + if ptr.is_null() { + self.initialize() + } else { + ptr + } + } + + #[cold] + fn initialize(&self) -> *mut T { + let new_ptr = Box::into_raw(T::init()); + match self + .ptr + .compare_exchange(null_mut(), new_ptr, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => new_ptr, + Err(ptr) => { + // Lost the race to another thread. + // Drop the box we created, and use the one from the other thread instead. + T::cancel_init(unsafe { Box::from_raw(new_ptr) }); + ptr + } + } + } +} + +impl<T: LazyInit> Deref for LazyBox<T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + unsafe { &*self.get_pointer() } + } +} + +impl<T: LazyInit> DerefMut for LazyBox<T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.get_pointer() } + } +} + +impl<T: LazyInit> Drop for LazyBox<T> { + fn drop(&mut self) { + let ptr = *self.ptr.get_mut(); + if !ptr.is_null() { + T::destroy(unsafe { Box::from_raw(ptr) }); + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100755 index 0000000..b26d0b1 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,187 @@ +#![no_std] +#![forbid(unsafe_op_in_unsafe_fn)] + +extern crate alloc; + +// tier 1: linux, windows, wasm w/o atomics +// tier 2: mac os, android, ios, fuschia, illumos, freebsd, netbsd, solaris, redox, uefi, zkvm, embedded +// not supported: dragonfly, wasm w/ atomics, hermit, teeos, sgx, solid, xous + +cfg_if::cfg_if! { + if #[cfg(any( + all(target_os = "windows", not(target_vendor = "win7")), + target_os = "linux", + target_os = "android", + //target_os = "freebsd", + //target_os = "openbsd", + //target_os = "dragonfly", + //all(target_family = "wasm", target_feature = "atomics"), + //target_os = "hermit", + ))] { + mod futex; + } else if #[cfg(any( + target_os = "dragonfly", + all(target_family = "wasm", target_feature = "atomics"), + target_os = "hermit" + ))] { + // merge with above when implemented + //} else if #[cfg(target_os = "fuchsia")] { + // mod fuchsia; + // mod unix; + } else if #[cfg(any( + target_family = "unix", + ))] { + extern crate alloc; + + mod lazy_box; + + mod pthread; + //mod queue; + } else if #[cfg(target_os = "teeos")] { + extern crate alloc; + + mod lazy_box; + + mod pthread; + //mod teeos; + } else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] { + mod windows7; + //mod queue; + } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { + //mod sgx; + //mod queue; + } else if #[cfg(target_os = "solid_asp3")] { + //mod itron; + //mod solid; + } else if #[cfg(target_os = "xous")] { + //mod xous; + //mod queue; + } else if #[cfg(any( + target_family = "wasm", + target_os = "uefi", + target_os = "zkvm" + ))] { + mod no_threads; + } else if #[cfg(target_has_atomic = "8")] { + mod spin; + } +} + +cfg_if::cfg_if! { + if #[cfg(any( + all(target_os = "windows", not(target_vendor = "win7")), + target_os = "linux", + target_os = "android", + //target_os = "freebsd", + //target_os = "openbsd", + //target_os = "dragonfly", + //all(target_family = "wasm", target_feature = "atomics"), + //target_os = "hermit", + ))] { + pub use futex::Mutex; + } else if #[cfg(any( + target_os = "dragonfly", + all(target_family = "wasm", target_feature = "atomics"), + target_os = "hermit" + ))] { + // merge with above when implemented + //} else if #[cfg(target_os = "fuchsia")] { + // mod fuchsia; + // pub use fuchsia::Mutex; + } else if #[cfg(any( + target_family = "unix", + //target_os = "teeos", + ))] { + pub use pthread::Mutex; + } else if #[cfg(target_os = "teeos")] { + // merge with above when implemented + } else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] { + pub use windows7::Mutex; + } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { + //pub use sgx::Mutex; + } else if #[cfg(target_os = "solid_asp3")] { + //pub use itron::Mutex; + } else if #[cfg(target_os = "xous")] { + //pub use xous::Mutex; + } else if #[cfg(any( + target_family = "wasm", + target_os = "uefi", + target_os = "zkvm" + ))] { + pub use no_threads::Mutex; + } else if #[cfg(all(target_os = "none", target_has_atomic = "8"))] { + pub use spin::Mutex; + } +} + +cfg_if::cfg_if! { + if #[cfg(any( + all(target_os = "windows", not(target_vendor = "win7")), + target_os = "linux", + target_os = "android", + //target_os = "freebsd", + //target_os = "openbsd", + //target_os = "dragonfly", + //target_os = "fuchsia", + //all(target_family = "wasm", target_feature = "atomics"), + //target_os = "hermit", + ))] { + pub use futex::RwLock; + } else if #[cfg(any( + all(target_family = "wasm", target_feature = "atomics"), + target_os = "hermit" + ))] { + // merge with above when implemented + } else if #[cfg(any( + target_family = "unix", + all(target_os = "windows", target_vendor = "win7"), + //all(target_vendor = "fortanix", target_env = "sgx"), + //target_os = "xous", + ))] { + //pub use queue::RwLock; + } else if #[cfg(any( + all(target_vendor = "fortanix", target_env = "sgx"), + target_os = "xous",) + )] { + // merge with above when implemented + } else if #[cfg(target_os = "solid_asp3")] { + //pub use solid::RwLock; + } else if #[cfg(target_os = "teeos")] { + //pub use teeos::RwLock; + } else if #[cfg(any( + target_family = "wasm", + target_os = "uefi", + target_os = "zkvm" + ))] { + pub use no_threads::RwLock; + } else if #[cfg(all(target_os = "none", target_has_atomic = "8"))] { + //pub use spin::RwLock; + } +} + +unsafe impl lock_api::RawMutex for Mutex { + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = Self::new(); + + type GuardMarker = lock_api::GuardNoSend; + + #[no_panic::no_panic] + fn lock(&self) { + // safety: unsafe_lock is disabled + unsafe { self.lock() } + } + + #[no_panic::no_panic] + fn try_lock(&self) -> bool { + unsafe { self.try_lock() } + } + + #[no_panic::no_panic] + unsafe fn unlock(&self) { + unsafe { self.unlock() } + } + + fn is_locked(&self) -> bool { + unsafe { self.is_locked() } + } +} diff --git a/src/no_threads.rs b/src/no_threads.rs new file mode 100755 index 0000000..f831f5f --- /dev/null +++ b/src/no_threads.rs @@ -0,0 +1,122 @@ +use core::cell::Cell; + +pub struct Mutex { + // This platform has no threads, so we can use a Cell here. + locked: Cell<bool>, +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} // no threads on this platform + +impl Mutex { + #[inline] + pub const fn new() -> Mutex { + Mutex { + locked: Cell::new(false), + } + } + + /// Locks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already locked by the current thread and the + /// `unsafe_lock` feature is enabled. + #[inline] + pub unsafe fn lock(&self) { + if self.locked.replace(true) { + if cfg!(feature = "unsafe_lock") { + // safety: it's UB to lock this when it's already locked + unsafe { core::hint::unreachable_unchecked() }; + } else { + panic!("deadlock on a platform with no threads"); + } + } + } + + /// If the mutex is unlocked, it is locked, and this function returns + /// `true'. Otherwise, `false` is returned. + #[inline] + pub unsafe fn try_lock(&self) -> bool { + self.locked.replace(true) == false + } + + /// Unlocks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already unlocked or if it has been locked on + /// a different thread. + #[inline] + pub unsafe fn unlock(&self) { + self.locked.set(false); + } + + #[inline] + pub unsafe fn is_locked(&self) -> bool { + self.locked.get() + } +} + +pub struct RwLock { + // This platform has no threads, so we can use a Cell here. + mode: Cell<isize>, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} // no threads on this platform + +impl RwLock { + #[inline] + pub const fn new() -> RwLock { + RwLock { mode: Cell::new(0) } + } + + #[inline] + pub unsafe fn read(&self) { + let m = self.mode.get(); + if m >= 0 { + self.mode.set(m + 1); + } else { + unsafe { core::hint::unreachable_unchecked() }; + } + } + + #[inline] + pub unsafe fn try_read(&self) -> bool { + let m = self.mode.get(); + if m >= 0 { + self.mode.set(m + 1); + true + } else { + false + } + } + + #[inline] + pub unsafe fn write(&self) { + if self.mode.replace(isize::MAX) != 0 { + unsafe { core::hint::unreachable_unchecked() }; + } + } + + #[inline] + pub unsafe fn try_write(&self) -> bool { + if self.mode.get() == 0 { + self.mode.set(-1); + true + } else { + false + } + } + + #[inline] + pub unsafe fn read_unlock(&self) { + self.mode.set(self.mode.get() - 1); + } + + #[inline] + pub unsafe fn write_unlock(&self) { + assert_eq!(self.mode.replace(0), -1); + } +} diff --git a/src/pthread.rs b/src/pthread.rs new file mode 100755 index 0000000..b549e1d --- /dev/null +++ b/src/pthread.rs @@ -0,0 +1,118 @@ +use core::cell::UnsafeCell; +use core::ops::Deref; + +use alloc::boxed::Box; + +use libc::{ + pthread_mutex_lock, pthread_mutex_t, pthread_mutex_trylock, pthread_mutex_unlock, EBUSY, + PTHREAD_MUTEX_INITIALIZER, +}; + +use crate::lazy_box::{LazyBox, LazyInit}; + +struct RawMutex(UnsafeCell<pthread_mutex_t>); + +impl LazyInit for RawMutex { + fn init() -> Box<Self> { + // We need to box this no matter what, because copying a pthread mutex + // results in undocumented behavior. + let mutex = Box::new(Self(UnsafeCell::new(PTHREAD_MUTEX_INITIALIZER))); + + if !cfg!(feature = "unsafe_lock") { + // A pthread mutex initialized with PTHREAD_MUTEX_INITIALIZER will have + // a type of PTHREAD_MUTEX_DEFAULT, which has undefined behavior if you + // try to re-lock it from the same thread when you already hold a lock + // (https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_init.html). + // This is the case even if PTHREAD_MUTEX_DEFAULT == PTHREAD_MUTEX_NORMAL + // (https://github.com/rust-lang/rust/issues/33770#issuecomment-220847521) -- in that + // case, `pthread_mutexattr_settype(PTHREAD_MUTEX_DEFAULT)` will of course be the same + // as setting it to `PTHREAD_MUTEX_NORMAL`, but not setting any mode will result in + // a Mutex where re-locking is UB. + // + // In practice, glibc takes advantage of this undefined behavior to + // implement hardware lock elision, which uses hardware transactional + // memory to avoid acquiring the lock. While a transaction is in + // progress, the lock appears to be unlocked. This isn't a problem for + // other threads since the transactional memory will abort if a conflict + // is detected, however no abort is generated when re-locking from the + // same thread. + // + // Since locking the same mutex twice will result in two aliasing &mut + // references, we instead create the mutex with type + // PTHREAD_MUTEX_NORMAL which is guaranteed to deadlock if we try to + // re-lock it from the same thread, thus avoiding undefined behavior. + unsafe { + let mut attr = MaybeUninit::<libc::pthread_mutexattr_t>::uninit(); + libc::pthread_mutexattr_init(attr.as_mut_ptr()); + let attr = PthreadMutexAttr(&mut attr); + libc::pthread_mutexattr_settype(attr.0.as_mut_ptr(), libc::PTHREAD_MUTEX_NORMAL); + libc::pthread_mutex_init(mutex.0.get(), attr.0.as_ptr()); + } + } + + mutex + } +} + +impl Deref for RawMutex { + type Target = UnsafeCell<pthread_mutex_t>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub struct Mutex(LazyBox<RawMutex>); + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + #[inline] + pub const fn new() -> Self { + Self(LazyBox::new()) + } + + /// Locks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already locked by the current thread and the + /// `unsafe_lock` feature is enabled. + #[inline] + pub unsafe fn lock(&self) { + // safety: the pointer is valid + pthread_mutex_lock(self.0.get()); + } + + /// If the mutex is unlocked, it is locked, and this function returns + /// `true'. Otherwise, `false` is returned. + #[inline] + pub unsafe fn try_lock(&self) -> bool { + // safety: the pointer is valid + unsafe { pthread_mutex_trylock(self.0.get()) == 0 } + } + + /// Unlocks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already unlocked or if it has been locked on + /// a different thread. + #[inline] + pub unsafe fn unlock(&self) { + // safety: the pointer is valid + pthread_mutex_unlock(self.0.get()); + } + + pub unsafe fn is_locked(&self) -> bool { + if self.try_lock() { + unsafe { + self.unlock(); + } + false + } else { + true + } + } +} diff --git a/src/spin.rs b/src/spin.rs new file mode 100755 index 0000000..c81a1e7 --- /dev/null +++ b/src/spin.rs @@ -0,0 +1,68 @@ +use core::sync::atomic::{AtomicBool, Ordering}; + +pub struct Mutex(AtomicBool); + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + #[inline] + pub const fn new() -> Mutex { + Mutex(AtomicBool::new(false)) + } + + /// Locks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already locked by the current thread and the + /// `unsafe_lock` feature is enabled. + #[inline] + pub unsafe fn lock(&self) { + // Can fail to lock even if the spinlock is not locked. May be more + // efficient than `try_lock` hen called in a loop. + loop { + if self.try_lock_weak() { + break; + } + + while self.is_locked() { + core::hint::spin_loop(); + } + } + } + + /// If the mutex is unlocked, it is locked, and this function returns + /// `true'. Otherwise, `false` is returned. + #[inline] + pub unsafe fn try_lock(&self) -> bool { + // The reason for using a strong compare_exchange is explained here: + // https://github.com/Amanieu/parking_lot/pull/207#issuecomment-575869107 + self.lock + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } + + /// Unlocks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already unlocked or if it has been locked on + /// a different thread. + #[inline] + pub unsafe fn unlock(&self) { + self.locked.set(false); + } + + #[inline] + pub unsafe fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) + } + + #[inline] + fn try_lock_weak(&self) -> bool { + self.lock + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } +} diff --git a/src/windows7.rs b/src/windows7.rs new file mode 100755 index 0000000..2007d05 --- /dev/null +++ b/src/windows7.rs @@ -0,0 +1,119 @@ +use core::cell::UnsafeCell; + +#[repr(C)] +#[derive(Clone, Copy)] +pub struct SrwLock { + ptr: *mut core::ffi::c_void, +} + +const SRWLOCK_INIT: SrwLock = SrwLock { + ptr: core::ptr::null_mut(), +}; + +type Boolean = u8; + +#[link(name = "kernel32")] +extern "system" { + pub fn AcquireSRWLockShared(srwlock: *mut SrwLock); + pub fn AcquireSRWLockExclusive(srwlock: *mut SrwLock); + pub fn TryAcquireSRWLockShared(srwlock: *mut SrwLock) -> Boolean; + pub fn TryAcquireSRWLockExclusive(srwlock: *mut SrwLock) -> Boolean; + pub fn ReleaseSRWLockShared(srwlock: *mut SrwLock); + pub fn ReleaseSRWLockExclusive(srwlock: *mut SrwLock); +} + +pub struct Mutex(UnsafeCell<SrwLock>); + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + #[inline] + pub const fn new() -> Self { + Self(UnsafeCell::new(SRWLOCK_INIT)) + } + + /// Locks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already locked by the current thread and the + /// `unsafe_lock` feature is enabled. + #[inline] + pub unsafe fn lock(&self) { + unsafe { + AcquireSRWLockExclusive(self.0.get()); + } + } + + /// If the mutex is unlocked, it is locked, and this function returns + /// `true'. Otherwise, `false` is returned. + #[inline] + pub unsafe fn try_lock(&self) -> bool { + unsafe { TryAcquireSRWLockExclusive(self.0.get()) != 0 } + } + + /// Unlocks the mutex + /// + /// # Safety + /// + /// UB occurs if the mutex is already unlocked or if it has been locked on + /// a different thread. + #[inline] + pub unsafe fn unlock(&self) { + unsafe { + ReleaseSRWLockExclusive(self.0.get()); + } + } + + pub unsafe fn is_locked(&self) -> bool { + if self.try_lock() { + unsafe { + self.unlock(); + } + false + } else { + true + } + } +} + +pub struct RwLock { + inner: UnsafeCell<c::SRWLOCK>, +} + +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} + +impl RwLock { + #[inline] + pub const fn new() -> RwLock { + RwLock { + inner: UnsafeCell::new(c::SRWLOCK_INIT), + } + } + #[inline] + pub fn read(&self) { + unsafe { c::AcquireSRWLockShared(self.inner.get()) } + } + #[inline] + pub fn try_read(&self) -> bool { + unsafe { c::TryAcquireSRWLockShared(self.inner.get()) != 0 } + } + #[inline] + pub fn write(&self) { + unsafe { c::AcquireSRWLockExclusive(self.inner.get()) } + } + #[inline] + pub fn try_write(&self) -> bool { + unsafe { c::TryAcquireSRWLockExclusive(self.inner.get()) != 0 } + } + #[inline] + pub unsafe fn read_unlock(&self) { + c::ReleaseSRWLockShared(self.inner.get()) + } + #[inline] + pub unsafe fn write_unlock(&self) { + c::ReleaseSRWLockExclusive(self.inner.get()) + } +} |
