Last active
February 22, 2025 11:09
-
-
Save ysbaddaden/1f08756b75056d1cb2dfd443599a9885 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "crystal/spin_lock" | |
require "crystal/pointer_linked_list" | |
# Synchronizes the execution of concurrently running fibers.
#
# Instead of polling for a resource, fibers call `#wait` and are parked on a
# waiting list; another fiber calls `#signal` or `#broadcast` to resume them
# when the resource is available. When a mutex is passed to `#wait` it is
# unlocked while the fiber is parked and relocked before `#wait` returns, so
# the caller still behaves as if inside the mutually exclusive context.
#
# Can also be used as a notification system without a mutex: call `#wait`
# with no argument.
class ConditionVariable
  # Waiting list entry that remembers which fiber to resume. Instances are
  # local variables of `#wait`; the list only references them by pointer.
  private struct Waiting
    include Crystal::PointerLinkedList::Node

    def initialize(@fiber : Fiber)
    end

    # Enqueues the recorded fiber so that it gets resumed.
    def enqueue : Nil
      @fiber.enqueue
    end
  end

  def initialize
    # Every access to `@list` goes through `@spin.sync`.
    @spin = Crystal::SpinLock.new
    @list = Crystal::PointerLinkedList(Waiting).new
  end

  # Suspends the current fiber until the condition variable is signaled.
  #
  # If a *mutex* is passed it is unlocked before suspending the fiber, then
  # relocked once the fiber is resumed and before returning. Without a
  # *mutex* (the default) the fiber is merely parked until `#signal` or
  # `#broadcast` resumes it.
  def wait(mutex : Mutex? = nil) : Nil
    waiting = Waiting.new(Fiber.current)
    # Register first, release the mutex second: the fiber is already on the
    # waiting list by the time another fiber can acquire the mutex.
    @spin.sync { @list.push(pointerof(waiting)) }
    mutex.try(&.unlock)
    Fiber.suspend
    mutex.try(&.lock)
  end

  # Wakes up one waiting fiber (if any).
  def signal : Nil
    waiting = @spin.sync { @list.shift? }
    # The list was fed `pointerof(waiting)`, so it hands back a pointer to the
    # node: dereference it to reach `Waiting#enqueue`.
    waiting.try(&.value.enqueue)
  end

  # Wakes up all waiting fibers at once (if any).
  def broadcast : Nil
    # Take a snapshot and empty the shared list while holding the spin lock;
    # the fibers are then enqueued after the lock has been released.
    list = @spin.sync do
      copy = @list
      @list.clear
      copy
    end
    # Nodes are yielded as pointers (see `#signal`).
    list.each(&.value.enqueue)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "minitest/autorun" | |
require "./condition_variable" | |
# Exercises `ConditionVariable` with fibers spawned on the scheduler: waking
# waiters one at a time, waking them all at once, and a producer/consumer
# hand-off.
class ConditionVariableTest < Minitest::Test
  # Parks 100 fibers, then resumes them one signal at a time, checking the
  # number of completed fibers before each signal.
  def test_signal
    mutex = Mutex.new(:unchecked)
    cond = ConditionVariable.new
    parked = 0
    finished = 0

    100.times do
      ::spawn do
        mutex.synchronize do
          parked += 1
          cond.wait(mutex)
          finished += 1
        end
      end
    end

    eventually { assert_equal 100, parked }

    # resume fibers one by one
    100.times do |count|
      eventually { assert_equal count, finished }
      cond.signal
      Fiber.yield
    end

    eventually { assert_equal 100, finished }
  end

  # Parks 100 fibers, then expects a single broadcast to let all of them finish.
  def test_broadcast
    mutex = Mutex.new(:unchecked)
    cond = ConditionVariable.new
    parked = 0
    finished = 0

    100.times do
      ::spawn do
        mutex.synchronize do
          parked += 1
          cond.wait(mutex)
          finished += 1
        end
      end
    end

    eventually { assert_equal 100, parked }
    assert_equal 0, finished

    # resume all fibers at once
    cond.broadcast

    eventually { assert_equal 100, finished }
  end

  # The consumer waits on the condition variable; the producer publishes
  # state 1 and signals; the consumer then moves the state to 2.
  def test_producer_consumer
    mutex = Mutex.new(:unchecked)
    cond = ConditionVariable.new
    state = -1
    consumer_ready = false

    ::spawn(name: "consumer") do
      mutex.synchronize do
        consumer_ready = true
        cond.wait(mutex)
        assert_equal 1, state
        state = 2
      end
    end

    ::spawn(name: "producer") do
      eventually { assert consumer_ready, "expected consumer to eventually be ready" }
      mutex.synchronize { state = 1 }
      cond.signal
    end

    eventually { assert_equal 2, state }
  end

  # Yields to other fibers, then runs the block, and repeats until the block
  # stops raising. Once more than *timeout* has elapsed since the first
  # attempt, the block's exception is re-raised instead of retrying.
  protected def eventually(timeout : Time::Span = 5.seconds, &)
    started_at = Time.monotonic

    loop do
      # let the spawned fibers make progress before (re)checking
      Fiber.yield

      begin
        yield
        break
      rescue error
        elapsed = Time.monotonic - started_at
        raise error if elapsed > timeout
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The `Waiting` type looks like it could be replaced with `Fiber::PointerLinkedListNode`?