Last active
December 20, 2024 11:56
-
-
Save ysbaddaden/5fdd55e2da617cf20cb1df6e92e60688 to your computer and use it in GitHub Desktop.
Crystal port of nsync mu (missing conditions) + Mutex & RWLock objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Crystal port of the mutex from the "nsync" library.
#
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# References:
# - <https://github.com/google/nsync>
# - <https://justine.lol/mutex/>
require "../futex" | |
require "crystal/pointer_linked_list" | |
# TODO: ConditionVariable | |
module Nsync | |
# TODO: protection (unchecked, checked, reentrant) | |
# TODO: #lock(timeout : Time::Span) | |
# Mutual exclusion lock: a thin wrapper around `MU` that only ever acquires
# it in write mode.
#
# `WaiterT` is the waiter type handed to `MU` (for example `ThreadWaiter`).
class Mutex(WaiterT)
  def initialize
    @mu = MU(WaiterT).new
  end

  # Tries to acquire the mutex without blocking. Returns `true` on success.
  def try_lock? : Bool
    @mu.try_lock?
  end

  # Blocks until the mutex has been acquired.
  def lock : Nil
    @mu.lock
  end

  # Releases the mutex.
  def unlock : Nil
    @mu.unlock
  end

  # Returns whether the mutex is currently locked.
  def held? : Bool
    @mu.held?
  end

  # Acquires the mutex, yields, then releases the mutex even when the block
  # raises. Returns the value of the block.
  def synchronize(& : -> F) forall F
    lock
    begin
      yield
    ensure
      unlock
    end
  end
end

# Declared with `alias` rather than as a constant so that the name can also be
# used where a type is expected (type restrictions, generic arguments), not
# only as a receiver for `.new`.
alias ThreadMutex = Mutex(ThreadWaiter)
# FiberMutex = Mutex(FiberWaiter) | |
# TODO: protection (checked) | |
# TODO: #write_lock(timeout : Time::Span) | |
# TODO: #read_lock(timeout : Time::Span) | |
# Reader/writer lock: a thin wrapper around `MU` exposing both its write mode
# (`write_*`) and its read mode (`read_*`).
#
# `WaiterT` is the waiter type handed to `MU` (for example `ThreadWaiter`).
class RWLock(WaiterT)
  def initialize
    @mu = MU(WaiterT).new
  end

  # Tries to acquire the lock in write mode without blocking. Returns `true`
  # on success.
  def try_write_lock? : Bool
    @mu.try_lock?
  end

  # Blocks until the lock has been acquired in write mode.
  def write_lock : Nil
    @mu.lock
  end

  # Releases a lock held in write mode.
  def write_unlock : Nil
    @mu.unlock
  end

  # Returns whether the lock is currently held in write mode.
  def write_held? : Bool
    @mu.held?
  end

  # Acquires the lock in write mode, yields, then releases it even when the
  # block raises. Returns the value of the block.
  def write_synchronize(& : -> F) forall F
    write_lock
    begin
      yield
    ensure
      write_unlock
    end
  end

  # Tries to acquire the lock in read mode without blocking. Returns `true`
  # on success.
  def try_read_lock? : Bool
    @mu.try_rlock?
  end

  # Blocks until the lock has been acquired in read mode.
  def read_lock : Nil
    @mu.rlock
  end

  # Releases a lock held in read mode.
  def read_unlock : Nil
    @mu.runlock
  end

  # Returns whether the lock is currently held in read mode.
  def read_held? : Bool
    @mu.rheld?
  end

  # Acquires the lock in read mode, yields, then releases it even when the
  # block raises. Returns the value of the block.
  def read_synchronize(& : -> F) forall F
    read_lock
    begin
      yield
    ensure
      read_unlock
    end
  end
end

# Declared with `alias` rather than as a constant so that the name can also be
# used where a type is expected (type restrictions, generic arguments), not
# only as a receiver for `.new`.
alias ThreadRWLock = RWLock(ThreadWaiter)
# FiberRWLock = RWLock(FiberWaiter) | |
# Base type for the object a blocked locker sleeps on.
#
# It records whether the locker wants read or write access, plus a `@waiting`
# word: non-zero while the waiter must keep sleeping, zero once it has been
# woken. Concrete subtypes supply the actual sleep (`wait_impl`) and wake-up
# (`wake_impl`) primitives.
abstract struct Waiter
  enum Type
    Reader
    Writer
  end

  def initialize(@type : Type)
    @waiting = Atomic(UInt32).new(0_u32)
  end

  # Returns whether this waiter wants the lock in read mode.
  def reader? : Bool
    @type == Type::Reader
  end

  # Returns whether this waiter wants the lock in write mode.
  def writer? : Bool
    @type == Type::Writer
  end

  # Marks the waiter as sleeping; call before `wait`.
  def waiting! : Nil
    @waiting.set(1_u32, :relaxed)
  end

  # Sleeps until `wake` has reset the waiting word. The loop re-checks the
  # word after every return from `wait_impl`, so early returns are harmless.
  def wait : Nil
    until @waiting.get(:acquire) == 0_u32
      wait_impl
    end
  end

  # Resets the waiting word, then lets the subtype wake the sleeper.
  def wake : Nil
    @waiting.set(0_u32, :release)
    wake_impl
  end

  private abstract def wait_impl : Nil
  private abstract def wake_impl : Nil

  # Subtypes are also expected to provide a class-level yield:
  # def self.yield : Nil
end
# TODO: support macOS, FreeBSD and other systems that have futex-like syscalls | |
# Waiter that sleeps and wakes through `Futex` on its own waiting word.
#
# Also carries the `previous`/`next` links needed to queue it in a
# `Crystal::PointerLinkedList`.
struct ThreadWaiter < Waiter
  # include Crystal::PointerLinkedList::Node
  property previous : Pointer(self)
  property next : Pointer(self)

  def initialize(type : Waiter::Type)
    super(type)
    # start unlinked
    @previous = Pointer(self).null
    @next = Pointer(self).null
  end

  # Waits on the futex for as long as the waiting word still reads 1.
  private def wait_impl : Nil
    word = pointerof(@waiting.@value)
    Futex.wait(word, value: 1_u32)
  end

  # Wakes at most one sleeper on the waiting word.
  private def wake_impl : Nil
    word = pointerof(@waiting.@value)
    Futex.wake(word, count: 1)
  end

  # Yield used by the spin loops in `MU` once they stop busy-waiting.
  def self.yield : Nil
    Thread.yield
  end
end
# struct FiberWaiter < Waiter | |
# # include Crystal::PointerLinkedList::Node | |
# property previous : self* | |
# property next : self* | |
# def initialize(type : Waiter::Type) | |
# super(type) | |
# @previous = Pointer(self).null | |
# @next = Pointer(self).null | |
# @fiber = Fiber.current | |
# end | |
# private def wait_impl : Nil | |
# Fiber.suspend if @waiting.get(:acquire) == 1_u32 | |
# end | |
# private def wake_impl : Nil | |
# @fiber.enqueue | |
# end | |
# def self.yield : Nil | |
# Fiber.yield | |
# end | |
# end | |
# The foundation type to implement them all. | |
struct MU(WaiterT) | |
# TODO: conditions, ... | |
# Layout of the lock word `@m`: the low byte holds flag bits, the upper 24
# bits hold the reader count.
UNLOCKED = 0_u32

# Set while the lock is held in write mode.
WRITER_LOCK = 1_u32 << 0
# Set while someone is manipulating the waiter queue.
SPINLOCK = 1_u32 << 1
# Set when a waiter might be queued.
WAITING = 1_u32 << 2
# Set when an unlocker has taken on the job of waking waiters; cleared by a
# woken waiter.
DESIG_WAKER = 1_u32 << 3
# Set when a writer is waiting; keeps new readers from acquiring.
WRITER_WAITING = 1_u32 << 4
# Set by a waiter that has been woken `LONG_WAIT_THRESHOLD` times; keeps
# lockers that have not waited yet from acquiring.
LONG_WAIT = 1_u32 << 5

# Increment of the reader count (one reader).
READER_LOCK = 1_u32 << 8
# Mask covering the whole reader count.
READER_LOCK_FIELD = 0xffff_ff00_u32
# Mask that is non-zero when the lock is held in any mode.
ANY_LOCK = WRITER_LOCK | READER_LOCK_FIELD

# Writer: bits that must be clear to acquire, value added on acquire, bits
# set when queueing, bits cleared on acquire.
WZERO_TO_ACQUIRE  = ANY_LOCK | LONG_WAIT
WADD_TO_ACQUIRE   = WRITER_LOCK
WSET_WHEN_WAITING = WAITING | WRITER_WAITING
WCLEAR_ON_ACQUIRE = WRITER_WAITING

# Reader: same four parameters.
RZERO_TO_ACQUIRE  = WRITER_LOCK | WRITER_WAITING | LONG_WAIT
RADD_TO_ACQUIRE   = READER_LOCK
RSET_WHEN_WAITING = WAITING
RCLEAR_ON_ACQUIRE = 0_u32

# Number of wake-ups after which a still-unsuccessful waiter sets `LONG_WAIT`.
LONG_WAIT_THRESHOLD = 30
# The lock word (flags + reader count) and the queue of sleeping waiters.
@m : Atomic(UInt32)
@waiters : Crystal::PointerLinkedList(WaiterT)

# Creates an unlocked MU with an empty waiter queue.
def initialize
  @m = Atomic(UInt32).new(UNLOCKED)
  @waiters = Crystal::PointerLinkedList(WaiterT).new
end
# Acquires self in write mode, blocking until it is free.
#
# Tries the non-blocking fast path first; only on failure is a writer waiter
# created (on this method's stack) and handed to the slow path.
@[NoInline]
def lock : Nil
  return if try_lock?

  waiter = WaiterT.new(:writer)
  lock_slow(
    pointerof(waiter),
    WZERO_TO_ACQUIRE,
    WADD_TO_ACQUIRE,
    WSET_WHEN_WAITING,
    WCLEAR_ON_ACQUIRE
  )
end
# Tries to acquire self in write mode without blocking. Returns `true` iff
# the lock was acquired.
def try_lock? : Bool
  # fast path: completely unlocked, no flags set
  m, acquired = @m.compare_and_set(UNLOCKED, WADD_TO_ACQUIRE, :acquire, :relaxed)
  return true if acquired

  # some bits are set: give up if any of them forbids a writer from acquiring
  return false unless (m & WZERO_TO_ACQUIRE) == 0_u32

  # single attempt against the word just observed; no retry on contention
  _, acquired = @m.compare_and_set(m, (m &+ WADD_TO_ACQUIRE) & ~WCLEAR_ON_ACQUIRE, :acquire, :relaxed)
  acquired
end
# Acquires self in read mode, blocking until that is possible.
#
# Tries the non-blocking fast path first; only on failure is a reader waiter
# created (on this method's stack) and handed to the slow path.
@[NoInline]
def rlock : Nil
  return if try_rlock?

  waiter = WaiterT.new(:reader)
  lock_slow(
    pointerof(waiter),
    RZERO_TO_ACQUIRE,
    RADD_TO_ACQUIRE,
    RSET_WHEN_WAITING,
    RCLEAR_ON_ACQUIRE
  )
end
# Tries to acquire self in read mode without blocking. Returns `true` iff
# the lock was acquired.
def try_rlock? : Bool
  # fast path: completely unlocked, no flags set
  m, acquired = @m.compare_and_set(UNLOCKED, RADD_TO_ACQUIRE, :acquire, :relaxed)
  return true if acquired

  # some bits are set: give up if any of them forbids a reader from acquiring
  return false unless (m & RZERO_TO_ACQUIRE) == 0_u32

  # single attempt against the word just observed; no retry on contention
  _, acquired = @m.compare_and_set(m, (m &+ RADD_TO_ACQUIRE) & ~RCLEAR_ON_ACQUIRE, :acquire, :relaxed)
  acquired
end
# Slow path shared by `lock` and `rlock`: alternates between trying to
# acquire and queueing *waiter* to sleep, until the lock is acquired.
#
# - *waiter*: pointer to the caller's waiter; it is linked into `@waiters`
#   while the caller sleeps
# - *zero_to_acquire*: bits of the lock word that must all be clear to acquire
# - *add_to_acquire*: value added to the lock word on acquire
# - *set_when_waiting*: bits set in the lock word when queueing
# - *clear_on_acquire*: bits cleared from the lock word on acquire
def lock_slow(waiter, zero_to_acquire, add_to_acquire, set_when_waiting, clear_on_acquire) : Nil
  attempts = 0
  wait_count = 0

  # Both are bit masks combined with the UInt32 lock word, and both are later
  # assigned UInt32 constants: start them as UInt32 so the locals keep a
  # single type instead of becoming an Int32 | UInt32 union.
  clear = 0_u32
  long_wait = 0_u32

  while true
    m = @m.get(:relaxed)

    if (m & zero_to_acquire) == 0_u32
      # unlocked: try to grab the lock
      _, success = @m.compare_and_set(m, (m &+ add_to_acquire) & ~(clear | long_wait | clear_on_acquire), :acquire, :relaxed)
      return if success
    elsif (m & SPINLOCK) == 0_u32
      # try to grab the spinlock; also set the waiting bit to declare that a
      # waiter might be queued
      _, success = @m.compare_and_set(m, (m | SPINLOCK | long_wait | set_when_waiting) & ~clear, :acquire, :relaxed)

      if success
        # we can now queue the waiter; first wait goes to the tail, subsequent
        # ones go to the head
        waiter.value.waiting!

        if wait_count == 0
          @waiters.push(waiter)
        else
          @waiters.unshift(waiter)
        end

        release_spinlock

        # sleep
        waiter.value.wait

        # retry
        wait_count += 1

        if wait_count == LONG_WAIT_THRESHOLD
          # if the thread has been woken this many times and has still not
          # acquired, set the LONG_WAIT bit to prevent threads that have not
          # waited from acquiring; this avoids thread starvation; the number
          # should be high enough to still benefit from not having to wait
          # until absolutely necessary
          long_wait = LONG_WAIT
        end

        attempts = 0
        # from now on every CAS also clears the designated-waker bit
        clear = DESIG_WAKER

        # waiters woken at least once don't care about waiting writers or long waiters
        zero_to_acquire &= ~(WRITER_WAITING | LONG_WAIT)
      end
    end

    attempts = delay(attempts)
  end
end
# Unlocks self, which must be held in write mode.
#
# Panics when the lock is held in read mode, or is not held at all.
@[NoInline]
def unlock : Nil
  # fast path: held by a writer and no other bit set
  m, success = @m.compare_and_set(WRITER_LOCK, UNLOCKED, :release, :relaxed)
  return if success

  # sanity check: the writer bit must be set and the reader count must be
  # zero; anything else would make the subtraction below corrupt the word
  if (m & (READER_LOCK_FIELD | WRITER_LOCK)) != WRITER_LOCK
    if (m & READER_LOCK_FIELD) != 0
      panic! "attempt to unlock a lock held in read mode"
    else
      panic! "attempt to unlock a lock not held in write mode"
    end
  end

  unless (m & (WAITING | DESIG_WAKER)) == WAITING
    # nobody to wake, or a designated waker is already on it: just drop the
    # writer bit
    _, success = @m.compare_and_set(m, m - WRITER_LOCK, :release, :relaxed)
    return if success
  end

  # there are waiters and no designated waker (or we failed the CAS)
  unlock_slow(WADD_TO_ACQUIRE)
end
# Unlocks self, which must be held in read mode.
#
# Panics when the lock is held in write mode, or is not held in read mode.
@[NoInline]
def runlock : Nil
  # fast path: exactly one reader and no other bit set
  m, success = @m.compare_and_set(READER_LOCK, UNLOCKED, :release, :relaxed)
  return if success

  # sanity check: must not be held in write mode, and the reader count must
  # not be zero (subtracting READER_LOCK would otherwise underflow)
  if (m & WRITER_LOCK) != 0
    panic! "attempt to runlock a lock held in write mode"
  elsif (m & READER_LOCK_FIELD) == 0
    panic! "attempt to runlock a lock not held in read mode"
  end

  # A reader releases by subtracting READER_LOCK, so the slow path must be
  # given RADD_TO_ACQUIRE (not the writer increment).
  if (m & (WAITING | DESIG_WAKER)) == WAITING && (m & READER_LOCK_FIELD) == READER_LOCK
    # there are waiters and no designated waker, and the last reader is
    # unlocking: must take the slow path to wake a waiter
    unlock_slow(RADD_TO_ACQUIRE)
  else
    _, success = @m.compare_and_set(m, m - READER_LOCK, :release, :relaxed)
    # CAS failed: let the slow path retry
    unlock_slow(RADD_TO_ACQUIRE) unless success
  end
end
# Slow path shared by `unlock` and `runlock`: releases the caller's hold and,
# when appropriate, dequeues and wakes waiters.
#
# *add_to_acquire* is the value the caller added to the lock word when it
# acquired (`WADD_TO_ACQUIRE` for a writer, `RADD_TO_ACQUIRE` for a reader);
# it is subtracted from the lock word to release.
def unlock_slow(add_to_acquire) : Nil
  attempts = 0

  while true
    m = @m.get(:relaxed)

    if (m & WAITING) == 0 || (m & DESIG_WAKER) != 0 || (m & READER_LOCK_FIELD) > READER_LOCK
      # no one to wake, there's a designated waker waking up, or there are
      # still readers: try to unlock
      _, success = @m.compare_and_set(m, m &- add_to_acquire, :release, :relaxed)
      return if success
    elsif (m & SPINLOCK) == 0
      # in a single step: release our hold, take the spinlock and become the
      # designated waker
      _, success = @m.compare_and_set(m, (m &- add_to_acquire) | SPINLOCK | DESIG_WAKER, :acquire_release, :relaxed)

      if success
        # collect the waiters to wake while holding the spinlock
        wake = Crystal::PointerLinkedList(WaiterT).new
        set_on_release = 0_u32

        if head = @waiters.first?
          if head.value.writer?
            # wake the first writer
            @waiters.shift?
            wake.push(head)
          else
            # wake all readers until we reach a writer; peek before dequeuing
            # so that the writer stays queued (dequeuing it without waking it
            # would leave it asleep forever)
            while candidate = @waiters.first?
              if candidate.value.writer?
                # failed to wake a writer that could acquire if it were first
                set_on_release |= WRITER_WAITING
                break
              end

              @waiters.shift?
              wake.push(candidate)
            end
          end
        end

        clear_on_release = SPINLOCK
        # nobody is being woken: there's no designated waker after all
        clear_on_release |= DESIG_WAKER if wake.empty?
        clear_on_release |= WAITING | WRITER_WAITING if @waiters.empty?

        # release spinlock & clear flag(s); must be a CAS loop because other
        # bits of the word may change concurrently
        m = @m.get(:relaxed)
        while true
          m, success = @m.compare_and_set(m, (m | set_on_release) & ~clear_on_release, :release, :relaxed)
          break if success
        end

        # wake outside of the spinlock
        wake.consume_each do |waiter|
          waiter.value.wake
        end

        return
      end
    end

    attempts = delay(attempts)
  end
end
# Returns whether self is held in write mode, from a relaxed snapshot of the
# lock word.
def held? : Bool
  writer_bit = @m.get(:relaxed) & WRITER_LOCK
  !writer_bit.zero?
end
# Returns whether self is held in read mode, from a relaxed snapshot of the
# lock word: some lock bit is set, and it isn't the writer bit.
def rheld? : Bool
  m = @m.get(:relaxed)
  return false unless (m & WRITER_LOCK).zero?
  !(m & ANY_LOCK).zero?
end
# Backoff step for the spin loops. Returns the backoff to pass on the next
# call.
#
# For the first 7 calls it busy-waits with *backoff* pause instructions and
# returns *backoff* + 1; after that it yields through `WaiterT.yield` and
# returns *backoff* unchanged.
private def delay(backoff)
  if backoff >= 7
    WaiterT.yield
    return backoff
  end

  backoff.times { Intrinsics.pause }
  backoff + 1
end
# Clears the SPINLOCK bit of the lock word.
#
# Done with a CAS loop that retries on the freshly observed value until it
# succeeds, so concurrent changes to the other bits are kept.
private def release_spinlock : Nil
  current = @m.get(:relaxed)
  released = false

  until released
    current, released = @m.compare_and_set(current, current & ~SPINLOCK, :release, :relaxed)
  end
end
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment