summaryrefslogtreecommitdiff
path: root/src/futex.rs
blob: ad74f1148aa1d2c3b2ca3028e5cfc5c37a8c1491 (plain)
use core::sync::atomic::Ordering;

use cfg_if::cfg_if;

// Select the platform futex backend at compile time. Only the branch whose
// `cfg` predicate matches is compiled; the targets that are commented out in
// the first predicate are not enabled. There is no final `else` branch, so on
// any other target none of the names below are brought into scope.
cfg_if! {
	if #[cfg(any(
		target_os = "linux",
		target_os = "android",
		//all(target_os = "emscripten", target_feature = "atomics"),
		//target_os = "freebsd",
		//target_os = "openbsd",
		//target_os = "dragonfly",
		//target_os = "fuchsia",
	))] {
		mod linux;
		use linux::{Futex, Primitive, SmallFutex, SmallPrimitive};
	} else if #[cfg(all(target_os = "windows", not(target_vendor = "win7")))] {
		mod windows;
		// NOTE(review): only `Futex` is imported here, but the rest of this
		// file also names `Primitive`, `SmallFutex` and `SmallPrimitive`.
		// Confirm how the Windows build resolves those names.
		use windows::Futex;
	}
}

// Upper bound on the number of busy-wait iterations performed by the spin
// helpers in this file before they give up and return the last observed state.
const SPIN_COUNT: u32 = 100;

// The three values stored in the `Mutex` futex word.
const UNLOCKED: SmallPrimitive = 0;
const LOCKED: SmallPrimitive = 1; // locked, no other threads waiting
const CONTENDED: SmallPrimitive = 2; // locked, and other threads waiting (contended)

/// A mutual-exclusion lock whose whole state is a single small futex word
/// holding one of `UNLOCKED`, `LOCKED` or `CONTENDED`.
pub struct Mutex(SmallFutex);

impl Mutex {
	/// Creates a new mutex in the `UNLOCKED` state.
	#[inline]
	pub const fn new() -> Self {
		Self(Futex::new(UNLOCKED))
	}

	/// Locks the mutex
	///
	/// The fast path is a single compare-exchange from `UNLOCKED` to
	/// `LOCKED`. If that fails, the slow path in `lock_contended` spins and
	/// then waits on the futex until the lock can be taken.
	///
	/// # Safety
	///
	/// UB occurs if the mutex is already locked by the current thread and the
	/// `unsafe_lock` feature is enabled.
	#[inline]
	pub unsafe fn lock(&self) {
		if self
			.0
			.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
			.is_err()
		{
			self.lock_contended();
		}
	}

	/// If the mutex is unlocked, it is locked, and this function returns
	/// `true`. Otherwise, `false` is returned.
	///
	/// This never waits on the futex: it is a single compare-exchange.
	#[inline]
	pub unsafe fn try_lock(&self) -> bool {
		self.0
			.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
			.is_ok()
	}

	/// Unlocks the mutex
	///
	/// # Safety
	///
	/// UB occurs if the mutex is already unlocked or if it has been locked on
	/// a different thread.
	#[inline]
	pub unsafe fn unlock(&self) {
		if self.0.swap(UNLOCKED, Ordering::Release) == CONTENDED {
			// We only wake up one thread. When that thread locks the mutex, it
			// will mark the mutex as CONTENDED (see lock_contended below),
			// which makes sure that any other waiting threads will also be
			// woken up eventually.
			self.0.wake();
		}
	}

	/// Returns `true` if the mutex is currently locked (in either the
	/// `LOCKED` or the `CONTENDED` state) and `false` if it is unlocked.
	///
	/// The state is read with a relaxed load, so the result is only a
	/// snapshot: another thread may lock or unlock the mutex right after.
	#[inline]
	pub unsafe fn is_locked(&self) -> bool {
		// Anything other than UNLOCKED means some thread holds the lock.
		self.0.load(Ordering::Relaxed) != UNLOCKED
	}

	/// Slow path of `lock`: spin briefly, then mark the mutex as contended
	/// and wait on the futex until the lock has been acquired.
	#[cold]
	fn lock_contended(&self) {
		// Spin first to speed things up if the lock is released quickly.
		let mut state = self.spin();

		// If it's unlocked now, attempt to take the lock
		// without marking it as contended.
		if state == UNLOCKED {
			match self
				.0
				.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
			{
				Ok(_) => return, // Locked!
				Err(s) => state = s,
			}
		}

		loop {
			// Put the lock in contended state.
			// We avoid an unnecessary write if it is already set to CONTENDED,
			// to be friendlier for the caches.
			if state != CONTENDED && self.0.swap(CONTENDED, Ordering::Acquire) == UNLOCKED {
				// We changed it from UNLOCKED to CONTENDED, so we just successfully locked it.
				return;
			}

			// Wait for the futex to change state, assuming it is still CONTENDED.
			self.0.wait(CONTENDED);

			// Spin again after waking up.
			state = self.spin();
		}
	}

	/// Busy-waits for at most `SPIN_COUNT` iterations while the mutex stays
	/// in the `LOCKED` state, and returns the last state that was observed.
	fn spin(&self) -> SmallPrimitive {
		let mut spin = SPIN_COUNT;
		loop {
			// We only use `load` (and not `swap` or `compare_exchange`)
			// while spinning, to be easier on the caches.
			let state = self.0.load(Ordering::Acquire);

			// We stop spinning when the mutex is UNLOCKED,
			// but also when it's CONTENDED.
			if state != LOCKED || spin == 0 {
				return state;
			}

			core::hint::spin_loop();
			spin -= 1;
		}
	}
}

/// A reader-writer lock built from two futex words: `state`, which holds the
/// lock state and the waiting flags, and `writer_notify`, which writers wait on.
pub struct RwLock {
	// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
	// Bits 0..30:
	//   0: Unlocked
	//   1..=0x3FFF_FFFE: Locked by N readers
	//   0x3FFF_FFFF: Write locked
	// Bit 30: Readers are waiting on this futex.
	// Bit 31: Writers are waiting on the writer_notify futex.
	state: Futex,
	// The 'condition variable' to notify writers through.
	// Incremented on every signal.
	writer_notify: Futex,
}

// Amount added to the state for each reader that holds the lock.
const READ_LOCKED: Primitive = 1;
// Selects the low 30 bits of the state, i.e. the reader-count field.
const MASK: Primitive = (1 << 30) - 1;
// A reader-count field with all bits set marks the lock as write locked.
const WRITE_LOCKED: Primitive = MASK;
// Adding this (with wrap-around) to a write-locked state turns the count
// field into exactly one reader.
const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
// Largest reader count that is still distinguishable from WRITE_LOCKED.
const MAX_READERS: Primitive = MASK - 1;
// Flag bit 30: set while readers are (about to be) asleep on the state futex.
const READERS_WAITING: Primitive = 1 << 30;
// Flag bit 31: set while writers are (about to be) asleep on writer_notify.
const WRITERS_WAITING: Primitive = 1 << 31;

/// Returns `true` when the reader-count field of `state` is zero, meaning
/// neither readers nor a writer hold the lock. The waiting flags are ignored.
#[inline]
fn is_unlocked(state: Primitive) -> bool {
	let holders = state & MASK;
	holders == 0
}

/// Returns `true` when the reader-count field of `state` holds the
/// `WRITE_LOCKED` marker. The waiting flags are ignored.
#[inline]
fn is_write_locked(state: Primitive) -> bool {
	let holders = state & MASK;
	holders == WRITE_LOCKED
}

/// Returns `true` when the `READERS_WAITING` flag is set in `state`.
#[inline]
fn has_readers_waiting(state: Primitive) -> bool {
	let flag = state & READERS_WAITING;
	flag != 0
}

/// Returns `true` when the `WRITERS_WAITING` flag is set in `state`.
#[inline]
fn has_writers_waiting(state: Primitive) -> bool {
	let flag = state & WRITERS_WAITING;
	flag != 0
}

/// Returns `true` when a new reader may take the lock right away: no thread
/// is flagged as waiting and the reader count is below `MAX_READERS`.
#[inline]
fn is_read_lockable(state: Primitive) -> bool {
	// Read-locking is refused while readers are waiting, even when the lock is
	// unlocked and no writers are waiting. That combination only shows up right
	// after an unlock, when the unlocking thread may be busy waking writers
	// (which take priority over readers). That thread clears the readers
	// waiting bit and wakes the readers itself when needed.
	if has_readers_waiting(state) || has_writers_waiting(state) {
		return false;
	}

	// Also refuse when one more reader could overflow the counter.
	(state & MASK) < MAX_READERS
}

/// Variant of the read-lockable check used by a reader that has just been
/// woken from sleep, so that it can join a lock that `downgrade` left in read
/// mode even while writers are waiting.
#[inline]
fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
	// `downgrade` wakes every sleeping reader and leaves the lock read-locked,
	// so the state we are looking for has at least one reader, is not
	// write-locked, and has no readers flagged as waiting.
	let held_by_readers = !is_unlocked(state) && !is_write_locked(state);

	// There must still be room for one more reader in the counter.
	let has_room = (state & MASK) < MAX_READERS;

	// The writers waiting flag is deliberately not consulted. A caller of
	// `downgrade` wants other readers to see the protected value; if waiting
	// writers went first, only the downgrading thread could read it, which
	// would defeat the purpose of `downgrade`.
	has_room && held_by_readers && !has_readers_waiting(state)
}

/// Returns `true` when the reader-count field of `state` is exactly
/// `MAX_READERS`, i.e. no further reader can be counted.
#[inline]
fn has_reached_max_readers(state: Primitive) -> bool {
	let readers = state & MASK;
	readers == MAX_READERS
}

impl RwLock {
	/// Creates a new lock with both futex words set to zero: unlocked, with
	/// no waiting flags set.
	#[inline]
	pub const fn new() -> Self {
		Self {
			state: Futex::new(0),
			writer_notify: Futex::new(0),
		}
	}

	/// Tries to take a read lock without waiting on the futex.
	///
	/// Adds one reader to the state if `is_read_lockable` accepts the current
	/// state; returns `true` on success and `false` otherwise.
	#[inline]
	pub fn try_read(&self) -> bool {
		self.state
			.fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| {
				// The closure form keeps the addition from being evaluated
				// for states that are not read-lockable.
				is_read_lockable(s).then(|| s + READ_LOCKED)
			})
			.is_ok()
	}

	/// Takes a read lock, blocking until it is acquired.
	///
	/// The fast path is one load plus one weak compare-exchange; if the state
	/// is not read-lockable or the exchange fails, `read_contended` takes over.
	#[inline]
	pub fn read(&self) {
		let state = self.state.load(Ordering::Relaxed);
		if !is_read_lockable(state)
			|| self
				.state
				.compare_exchange_weak(
					state,
					state + READ_LOCKED,
					Ordering::Acquire,
					Ordering::Relaxed,
				)
				.is_err()
		{
			self.read_contended();
		}
	}

	/// Releases one read lock, and wakes waiting threads if this was the last
	/// reader and a writer is waiting.
	///
	/// # Safety
	///
	/// The `RwLock` must be read-locked (N readers) in order to call this.
	#[inline]
	pub unsafe fn read_unlock(&self) {
		// `fetch_sub` returns the previous value; subtract again to get the new state.
		let state = self.state.fetch_sub(READ_LOCKED, Ordering::Release) - READ_LOCKED;

		// It's impossible for a reader to be waiting on a read-locked RwLock,
		// except if there is also a writer waiting.
		debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));

		// Wake up a writer if we were the last reader and there's a writer waiting.
		if is_unlocked(state) && has_writers_waiting(state) {
			self.wake_writer_or_readers(state);
		}
	}

	/// Slow path of `read`: spin, then set the readers waiting flag and sleep
	/// on the state futex until a read lock can be taken.
	///
	/// # Panics
	///
	/// Panics if the reader count has reached `MAX_READERS`.
	#[cold]
	fn read_contended(&self) {
		let mut has_slept = false;
		let mut state = self.spin_read();

		loop {
			// If we have just been woken up, first check for a `downgrade` call.
			// Otherwise, if we can read-lock it, lock it.
			if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
				match self.state.compare_exchange_weak(
					state,
					state + READ_LOCKED,
					Ordering::Acquire,
					Ordering::Relaxed,
				) {
					Ok(_) => return, // Locked!
					Err(s) => {
						state = s;
						continue;
					}
				}
			}

			// Check for overflow.
			assert!(
				!has_reached_max_readers(state),
				"too many active read locks on RwLock"
			);

			// Make sure the readers waiting bit is set before we go to sleep.
			if !has_readers_waiting(state) {
				if let Err(s) = self.state.compare_exchange(
					state,
					state | READERS_WAITING,
					Ordering::Relaxed,
					Ordering::Relaxed,
				) {
					state = s;
					continue;
				}
			}

			// Wait for the state to change.
			self.state.wait(state | READERS_WAITING);
			has_slept = true;

			// Spin again after waking up.
			state = self.spin_read();
		}
	}

	/// Tries to take the write lock without waiting on the futex.
	///
	/// Succeeds only when the reader-count field is zero; any waiting flags
	/// already present in the state are preserved. Returns `true` on success.
	#[inline]
	pub fn try_write(&self) -> bool {
		self.state
			.fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| {
				is_unlocked(s).then(|| s + WRITE_LOCKED)
			})
			.is_ok()
	}

	/// Takes the write lock, blocking until it is acquired.
	///
	/// The fast path only succeeds from a state of exactly zero (unlocked and
	/// no waiting flags); every other case is handled by `write_contended`.
	#[inline]
	pub fn write(&self) {
		if self
			.state
			.compare_exchange_weak(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed)
			.is_err()
		{
			self.write_contended();
		}
	}

	/// Releases the write lock and wakes waiting threads if any waiting flag
	/// is set.
	///
	/// # Safety
	///
	/// The `RwLock` must be write-locked (single writer) in order to call this.
	#[inline]
	pub unsafe fn write_unlock(&self) {
		// `fetch_sub` returns the previous value; subtract again to get the new state.
		let state = self.state.fetch_sub(WRITE_LOCKED, Ordering::Release) - WRITE_LOCKED;

		debug_assert!(is_unlocked(state));

		if has_writers_waiting(state) || has_readers_waiting(state) {
			self.wake_writer_or_readers(state);
		}
	}

	/// Converts the held write lock into a single read lock, and wakes all
	/// sleeping readers if the readers waiting flag was set.
	///
	/// # Safety
	///
	/// The `RwLock` must be write-locked (single writer) in order to call this.
	#[inline]
	pub unsafe fn downgrade(&self) {
		// Removes all write bits and adds a single read bit.
		let state = self.state.fetch_add(DOWNGRADE, Ordering::Release);
		debug_assert!(
			is_write_locked(state),
			"RwLock must be write locked to call `downgrade`"
		);

		if has_readers_waiting(state) {
			// Since we had the exclusive lock, nobody else can unset this bit.
			self.state.fetch_sub(READERS_WAITING, Ordering::Relaxed);
			self.state.wake_all();
		}
	}

	/// Slow path of `write`: spin, then set the writers waiting flag and
	/// sleep on `writer_notify` until the write lock can be taken.
	#[cold]
	fn write_contended(&self) {
		let mut state = self.spin_write();

		// Becomes WRITERS_WAITING once this thread has been through the
		// waiting path, so that the flag is kept set when the lock is taken.
		let mut other_writers_waiting = 0;

		loop {
			// If it's unlocked, we try to lock it.
			if is_unlocked(state) {
				match self.state.compare_exchange_weak(
					state,
					state | WRITE_LOCKED | other_writers_waiting,
					Ordering::Acquire,
					Ordering::Relaxed,
				) {
					Ok(_) => return, // Locked!
					Err(s) => {
						state = s;
						continue;
					}
				}
			}

			// Set the waiting bit indicating that we're waiting on it.
			if !has_writers_waiting(state) {
				if let Err(s) = self.state.compare_exchange(
					state,
					state | WRITERS_WAITING,
					Ordering::Relaxed,
					Ordering::Relaxed,
				) {
					state = s;
					continue;
				}
			}

			// Other writers might be waiting now too, so we should make sure
			// we keep that bit on once we manage to lock it.
			other_writers_waiting = WRITERS_WAITING;

			// Examine the notification counter before we check if `state` has changed,
			// to make sure we don't miss any notifications.
			let seq = self.writer_notify.load(Ordering::Acquire);

			// Don't go to sleep if the lock has become available,
			// or if the writers waiting bit is no longer set.
			state = self.state.load(Ordering::Relaxed);
			if is_unlocked(state) || !has_writers_waiting(state) {
				continue;
			}

			// Wait for the state to change.
			self.writer_notify.wait(seq);

			// Spin again after waking up.
			state = self.spin_write();
		}
	}

	/// Wakes up waiting threads after unlocking.
	///
	/// If both are waiting, this will wake up only one writer, but will fall
	/// back to waking up readers if there was no writer to wake up.
	///
	/// # Panics
	///
	/// Panics if `state` is not unlocked.
	#[cold]
	fn wake_writer_or_readers(&self, mut state: Primitive) {
		assert!(is_unlocked(state));

		// The readers waiting bit might be turned on at any point now,
		// since readers will block when there's anything waiting.
		// Writers will just lock the lock though, regardless of the waiting bits,
		// so we don't have to worry about the writer waiting bit.
		//
		// If the lock gets locked in the meantime, we don't have to do
		// anything, because then the thread that locked the lock will take
		// care of waking up waiters when it unlocks.

		// If only writers are waiting, wake one of them up.
		if state == WRITERS_WAITING {
			match self
				.state
				.compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed)
			{
				Ok(_) => {
					self.wake_writer();
					return;
				}
				Err(s) => {
					// Maybe some readers are now waiting too. So, continue to the next `if`.
					state = s;
				}
			}
		}

		// If both writers and readers are waiting, leave the readers waiting
		// and only wake up one writer.
		if state == READERS_WAITING + WRITERS_WAITING {
			if self
				.state
				.compare_exchange(state, READERS_WAITING, Ordering::Relaxed, Ordering::Relaxed)
				.is_err()
			{
				// The lock got locked. Not our problem anymore.
				return;
			}
			if self.wake_writer() {
				return;
			}
			// No writers were actually blocked on futex_wait, so we continue
			// to wake up readers instead, since we can't be sure if we notified a writer.
			state = READERS_WAITING;
		}

		// If readers are waiting, wake them all up.
		if state == READERS_WAITING
			&& self
				.state
				.compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed)
				.is_ok()
		{
			self.state.wake_all();
		}
	}

	/// This wakes one writer and returns true if we woke up a writer that was
	/// blocked on futex_wait.
	///
	/// If this returns false, it might still be the case that we notified a
	/// writer that was about to go to sleep.
	fn wake_writer(&self) -> bool {
		// Bump the counter first, so a writer that has already read the old
		// value does not go to sleep on it.
		self.writer_notify.fetch_add(1, Ordering::Release);
		self.writer_notify.wake()
		// Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
		// up any threads or not, and always return `false` here. That still
		// results in correct behavior: it just means readers get woken up as
		// well in case both readers and writers were waiting.
	}

	/// Spin for a while, but stop directly at the given condition.
	///
	/// Returns the last state that was loaded, whether or not `f` accepted it.
	#[inline]
	fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
		let mut spin = SPIN_COUNT; // Chosen by fair dice roll.
		loop {
			let state = self.state.load(Ordering::Relaxed);
			if f(state) || spin == 0 {
				return state;
			}
			core::hint::spin_loop();
			spin -= 1;
		}
	}

	/// Spin helper for writers; returns the last observed state.
	#[inline]
	fn spin_write(&self) -> Primitive {
		// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
		self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
	}

	/// Spin helper for readers; returns the last observed state.
	#[inline]
	fn spin_read(&self) -> Primitive {
		// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
		self.spin_until(|state| {
			!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
		})
	}
}