Created
September 15, 2025 04:18
-
-
Save frroossst/7d3c9eb5c04a873fc91e19cb6e042689 to your computer and use it in GitHub Desktop.
Rust Impl of Hilligan's C concurrent queue list
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering}; | |
| use std::mem::MaybeUninit; | |
| use std::cell::UnsafeCell; | |
| #[derive(Debug, PartialEq)] | |
| pub enum QueueResult { | |
| Success = 0, | |
| Failure = 1, | |
| } | |
| /// UnsafeCell<MaybeUninit<T>> to allow interior mutability for the stored item | |
| pub struct Node<T> { | |
| next: AtomicU32, | |
| previous: AtomicU32, | |
| item: UnsafeCell<MaybeUninit<T>>, | |
| } | |
| impl<T> Node<T> { | |
| fn new() -> Self { | |
| Self { | |
| next: AtomicU32::new(0), | |
| previous: AtomicU32::new(0), | |
| item: UnsafeCell::new(MaybeUninit::uninit()), | |
| } | |
| } | |
| } | |
| pub struct ConcurrentQueue<T> { | |
| /// Combined head (lower 32 bits) and tail (upper 32 bits) indices | |
| /// This allows atomic updates of both head and tail simultaneously | |
| node_index: AtomicU64, | |
| /// Pre-allocated array of nodes | |
| nodes: Box<[Node<T>]>, | |
| /// Index of the first free node (1-based, 0 = no free nodes) | |
| free_index: AtomicU32, | |
| /// Total number of nodes in the array | |
| node_count: AtomicUsize, | |
| } | |
| impl<T> ConcurrentQueue<T> { | |
| /// queueInit() in the C implementation | |
| pub fn new(node_count: usize) -> Self { | |
| // Create nodes array | |
| let mut nodes: Vec<Node<T>> = Vec::with_capacity(node_count); | |
| for _ in 0..node_count { | |
| nodes.push(Node::new()); | |
| } | |
| let nodes = nodes.into_boxed_slice(); | |
| let queue = Self { | |
| node_index: AtomicU64::new(0), // Initially empty (head=0, tail=0) | |
| nodes, | |
| free_index: AtomicU32::new(1), // start free chain at index 1 | |
| node_count: AtomicUsize::new(0), | |
| }; | |
| // init the free chain | |
| for i in 0..node_count { | |
| let next_index = if i == node_count - 1 { 0 } else { (i + 2) as u32 }; | |
| queue.nodes[i].next.store(next_index, Ordering::Relaxed); | |
| queue.nodes[i].previous.store(0, Ordering::Relaxed); | |
| } | |
| queue | |
| } | |
| #[inline] | |
| pub fn len(&self) -> usize { | |
| self.node_count.load(Ordering::Acquire) | |
| } | |
| /// Pack head and tail indices into a single u64 | |
| /// Upper 32 bits = tail, Lower 32 bits = head | |
| #[inline] | |
| fn pack_indices(tail: u32, head: u32) -> u64 { | |
| ((tail as u64) << 32) | (head as u64) | |
| } | |
| /// Extract tail from packed index (upper 32 bits) | |
| #[inline] | |
| fn get_tail(packed: u64) -> u32 { | |
| (packed >> 32) as u32 | |
| } | |
| /// Extract head from packed index (lower 32 bits) | |
| #[inline] | |
| fn get_head(packed: u64) -> u32 { | |
| packed as u32 | |
| } | |
| /// Atomically push an item onto the queue | |
| pub fn push(&self, item: T) -> QueueResult { | |
| // First CAS loop: acquire a free node | |
| let acquired_index = loop { | |
| let free_index = self.free_index.load(Ordering::Acquire); | |
| // No free nodes available | |
| if free_index == 0 { | |
| return QueueResult::Failure; | |
| } | |
| // Get the next free node in the chain | |
| let next_free = self.nodes[(free_index - 1) as usize].next.load(Ordering::Acquire); | |
| // Try to update the free chain to point to the next free node | |
| // This corresponds to: casi(&queue->freeIndex, freeIndex, queue->nodes[freeIndex-1].next) | |
| match self.free_index.compare_exchange_weak( | |
| free_index, | |
| next_free, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => break free_index, // Successfully acquired the node | |
| Err(_) => continue, // Retry - another thread got there first | |
| } | |
| }; | |
| // Store the item in the acquired node | |
| // SAFETY: We now own this node exclusively, so we can write to it | |
| unsafe { | |
| (*self.nodes[(acquired_index - 1) as usize].item.get()).as_mut_ptr().write(item); | |
| } | |
| // Second CAS loop: update the queue head/tail | |
| loop { | |
| let node_index = self.node_index.load(Ordering::Acquire); | |
| let new_index = if node_index == 0 { | |
| // Queue is empty - set both head and tail to our node | |
| self.nodes[(acquired_index - 1) as usize].next.store(0, Ordering::Release); | |
| Self::pack_indices(acquired_index, acquired_index) | |
| } else { | |
| // Queue has items - add to tail | |
| let tail = Self::get_tail(node_index); | |
| let head = Self::get_head(node_index); | |
| // Point our node to the current tail | |
| self.nodes[(acquired_index - 1) as usize].next.store(tail, Ordering::Release); | |
| // Create new index with old head and new tail | |
| Self::pack_indices(acquired_index, head) | |
| }; | |
| // Try to update the queue's head/tail indices | |
| match self.node_index.compare_exchange_weak( | |
| node_index, | |
| new_index, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => { | |
| // Success! Now update the previous pointer of the old tail | |
| if node_index != 0 { | |
| let old_tail = Self::get_tail(node_index); | |
| if old_tail != 0 { | |
| self.nodes[(old_tail - 1) as usize].previous.store( | |
| acquired_index, | |
| Ordering::Release | |
| ); | |
| } | |
| } | |
| break; | |
| } | |
| Err(_) => continue, // Retry with new values | |
| } | |
| } | |
| QueueResult::Success | |
| } | |
| /// Atomically pop an item from the queue | |
| pub fn pop(&self) -> Result<T, QueueResult> { | |
| // CAS loop to pop from the head | |
| let popped_index = loop { | |
| let node_index = self.node_index.load(Ordering::Acquire); | |
| // Queue is empty | |
| if node_index == 0 { | |
| return Err(QueueResult::Failure); | |
| } | |
| let head = Self::get_head(node_index); | |
| let tail = Self::get_tail(node_index); | |
| if head == 0 { | |
| return Err(QueueResult::Failure); | |
| } | |
| let new_index = if head == tail { | |
| // Last item in queue - make queue empty | |
| 0 | |
| } else { | |
| // Get the previous node to become new head | |
| loop { | |
| let previous = self.nodes[(head - 1) as usize].previous.load(Ordering::Acquire); | |
| if previous != 0 { | |
| break Self::pack_indices(tail, previous); | |
| } | |
| // Spin waiting for push to set the previous pointer | |
| std::hint::spin_loop(); | |
| } | |
| }; | |
| // Try to update the queue indices | |
| match self.node_index.compare_exchange_weak( | |
| node_index, | |
| new_index, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => break head, | |
| Err(_) => continue, | |
| } | |
| }; | |
| // Extract the item from the popped node | |
| // SAFETY: We now own this node and it contains a valid item | |
| let item = unsafe { | |
| (*self.nodes[(popped_index - 1) as usize].item.get()).as_ptr().read() | |
| }; | |
| // Reset the previous pointer | |
| self.nodes[(popped_index - 1) as usize].previous.store(0, Ordering::Release); | |
| // Return the node to the free chain | |
| loop { | |
| let free_index = self.free_index.load(Ordering::Acquire); | |
| self.nodes[(popped_index - 1) as usize].next.store(free_index, Ordering::Release); | |
| match self.free_index.compare_exchange_weak( | |
| free_index, | |
| popped_index, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => break, | |
| Err(_) => continue, | |
| } | |
| } | |
| Ok(item) | |
| } | |
| /// destroy a node from the free chain (used for queue resizing) | |
| pub fn destroy_node(&self) -> QueueResult { | |
| loop { | |
| let free_index = self.free_index.load(Ordering::Acquire); | |
| if free_index == 0 { | |
| return QueueResult::Failure; | |
| } | |
| let next_free = self.nodes[(free_index - 1) as usize].next.load(Ordering::Acquire); | |
| match self.free_index.compare_exchange_weak( | |
| free_index, | |
| next_free, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => return QueueResult::Success, | |
| Err(_) => continue, | |
| } | |
| } | |
| } | |
| /// pop an item and destroy the node (don't return it to free chain) | |
| pub fn pop_destroyed(&self) -> Result<T, QueueResult> { | |
| // Same as pop but without returning node to free chain | |
| let popped_index = loop { | |
| let node_index = self.node_index.load(Ordering::Acquire); | |
| if node_index == 0 { | |
| return Err(QueueResult::Failure); | |
| } | |
| let head = Self::get_head(node_index); | |
| let tail = Self::get_tail(node_index); | |
| if head == 0 { | |
| return Err(QueueResult::Failure); | |
| } | |
| let new_index = if head == tail { | |
| 0 | |
| } else { | |
| loop { | |
| let previous = self.nodes[(head - 1) as usize].previous.load(Ordering::Acquire); | |
| if previous != 0 { | |
| break Self::pack_indices(tail, previous); | |
| } | |
| std::hint::spin_loop(); | |
| } | |
| }; | |
| match self.node_index.compare_exchange_weak( | |
| node_index, | |
| new_index, | |
| Ordering::AcqRel, | |
| Ordering::Acquire | |
| ) { | |
| Ok(_) => break head, | |
| Err(_) => continue, | |
| } | |
| }; | |
| // Extract the item | |
| let item = unsafe { | |
| (*self.nodes[(popped_index - 1) as usize].item.get()).as_ptr().read() | |
| }; | |
| // reset previous pointer but don't return to free chain | |
| self.nodes[(popped_index - 1) as usize].previous.store(0, Ordering::Release); | |
| Ok(item) | |
| } | |
| pub fn is_empty(&self) -> bool { | |
| self.node_index.load(Ordering::Acquire) == 0 | |
| } | |
| } | |
| unsafe impl<T: Send> Send for ConcurrentQueue<T> {} | |
| unsafe impl<T: Send> Sync for ConcurrentQueue<T> {} | |
| impl<T> Drop for ConcurrentQueue<T> { | |
| fn drop(&mut self) { | |
| while let Ok(_) = self.pop() { | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment