summaryrefslogtreecommitdiff
path: root/src/futex.rs
diff options
context:
space:
mode:
authorMica White <botahamec@outlook.com>2025-12-08 20:14:03 -0500
committerMica White <botahamec@outlook.com>2025-12-08 20:14:03 -0500
commitc31f4ce84c3c8b3f89a05890df775d4e766aaadb (patch)
tree40169c1240717002197c85985f9bb652dd4b0af8 /src/futex.rs
First commitHEADmain
Diffstat (limited to 'src/futex.rs')
-rwxr-xr-xsrc/futex.rs535
1 files changed, 535 insertions, 0 deletions
diff --git a/src/futex.rs b/src/futex.rs
new file mode 100755
index 0000000..ad74f11
--- /dev/null
+++ b/src/futex.rs
@@ -0,0 +1,535 @@
+use core::sync::atomic::Ordering;
+
+use cfg_if::cfg_if;
+
+cfg_if! {
+ if #[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ //all(target_os = "emscripten", target_feature = "atomics"),
+ //target_os = "freebsd",
+ //target_os = "openbsd",
+ //target_os = "dragonfly",
+ //target_os = "fuchsia",
+ ))] {
+ mod linux;
+ use linux::{Futex, Primitive, SmallFutex, SmallPrimitive};
+ } else if #[cfg(all(target_os = "windows", not(target_vendor = "win7")))] {
+ mod windows;
+ use windows::Futex;
+ }
+}
+
// How many times a contended thread spins on the lock state before falling
// back to a futex wait. Spinning briefly avoids a syscall when the lock is
// released quickly.
const SPIN_COUNT: u32 = 100;

const UNLOCKED: SmallPrimitive = 0;
const LOCKED: SmallPrimitive = 1; // locked, no other threads waiting
const CONTENDED: SmallPrimitive = 2; // locked, and other threads waiting (contended)
+
+pub struct Mutex(SmallFutex);
+
+impl Mutex {
+ #[inline]
+ pub const fn new() -> Self {
+ Self(Futex::new(UNLOCKED))
+ }
+
+ /// Locks the mutex
+ ///
+ /// # Safety
+ ///
+ /// UB occurs if the mutex is already locked by the current thread and the
+ /// `unsafe_lock` feature is enabled.
+ #[inline]
+ pub unsafe fn lock(&self) {
+ if self
+ .0
+ .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ self.lock_contended();
+ }
+ }
+
+ /// If the mutex is unlocked, it is locked, and this function returns
+ /// `true'. Otherwise, `false` is returned.
+ #[inline]
+ pub unsafe fn try_lock(&self) -> bool {
+ self.0
+ .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Unlocks the mutex
+ ///
+ /// # Safety
+ ///
+ /// UB occurs if the mutex is already unlocked or if it has been locked on
+ /// a different thread.
+ #[inline]
+ pub unsafe fn unlock(&self) {
+ if self.0.swap(UNLOCKED, Ordering::Release) == CONTENDED {
+ // We only wake up one thread. When that thread locks the mutex, it
+ // will mark the mutex as CONTENDED (see lock_contended above),
+ // which makes sure that any other waiting threads will also be
+ // woken up eventually.
+ self.0.wake();
+ }
+ }
+
+ #[inline]
+ pub unsafe fn is_locked(&self) -> bool {
+ self.0.load(Ordering::Relaxed) == UNLOCKED
+ }
+
+ #[cold]
+ fn lock_contended(&self) {
+ // Spin first to speed things up if the lock is released quickly.
+ let mut state = self.spin();
+
+ // If it's unlocked now, attempt to take the lock
+ // without marking it as contended.
+ if state == UNLOCKED {
+ match self
+ .0
+ .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
+ {
+ Ok(_) => return, // Locked!
+ Err(s) => state = s,
+ }
+ }
+
+ loop {
+ // Put the lock in contended state.
+ // We avoid an unnecessary write if it as already set to CONTENDED,
+ // to be friendlier for the caches.
+ if state != CONTENDED && self.0.swap(CONTENDED, Ordering::Acquire) == UNLOCKED {
+ // We changed it from UNLOCKED to CONTENDED, so we just successfully locked it.
+ return;
+ }
+
+ // Wait for the futex to change state, assuming it is still CONTENDED.
+ self.0.wait(CONTENDED);
+
+ // Spin again after waking up.
+ state = self.spin();
+ }
+ }
+
+ fn spin(&self) -> SmallPrimitive {
+ let mut spin = SPIN_COUNT;
+ loop {
+ // We only use `load` (and not `swap` or `compare_exchange`)
+ // while spinning, to be easier on the caches.
+ let state = self.0.load(Ordering::Acquire);
+
+ // We stop spinning when the mutex is UNLOCKED,
+ // but also when it's CONTENDED.
+ if state != LOCKED || spin == 0 {
+ return state;
+ }
+
+ core::hint::spin_loop();
+ spin -= 1;
+ }
+ }
+}
+
/// A futex-based readers-writer lock.
pub struct RwLock {
    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
    // Bits 0..30:
    //   0: Unlocked
    //   1..=0x3FFF_FFFE: Locked by N readers
    //   0x3FFF_FFFF: Write locked
    // Bit 30: Readers are waiting on this futex.
    // Bit 31: Writers are waiting on the writer_notify futex.
    state: Futex,
    // The 'condition variable' to notify writers through.
    // Incremented on every signal.
    writer_notify: Futex,
}
+
// Amount the state changes by when one read lock is taken or released.
const READ_LOCKED: Primitive = 1;
// Mask covering the reader-counter / write-lock portion of the state (bits 0..30).
const MASK: Primitive = (1 << 30) - 1;
// The counter saturated to the full mask value means "write locked".
const WRITE_LOCKED: Primitive = MASK;
// Adding this (with wrapping) turns a write-locked state into a one-reader state.
const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
// Highest representable reader count; one below WRITE_LOCKED so a full
// counter is distinguishable from a held write lock.
const MAX_READERS: Primitive = MASK - 1;
// Bit 30: set while readers are parked on the `state` futex.
const READERS_WAITING: Primitive = 1 << 30;
// Bit 31: set while writers are parked on the `writer_notify` futex.
const WRITERS_WAITING: Primitive = 1 << 31;
+
+#[inline]
+fn is_unlocked(state: Primitive) -> bool {
+ state & MASK == 0
+}
+
+#[inline]
+fn is_write_locked(state: Primitive) -> bool {
+ state & MASK == WRITE_LOCKED
+}
+
+#[inline]
+fn has_readers_waiting(state: Primitive) -> bool {
+ state & READERS_WAITING != 0
+}
+
+#[inline]
+fn has_writers_waiting(state: Primitive) -> bool {
+ state & WRITERS_WAITING != 0
+}
+
+#[inline]
+fn is_read_lockable(state: Primitive) -> bool {
+ // This also returns false if the counter could overflow if we tried to read lock it.
+ //
+ // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
+ // and there's no writers waiting. The only situation when this happens is after unlocking,
+ // at which point the unlocking thread might be waking up writers, which have priority over readers.
+ // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
+ state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
+}
+
+#[inline]
+fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
+ // We make a special case for checking if we can read-lock _after_ a reader thread that went to
+ // sleep has been woken up by a call to `downgrade`.
+ //
+ // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
+ // no readers waiting and the lock should be read-locked (not write-locked or unlocked).
+ //
+ // Note that we do not check if any writers are waiting. This is because a call to `downgrade`
+ // implies that the caller wants other readers to read the value protected by the lock. If we
+ // did not allow readers to acquire the lock before writers after a `downgrade`, then only the
+ // original writer would be able to read the value, thus defeating the purpose of `downgrade`.
+ state & MASK < MAX_READERS
+ && !has_readers_waiting(state)
+ && !is_write_locked(state)
+ && !is_unlocked(state)
+}
+
+#[inline]
+fn has_reached_max_readers(state: Primitive) -> bool {
+ state & MASK == MAX_READERS
+}
+
impl RwLock {
    /// Creates a new, unlocked `RwLock`.
    #[inline]
    pub const fn new() -> Self {
        Self {
            state: Futex::new(0),
            writer_notify: Futex::new(0),
        }
    }

    /// Attempts to acquire a shared (read) lock without blocking.
    /// Returns `true` if the read lock was taken.
    #[inline]
    pub fn try_read(&self) -> bool {
        self.state
            .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| {
                is_read_lockable(s).then(|| s + READ_LOCKED)
            })
            .is_ok()
    }

    /// Acquires a shared (read) lock, blocking until it is available.
    #[inline]
    pub fn read(&self) {
        // Fast path: one optimistic weak CAS from the freshly loaded state;
        // any failure (including a spurious one) takes the slow path.
        let state = self.state.load(Ordering::Relaxed);
        if !is_read_lockable(state)
            || self
                .state
                .compare_exchange_weak(
                    state,
                    state + READ_LOCKED,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                )
                .is_err()
        {
            self.read_contended();
        }
    }

    /// Releases a shared (read) lock.
    ///
    /// # Safety
    ///
    /// The `RwLock` must be read-locked (N readers) in order to call this.
    #[inline]
    pub unsafe fn read_unlock(&self) {
        let state = self.state.fetch_sub(READ_LOCKED, Ordering::Release) - READ_LOCKED;

        // It's impossible for a reader to be waiting on a read-locked RwLock,
        // except if there is also a writer waiting.
        debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));

        // Wake up a writer if we were the last reader and there's a writer waiting.
        if is_unlocked(state) && has_writers_waiting(state) {
            self.wake_writer_or_readers(state);
        }
    }

    // Slow path of `read`: spin, then park on the `state` futex until the
    // read lock can be taken.
    #[cold]
    fn read_contended(&self) {
        let mut has_slept = false;
        let mut state = self.spin_read();

        loop {
            // If we have just been woken up, first check for a `downgrade` call.
            // Otherwise, if we can read-lock it, lock it.
            if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state + READ_LOCKED,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        state = s;
                        continue;
                    }
                }
            }

            // Check for overflow.
            assert!(
                !has_reached_max_readers(state),
                "too many active read locks on RwLock"
            );

            // Make sure the readers waiting bit is set before we go to sleep.
            if !has_readers_waiting(state) {
                if let Err(s) = self.state.compare_exchange(
                    state,
                    state | READERS_WAITING,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = s;
                    continue;
                }
            }

            // Wait for the state to change.
            self.state.wait(state | READERS_WAITING);
            has_slept = true;

            // Spin again after waking up.
            state = self.spin_read();
        }
    }

    /// Attempts to acquire the exclusive (write) lock without blocking.
    /// Returns `true` if the write lock was taken.
    #[inline]
    pub fn try_write(&self) -> bool {
        self.state
            .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| {
                is_unlocked(s).then(|| s + WRITE_LOCKED)
            })
            .is_ok()
    }

    /// Acquires the exclusive (write) lock, blocking until it is available.
    #[inline]
    pub fn write(&self) {
        // Fast path: CAS straight from the fully-unlocked state (0).
        if self
            .state
            .compare_exchange_weak(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            self.write_contended();
        }
    }

    /// Releases the exclusive (write) lock.
    ///
    /// # Safety
    ///
    /// The `RwLock` must be write-locked (single writer) in order to call this.
    #[inline]
    pub unsafe fn write_unlock(&self) {
        let state = self.state.fetch_sub(WRITE_LOCKED, Ordering::Release) - WRITE_LOCKED;

        debug_assert!(is_unlocked(state));

        if has_writers_waiting(state) || has_readers_waiting(state) {
            self.wake_writer_or_readers(state);
        }
    }

    /// Atomically converts the exclusive (write) lock into a shared (read)
    /// lock held by the caller, waking any waiting readers.
    ///
    /// # Safety
    ///
    /// The `RwLock` must be write-locked (single writer) in order to call this.
    #[inline]
    pub unsafe fn downgrade(&self) {
        // Removes all write bits and adds a single read bit.
        let state = self.state.fetch_add(DOWNGRADE, Ordering::Release);
        debug_assert!(
            is_write_locked(state),
            "RwLock must be write locked to call `downgrade`"
        );

        if has_readers_waiting(state) {
            // Since we had the exclusive lock, nobody else can unset this bit.
            self.state.fetch_sub(READERS_WAITING, Ordering::Relaxed);
            self.state.wake_all();
        }
    }

    // Slow path of `write`: spin, then park on the `writer_notify` futex
    // (used as a condition variable) until the write lock can be taken.
    #[cold]
    fn write_contended(&self) {
        let mut state = self.spin_write();

        let mut other_writers_waiting = 0;

        loop {
            // If it's unlocked, we try to lock it.
            if is_unlocked(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state | WRITE_LOCKED | other_writers_waiting,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        state = s;
                        continue;
                    }
                }
            }

            // Set the waiting bit indicating that we're waiting on it.
            if !has_writers_waiting(state) {
                if let Err(s) = self.state.compare_exchange(
                    state,
                    state | WRITERS_WAITING,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = s;
                    continue;
                }
            }

            // Other writers might be waiting now too, so we should make sure
            // we keep that bit on once we manage lock it.
            other_writers_waiting = WRITERS_WAITING;

            // Examine the notification counter before we check if `state` has changed,
            // to make sure we don't miss any notifications.
            let seq = self.writer_notify.load(Ordering::Acquire);

            // Don't go to sleep if the lock has become available,
            // or if the writers waiting bit is no longer set.
            state = self.state.load(Ordering::Relaxed);
            if is_unlocked(state) || !has_writers_waiting(state) {
                continue;
            }

            // Wait for the state to change.
            self.writer_notify.wait(seq);

            // Spin again after waking up.
            state = self.spin_write();
        }
    }

    /// Wakes up waiting threads after unlocking.
    ///
    /// If both are waiting, this will wake up only one writer, but will fall
    /// back to waking up readers if there was no writer to wake up.
    #[cold]
    fn wake_writer_or_readers(&self, mut state: Primitive) {
        assert!(is_unlocked(state));

        // The readers waiting bit might be turned on at any point now,
        // since readers will block when there's anything waiting.
        // Writers will just lock the lock though, regardless of the waiting bits,
        // so we don't have to worry about the writer waiting bit.
        //
        // If the lock gets locked in the meantime, we don't have to do
        // anything, because then the thread that locked the lock will take
        // care of waking up waiters when it unlocks.

        // If only writers are waiting, wake one of them up.
        if state == WRITERS_WAITING {
            match self
                .state
                .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed)
            {
                Ok(_) => {
                    self.wake_writer();
                    return;
                }
                Err(s) => {
                    // Maybe some readers are now waiting too. So, continue to the next `if`.
                    state = s;
                }
            }
        }

        // If both writers and readers are waiting, leave the readers waiting
        // and only wake up one writer.
        if state == READERS_WAITING + WRITERS_WAITING {
            if self
                .state
                .compare_exchange(state, READERS_WAITING, Ordering::Relaxed, Ordering::Relaxed)
                .is_err()
            {
                // The lock got locked. Not our problem anymore.
                return;
            }
            if self.wake_writer() {
                return;
            }
            // No writers were actually blocked on futex_wait, so we continue
            // to wake up readers instead, since we can't be sure if we notified a writer.
            state = READERS_WAITING;
        }

        // If readers are waiting, wake them all up.
        if state == READERS_WAITING
            && self
                .state
                .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
        {
            self.state.wake_all();
        }
    }

    /// This wakes one writer and returns true if we woke up a writer that was
    /// blocked on futex_wait.
    ///
    /// If this returns false, it might still be the case that we notified a
    /// writer that was about to go to sleep.
    fn wake_writer(&self) -> bool {
        self.writer_notify.fetch_add(1, Ordering::Release);
        self.writer_notify.wake()
        // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
        // up any threads or not, and always return `false` here. That still
        // results in correct behavior: it just means readers get woken up as
        // well in case both readers and writers were waiting.
    }

    /// Spin for a while, but stop directly at the given condition.
    #[inline]
    fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
        let mut spin = SPIN_COUNT; // Chosen by fair dice roll.
        loop {
            let state = self.state.load(Ordering::Relaxed);
            if f(state) || spin == 0 {
                return state;
            }
            core::hint::spin_loop();
            spin -= 1;
        }
    }

    // Spin helper for the write slow path; returns the last observed state.
    #[inline]
    fn spin_write(&self) -> Primitive {
        // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
        self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
    }

    // Spin helper for the read slow path; returns the last observed state.
    #[inline]
    fn spin_read(&self) -> Primitive {
        // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
        self.spin_until(|state| {
            !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
        })
    }
}