Skip to content

Instantly share code, notes, and snippets.

@ysbaddaden
Last active February 22, 2025 11:09
Show Gist options
  • Save ysbaddaden/1f08756b75056d1cb2dfd443599a9885 to your computer and use it in GitHub Desktop.
Save ysbaddaden/1f08756b75056d1cb2dfd443599a9885 to your computer and use it in GitHub Desktop.
require "crystal/spin_lock"
require "crystal/pointer_linked_list"
# Synchronize execution of concurrently running fibers.
#
# This can be used to replace polling with a waiting list that can be resumed
# when resources are available, while still behaving inside a mutually exclusive
# context: when a waiting fiber is resumed, the mutex will be relocked.
#
# Can also be used as a notification system without a mutex.
class ConditionVariable
private struct Waiting
include Crystal::PointerLinkedList::Node
def initialize(@fiber : Fiber)
end
def enqueue : Nil
@fiber.enqueue
end
end
def initialize
@spin = Crystal::SpinLock.new
@list = Crystal::PointerLinkedList(Waiting).new
end
# Wait until the condition variable is signaled. If a *mutex* is passed it
# will be unlocked before suspending the fiber and relocked when the fiber is
# resumed and before returning.
def wait(mutex : Mutex?) : Nil
waiting = Waiting.new(Fiber.current)
@spin.sync { @list.push(pointerof(waiting)) }
mutex.try(&.unlock)
Fiber.suspend
mutex.try(&.lock)
end
# Wakes up one waiting fiber (if any).
def signal : Nil
waiting = @spin.sync { @list.shift? }
waiting.try(&.enqueue)
end
# Wakes up all waiting fibers at once (if any).
def broadcast : Nil
list = @spin.sync do
copy = @list
@list.clear
copy
end
list.each(&.enqueue)
end
end
require "minitest/autorun"
require "./condition_variable"
class ConditionVariableTest < Minitest::Test
def test_signal
m = Mutex.new(:unchecked)
c = ConditionVariable.new
done = waiting = 0
100.times do
::spawn do
m.synchronize do
waiting += 1
c.wait(m)
done += 1
end
end
end
eventually { assert_equal 100, waiting }
# resume fibers one by one
0.upto(99) do |i|
eventually { assert_equal i, done }
c.signal
Fiber.yield
end
eventually { assert_equal 100, done }
end
def test_broadcast
m = Mutex.new(:unchecked)
c = ConditionVariable.new
done = waiting = 0
100.times do
::spawn do
m.synchronize do
waiting += 1
c.wait(m)
done += 1
end
end
end
eventually { assert_equal 100, waiting }
assert_equal 0, done
# resume all fibers at once
c.broadcast
eventually { assert_equal 100, done }
end
def test_producer_consumer
m = Mutex.new(:unchecked)
c = ConditionVariable.new
state = -1
ready = false
::spawn(name: "consumer") do
m.synchronize do
ready = true
c.wait(m)
assert_equal 1, state
state = 2
end
end
::spawn(name: "producer") do
eventually { assert ready, "expected consumer to eventually be ready" }
m.synchronize { state = 1 }
c.signal
end
eventually { assert_equal 2, state }
end
protected def eventually(timeout : Time::Span = 5.seconds, &)
start = Time.monotonic
loop do
Fiber.yield
begin
yield
rescue ex
raise ex if (Time.monotonic - start) > timeout
else
break
end
end
end
end
@straight-shoota
Copy link

The Waiting type looks like it could be replaced with Fiber::PointerLinkedListNode?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment