Skip to content

Instantly share code, notes, and snippets.

@ysbaddaden
Last active December 20, 2024 11:56
Show Gist options
  • Save ysbaddaden/5fdd55e2da617cf20cb1df6e92e60688 to your computer and use it in GitHub Desktop.
Save ysbaddaden/5fdd55e2da617cf20cb1df6e92e60688 to your computer and use it in GitHub Desktop.
Crystal port of nsync mu (missing conditions) + Mutex & RWLock objects
# Crystal port of the mutex from the "nsync" library.
#
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# References:
# - <https://github.com/google/nsync>
# - <https://justine.lol/mutex/>
require "../futex"
require "crystal/pointer_linked_list"
# TODO: ConditionVariable
module Nsync
# TODO: protection (unchecked, checked, reentrant)
# TODO: #lock(timeout : Time::Span)
class Mutex(WaiterT)
def initialize
@mu = MU(WaiterT).new
end
def try_lock? : Bool
@mu.try_lock?
end
def lock : Nil
@mu.lock
end
def unlock : Nil
@mu.unlock
end
def held? : Bool
@mu.held?
end
def synchronize(& : -> F) forall F
lock
begin
yield
ensure
unlock
end
end
end
ThreadMutex = Mutex(ThreadWaiter)
# FiberMutex = Mutex(FiberWaiter)
# TODO: protection (checked)
# TODO: #write_lock(timeout : Time::Span)
# TODO: #read_lock(timeout : Time::Span)
class RWLock(WaiterT)
def initialize
@mu = MU(WaiterT).new
end
def try_write_lock? : Bool
@mu.try_lock?
end
def write_lock : Nil
@mu.lock
end
def write_unlock : Nil
@mu.unlock
end
def write_held? : Bool
@mu.held?
end
def write_synchronize(& : -> F) forall F
write_lock
begin
yield
ensure
write_unlock
end
end
def try_read_lock? : Bool
@mu.try_rlock?
end
def read_lock : Nil
@mu.rlock
end
def read_unlock : Nil
@mu.runlock
end
def read_held? : Bool
@mu.rheld?
end
def read_synchronize(& : -> F) forall F
read_lock
begin
yield
ensure
read_unlock
end
end
end
ThreadRWLock = RWLock(ThreadWaiter)
# FiberRWLock = RWLock(FiberWaiter)
abstract struct Waiter
enum Type
Reader
Writer
end
def initialize(@type : Type)
@waiting = Atomic(UInt32).new(0_u32)
end
def reader? : Bool
@type.reader?
end
def writer? : Bool
@type.writer?
end
def waiting! : Nil
@waiting.set(1_u32, :relaxed)
end
def wait : Nil
while @waiting.get(:acquire) != 0_u32
wait_impl
end
end
def wake : Nil
@waiting.set(0_u32, :release)
wake_impl
end
private abstract def wait_impl : Nil
private abstract def wake_impl : Nil
# def self.yield : Nil
end
# TODO: support macOS, FreeBSD and other systems that have futex-like syscalls
struct ThreadWaiter < Waiter
# include Crystal::PointerLinkedList::Node
property previous : self*
property next : self*
def initialize(type : Waiter::Type)
super(type)
@previous = Pointer(self).null
@next = Pointer(self).null
end
private def wait_impl : Nil
Futex.wait(pointerof(@waiting.@value), value: 1_u32)
end
private def wake_impl : Nil
Futex.wake(pointerof(@waiting.@value), count: 1)
end
def self.yield : Nil
Thread.yield
end
end
# struct FiberWaiter < Waiter
# # include Crystal::PointerLinkedList::Node
# property previous : self*
# property next : self*
# def initialize(type : Waiter::Type)
# super(type)
# @previous = Pointer(self).null
# @next = Pointer(self).null
# @fiber = Fiber.current
# end
# private def wait_impl : Nil
# Fiber.suspend if @waiting.get(:acquire) == 1_u32
# end
# private def wake_impl : Nil
# @fiber.enqueue
# end
# def self.yield : Nil
# Fiber.yield
# end
# end
# The foundation type to implement them all.
struct MU(WaiterT)
# TODO: conditions, ...
UNLOCKED = 0_u32
WRITER_LOCK = 1_u32
SPINLOCK = 2_u32
WAITING = 4_u32
DESIG_WAKER = 8_u32
WRITER_WAITING = 16_u32
LONG_WAIT = 32_u32
READER_LOCK = 256_u32
READER_LOCK_FIELD = ~(READER_LOCK - 1_u32)
ANY_LOCK = WRITER_LOCK | READER_LOCK_FIELD
WZERO_TO_ACQUIRE = ANY_LOCK | LONG_WAIT
WADD_TO_ACQUIRE = WRITER_LOCK
WSET_WHEN_WAITING = WAITING | WRITER_WAITING
WCLEAR_ON_ACQUIRE = WRITER_WAITING
RZERO_TO_ACQUIRE = WRITER_LOCK | WRITER_WAITING | LONG_WAIT
RADD_TO_ACQUIRE = READER_LOCK
RSET_WHEN_WAITING = WAITING
RCLEAR_ON_ACQUIRE = 0_u32
LONG_WAIT_THRESHOLD = 30
def initialize
@m = Atomic(UInt32).new(UNLOCKED)
@waiters = Crystal::PointerLinkedList(WaiterT).new
end
# Block until self is free and then acquire it in write mode.
@[NoInline]
def lock : Nil
unless try_lock?
waiter = WaiterT.new(:writer)
lock_slow(pointerof(waiter), WZERO_TO_ACQUIRE, WADD_TO_ACQUIRE, WSET_WHEN_WAITING, WCLEAR_ON_ACQUIRE)
end
end
# Attempt to acquire self in write mode without blocking. Returns `true` iff
# successful.
def try_lock? : Bool
m, success = @m.compare_and_set(UNLOCKED, WADD_TO_ACQUIRE, :acquire, :relaxed)
return true if success
if (m & WZERO_TO_ACQUIRE) == 0_u32
_, success = @m.compare_and_set(m, (m &+ WADD_TO_ACQUIRE) & ~WCLEAR_ON_ACQUIRE, :acquire, :relaxed)
success
else
false
end
end
# Block until self can be acquired in read mode and then acquire it.
@[NoInline]
def rlock : Nil
unless try_rlock?
waiter = WaiterT.new(:reader)
lock_slow(pointerof(waiter), RZERO_TO_ACQUIRE, RADD_TO_ACQUIRE, RSET_WHEN_WAITING, RCLEAR_ON_ACQUIRE)
end
end
# Attempt to acquire self in read mode without blocking. Returns `true` iff
# successful.
def try_rlock? : Bool
m, success = @m.compare_and_set(UNLOCKED, RADD_TO_ACQUIRE, :acquire, :relaxed)
return true if success
if (m & RZERO_TO_ACQUIRE) == 0_u32
_, success = @m.compare_and_set(m, (m &+ RADD_TO_ACQUIRE) & ~RCLEAR_ON_ACQUIRE, :acquire, :relaxed)
success
else
false
end
end
def lock_slow(waiter, zero_to_acquire, add_to_acquire, set_when_waiting, clear_on_acquire) : Nil
attempts = 0
wait_count = 0
clear = 0
long_wait = 0
while true
m = @m.get(:relaxed)
if (m & zero_to_acquire) == 0_u32
# unlocked: try to grab the lock
_, success = @m.compare_and_set(m, (m &+ add_to_acquire) & ~(clear | long_wait | clear_on_acquire), :acquire, :relaxed)
return if success
elsif (m & SPINLOCK) == 0_u32
# try to grab the spinlock; also set the waiting bit to declare that a
# waiter might be queued
m, success = @m.compare_and_set(m, (m | SPINLOCK | long_wait | set_when_waiting) & ~clear, :acquire, :relaxed)
if success
# we can now queue the waiter; first wait goes to the tail, subsequent
# ones go to the head
waiter.value.waiting!
if wait_count == 0
@waiters.push(waiter)
else
@waiters.unshift(waiter)
end
release_spinlock
# sleep
waiter.value.wait
# retry
wait_count += 1
if wait_count == LONG_WAIT_THRESHOLD
# if the thread has been woken more than this many times, and still
# not acquired, set the LONG_WAIT bit to prevent thread that have not
# waited from acquiring; this avoids thread starvation; the number
# should be high enough to still benefit from not having to wait until
# absolutely necessary
long_wait = LONG_WAIT
end
attempts = 0
clear = DESIG_WAKER
# waiters woken at least once don't care about waiting writers or long waiters
zero_to_acquire &= ~(WRITER_WAITING | LONG_WAIT)
end
end
attempts = delay(attempts)
end
end
# Unlock self, that must be held in write mode.
@[NoInline]
def unlock : Nil
m, success = @m.compare_and_set(WRITER_LOCK, UNLOCKED, :release, :relaxed)
return if success
# sanity check
if (m & (READER_LOCK_FIELD | WRITER_LOCK)) == 0
if (m & READER_LOCK_FIELD) != 0
panic! "attempt to unlock a lock held in read mode"
else
panic! "attempt to unlock a lock not held in write mode"
end
end
unless (m & (WAITING | DESIG_WAKER)) == WAITING
_, success = @m.compare_and_set(m, m - WRITER_LOCK, :release, :relaxed)
return if success
end
# there are waiters and no designated waker (or we failed the CAS)
unlock_slow(WADD_TO_ACQUIRE)
end
# Unlock self, that must be held in read mode.
@[NoInline]
def runlock : Nil
m, success = @m.compare_and_set(READER_LOCK, UNLOCKED, :release, :relaxed)
return if success
# sanity check
if ((m ^ WRITER_LOCK) & (WRITER_LOCK | READER_LOCK_FIELD)) == 0
if (m & WRITER_LOCK) != 0
panic! "attempt to runlock a lock held in write mode"
else
panic! "attempt to runlock a lock not held in read mode"
end
end
if (m & (WAITING | DESIG_WAKER)) == WAITING && (m & READER_LOCK_FIELD) == READER_LOCK
# there are waiters and no designated waker, the last reader is
# unlocking, and not all waiters have a false condition: must take the
# slow path to wake a waiter
unlock_slow(WADD_TO_ACQUIRE)
else
_, success = @m.compare_and_set(m, m - READER_LOCK, :release, :relaxed)
unlock_slow(WADD_TO_ACQUIRE) unless success
end
end
def unlock_slow(add_to_acquire) : Nil
attempts = 0
while true
m = @m.get(:relaxed)
early_release_mu = add_to_acquire
set_on_release = 0_u32
if (m & WAITING) == 0 || (m & DESIG_WAKER) != 0 || (m & READER_LOCK_FIELD) > READER_LOCK
# no one to wake, there's a designated waker waking up, or there are
# still readers: try to unlock
_, success = @m.compare_and_set(m, m &- add_to_acquire, :release, :relaxed)
return if success
elsif (m & SPINLOCK) == 0
# try to lock spinlock
_, success = @m.compare_and_set(m, (m &- early_release_mu) | SPINLOCK | DESIG_WAKER, :acquire_release, :relaxed)
if success
# try to wake up a waiter
wake = Crystal::PointerLinkedList(WaiterT).new
if waiter = @waiters.first?
if waiter.value.writer?
# wake the first writer
@waiters.shift?
wake.push(waiter)
else
# wake all readers until we reach a writer
while waiter = @waiters.shift?
if waiter.value.writer?
# failed to wake a writer that could acquire if it were first
set_on_release |= WRITER_WAITING
break
end
wake.push(waiter)
end
end
end
clear_on_release = SPINLOCK
clear_on_release |= DESIG_WAKER if wake.empty?
clear_on_release |= WAITING | WRITER_WAITING if @waiters.empty?
# release spinlock & clear flag(s)
m = @m.get(:relaxed)
while true
m, success = @m.compare_and_set(m, (m | set_on_release) & ~clear_on_release, :release, :relaxed)
break if success
end
wake.consume_each do |waiter|
waiter.value.wake
end
return
end
end
attempts = delay(attempts)
end
end
# Returns whether self is held in write mode.
def held? : Bool
(@m.get(:relaxed) & WRITER_LOCK) != 0
end
# Returns whether self is held in read mode.
def rheld? : Bool
m = @m.get(:relaxed)
(m & ANY_LOCK) != 0 && (m & WRITER_LOCK) == 0
end
private def delay(backoff)
if backoff < 7
backoff.times { Intrinsics.pause }
backoff += 1
else
WaiterT.yield
end
backoff
end
private def release_spinlock : Nil
m = @m.get(:relaxed)
while true
m, success = @m.compare_and_set(m, m & ~SPINLOCK, :release, :relaxed)
break if success
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment