Skip to content

Instantly share code, notes, and snippets.

@frroossst
Created September 15, 2025 04:18
Show Gist options
  • Save frroossst/7d3c9eb5c04a873fc91e19cb6e042689 to your computer and use it in GitHub Desktop.
Save frroossst/7d3c9eb5c04a873fc91e19cb6e042689 to your computer and use it in GitHub Desktop.
Rust Impl of Hilligan's C concurrent queue list
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::mem::MaybeUninit;
use std::cell::UnsafeCell;
#[derive(Debug, PartialEq)]
pub enum QueueResult {
Success = 0,
Failure = 1,
}
/// UnsafeCell<MaybeUninit<T>> to allow interior mutability for the stored item
pub struct Node<T> {
next: AtomicU32,
previous: AtomicU32,
item: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Node<T> {
fn new() -> Self {
Self {
next: AtomicU32::new(0),
previous: AtomicU32::new(0),
item: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
pub struct ConcurrentQueue<T> {
/// Combined head (lower 32 bits) and tail (upper 32 bits) indices
/// This allows atomic updates of both head and tail simultaneously
node_index: AtomicU64,
/// Pre-allocated array of nodes
nodes: Box<[Node<T>]>,
/// Index of the first free node (1-based, 0 = no free nodes)
free_index: AtomicU32,
/// Total number of nodes in the array
node_count: AtomicUsize,
}
impl<T> ConcurrentQueue<T> {
/// queueInit() in the C implementation
pub fn new(node_count: usize) -> Self {
// Create nodes array
let mut nodes: Vec<Node<T>> = Vec::with_capacity(node_count);
for _ in 0..node_count {
nodes.push(Node::new());
}
let nodes = nodes.into_boxed_slice();
let queue = Self {
node_index: AtomicU64::new(0), // Initially empty (head=0, tail=0)
nodes,
free_index: AtomicU32::new(1), // start free chain at index 1
node_count: AtomicUsize::new(0),
};
// init the free chain
for i in 0..node_count {
let next_index = if i == node_count - 1 { 0 } else { (i + 2) as u32 };
queue.nodes[i].next.store(next_index, Ordering::Relaxed);
queue.nodes[i].previous.store(0, Ordering::Relaxed);
}
queue
}
#[inline]
pub fn len(&self) -> usize {
self.node_count.load(Ordering::Acquire)
}
/// Pack head and tail indices into a single u64
/// Upper 32 bits = tail, Lower 32 bits = head
#[inline]
fn pack_indices(tail: u32, head: u32) -> u64 {
((tail as u64) << 32) | (head as u64)
}
/// Extract tail from packed index (upper 32 bits)
#[inline]
fn get_tail(packed: u64) -> u32 {
(packed >> 32) as u32
}
/// Extract head from packed index (lower 32 bits)
#[inline]
fn get_head(packed: u64) -> u32 {
packed as u32
}
/// Atomically push an item onto the queue
pub fn push(&self, item: T) -> QueueResult {
// First CAS loop: acquire a free node
let acquired_index = loop {
let free_index = self.free_index.load(Ordering::Acquire);
// No free nodes available
if free_index == 0 {
return QueueResult::Failure;
}
// Get the next free node in the chain
let next_free = self.nodes[(free_index - 1) as usize].next.load(Ordering::Acquire);
// Try to update the free chain to point to the next free node
// This corresponds to: casi(&queue->freeIndex, freeIndex, queue->nodes[freeIndex-1].next)
match self.free_index.compare_exchange_weak(
free_index,
next_free,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => break free_index, // Successfully acquired the node
Err(_) => continue, // Retry - another thread got there first
}
};
// Store the item in the acquired node
// SAFETY: We now own this node exclusively, so we can write to it
unsafe {
(*self.nodes[(acquired_index - 1) as usize].item.get()).as_mut_ptr().write(item);
}
// Second CAS loop: update the queue head/tail
loop {
let node_index = self.node_index.load(Ordering::Acquire);
let new_index = if node_index == 0 {
// Queue is empty - set both head and tail to our node
self.nodes[(acquired_index - 1) as usize].next.store(0, Ordering::Release);
Self::pack_indices(acquired_index, acquired_index)
} else {
// Queue has items - add to tail
let tail = Self::get_tail(node_index);
let head = Self::get_head(node_index);
// Point our node to the current tail
self.nodes[(acquired_index - 1) as usize].next.store(tail, Ordering::Release);
// Create new index with old head and new tail
Self::pack_indices(acquired_index, head)
};
// Try to update the queue's head/tail indices
match self.node_index.compare_exchange_weak(
node_index,
new_index,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => {
// Success! Now update the previous pointer of the old tail
if node_index != 0 {
let old_tail = Self::get_tail(node_index);
if old_tail != 0 {
self.nodes[(old_tail - 1) as usize].previous.store(
acquired_index,
Ordering::Release
);
}
}
break;
}
Err(_) => continue, // Retry with new values
}
}
QueueResult::Success
}
/// Atomically pop an item from the queue
pub fn pop(&self) -> Result<T, QueueResult> {
// CAS loop to pop from the head
let popped_index = loop {
let node_index = self.node_index.load(Ordering::Acquire);
// Queue is empty
if node_index == 0 {
return Err(QueueResult::Failure);
}
let head = Self::get_head(node_index);
let tail = Self::get_tail(node_index);
if head == 0 {
return Err(QueueResult::Failure);
}
let new_index = if head == tail {
// Last item in queue - make queue empty
0
} else {
// Get the previous node to become new head
loop {
let previous = self.nodes[(head - 1) as usize].previous.load(Ordering::Acquire);
if previous != 0 {
break Self::pack_indices(tail, previous);
}
// Spin waiting for push to set the previous pointer
std::hint::spin_loop();
}
};
// Try to update the queue indices
match self.node_index.compare_exchange_weak(
node_index,
new_index,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => break head,
Err(_) => continue,
}
};
// Extract the item from the popped node
// SAFETY: We now own this node and it contains a valid item
let item = unsafe {
(*self.nodes[(popped_index - 1) as usize].item.get()).as_ptr().read()
};
// Reset the previous pointer
self.nodes[(popped_index - 1) as usize].previous.store(0, Ordering::Release);
// Return the node to the free chain
loop {
let free_index = self.free_index.load(Ordering::Acquire);
self.nodes[(popped_index - 1) as usize].next.store(free_index, Ordering::Release);
match self.free_index.compare_exchange_weak(
free_index,
popped_index,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => break,
Err(_) => continue,
}
}
Ok(item)
}
/// destroy a node from the free chain (used for queue resizing)
pub fn destroy_node(&self) -> QueueResult {
loop {
let free_index = self.free_index.load(Ordering::Acquire);
if free_index == 0 {
return QueueResult::Failure;
}
let next_free = self.nodes[(free_index - 1) as usize].next.load(Ordering::Acquire);
match self.free_index.compare_exchange_weak(
free_index,
next_free,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => return QueueResult::Success,
Err(_) => continue,
}
}
}
/// pop an item and destroy the node (don't return it to free chain)
pub fn pop_destroyed(&self) -> Result<T, QueueResult> {
// Same as pop but without returning node to free chain
let popped_index = loop {
let node_index = self.node_index.load(Ordering::Acquire);
if node_index == 0 {
return Err(QueueResult::Failure);
}
let head = Self::get_head(node_index);
let tail = Self::get_tail(node_index);
if head == 0 {
return Err(QueueResult::Failure);
}
let new_index = if head == tail {
0
} else {
loop {
let previous = self.nodes[(head - 1) as usize].previous.load(Ordering::Acquire);
if previous != 0 {
break Self::pack_indices(tail, previous);
}
std::hint::spin_loop();
}
};
match self.node_index.compare_exchange_weak(
node_index,
new_index,
Ordering::AcqRel,
Ordering::Acquire
) {
Ok(_) => break head,
Err(_) => continue,
}
};
// Extract the item
let item = unsafe {
(*self.nodes[(popped_index - 1) as usize].item.get()).as_ptr().read()
};
// reset previous pointer but don't return to free chain
self.nodes[(popped_index - 1) as usize].previous.store(0, Ordering::Release);
Ok(item)
}
pub fn is_empty(&self) -> bool {
self.node_index.load(Ordering::Acquire) == 0
}
}
unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
impl<T> Drop for ConcurrentQueue<T> {
fn drop(&mut self) {
while let Ok(_) = self.pop() {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment