Skip to content

Instantly share code, notes, and snippets.

@RandyMcMillan
Forked from rust-play/playground.rs
Last active June 20, 2026 13:25
Show Gist options
  • Select an option

  • Save RandyMcMillan/299050754a3a263c14b03ff3528c1998 to your computer and use it in GitHub Desktop.

Select an option

Save RandyMcMillan/299050754a3a263c14b03ff3528c1998 to your computer and use it in GitHub Desktop.
ring_buffer.rs
use crossbeam_utils::CachePadded;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
/// Fixed-capacity ring buffer with `N` slots whose `push` and `pop` both work through `&self`.
///
/// `head` and `tail` are counters that `pop` and `push` advance by one per item; the slot used
/// for an item is its counter value modulo `N`.
pub struct RingBuffer<T, const N: usize> {
/// Number of items popped so far; `head % N` is the next slot `pop` reads.
/// NOTE(review): wrapped in `CachePadded`, presumably to keep it off `tail`'s cache line.
pub head: CachePadded<AtomicUsize>,
/// Number of items pushed so far; `tail % N` is the next slot `push` writes.
pub tail: CachePadded<AtomicUsize>,
/// Slot storage. Only the slots for counter values in `head..tail` hold initialized items.
buffer: UnsafeCell<[MaybeUninit<T>; N]>,
}
// `UnsafeCell` opts the struct out of `Sync`, so it is re-enabled by hand here.
// SAFETY: slot hand-over between threads is meant to be synchronized through the `head` and
// `tail` atomics. That scheme only holds with at most one thread calling `push` and at most one
// thread calling `pop` at any time.
// NOTE(review): nothing in the type enforces that discipline -- both methods take `&self` -- so
// callers must uphold it themselves.
// `T: Send` is required because items are moved in by one thread and moved out by another.
unsafe impl<T: Send, const N: usize> Sync for RingBuffer<T, N> {}
impl<T, const N: usize> RingBuffer<T, N> {
    /// Creates an empty buffer with room for `N` items.
    ///
    /// # Panics
    ///
    /// Panics if `N` is zero.
    pub fn new() -> Self {
        // A zero-capacity buffer could never hold an item, so reject it up front.
        assert!(N > 0, "Buffer capacity must be greater than 0");
        // Every slot starts out uninitialized; building the array element by element keeps
        // this in safe code.
        let buffer: [MaybeUninit<T>; N] = std::array::from_fn(|_| MaybeUninit::uninit());
        Self {
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            buffer: UnsafeCell::new(buffer),
        }
    }

    /// Enqueues an item into the buffer.
    ///
    /// Must only be called from one thread at a time (the producer side).
    ///
    /// # Errors
    ///
    /// Returns `Err(item)`, handing the item back, if the buffer already holds `N` items.
    pub fn push(&self, item: T) -> Result<(), T> {
        // `tail` is only ever stored by the producer, so a relaxed load reads back our own value.
        let tail = self.tail.load(Ordering::Relaxed);
        // Acquire pairs with the Release store of `head` in `pop`: once the advanced `head` is
        // visible here, the consumer's read of the freed slot has completed, so it is safe to
        // overwrite that slot.
        let head = self.head.load(Ordering::Acquire);
        // Check if the buffer is full
        if tail - head == N {
            return Err(item);
        }
        let index = tail % N;
        // SAFETY: `index < N`, so the pointer stays inside the array. The slot lies outside
        // `head..tail`, so the consumer does not touch it until `tail` is published below.
        unsafe {
            let slots = self.buffer.get().cast::<MaybeUninit<T>>();
            slots.add(index).write(MaybeUninit::new(item));
        }
        // Release publishes the slot write together with the new `tail` to the consumer.
        self.tail.store(tail + 1, Ordering::Release);
        Ok(())
    }

    /// Dequeues the oldest item from the buffer, or returns `None` if it is empty.
    ///
    /// Must only be called from one thread at a time (the consumer side).
    pub fn pop(&self) -> Option<T> {
        // `head` is only ever stored by the consumer, so a relaxed load reads back our own value.
        let head = self.head.load(Ordering::Relaxed);
        // Acquire pairs with the Release store of `tail` in `push`: once the advanced `tail` is
        // visible here, the producer's write of the slot is visible too.
        let tail = self.tail.load(Ordering::Acquire);
        // Check if the buffer is empty
        if head == tail {
            return None;
        }
        let index = head % N;
        // SAFETY: `index < N`, so the pointer stays inside the array. `head != tail` means the
        // slot was initialized by a `push` that has been published, and the producer will not
        // overwrite it until `head` is advanced below. Reading moves the value out; the slot is
        // treated as uninitialized afterwards.
        let item = unsafe {
            let slots = self.buffer.get().cast::<MaybeUninit<T>>();
            slots.add(index).read().assume_init()
        };
        // Release hands the now-empty slot back to the producer.
        self.head.store(head + 1, Ordering::Release);
        Some(item)
    }
}
// Drain whatever is still queued when the buffer goes away so that each leftover item's own
// destructor gets to run.
impl<T, const N: usize> Drop for RingBuffer<T, N> {
    fn drop(&mut self) {
        // `pop` moves each remaining item out of its slot; dropping it here releases it.
        while let Some(leftover) = self.pop() {
            drop(leftover);
        }
    }
}
/// Demo: one producer thread pushes the numbers `1..=ITEM_COUNT` through an 8-slot ring buffer
/// while one consumer thread pops and prints them.
fn main() {
    use std::time::Duration;

    // Shared by both loops: if they disagreed, the consumer would either wait forever for items
    // that never arrive or stop before the producer has finished.
    const ITEM_COUNT: i32 = 20;

    // Wrap the ring buffer in an Arc so both threads can hold a handle to it.
    let ring_buffer = Arc::new(RingBuffer::<i32, 8>::new());

    let producer_buffer = Arc::clone(&ring_buffer);
    let producer = thread::spawn(move || {
        for i in 1..=ITEM_COUNT {
            // `push` hands the item back when the buffer is full; sleep briefly and retry.
            while let Err(val) = producer_buffer.push(i) {
                println!("[Producer] Buffer full! Retrying item: {}", val);
                thread::sleep(Duration::from_millis(5));
            }
            println!("[Producer] Pushed: {}", i);
            thread::sleep(Duration::from_millis(2));
        }
    });

    let consumer_buffer = Arc::clone(&ring_buffer);
    let consumer = thread::spawn(move || {
        let mut count = 0;
        while count < ITEM_COUNT {
            if let Some(val) = consumer_buffer.pop() {
                println!("[Consumer] Popped: {}", val);
                count += 1;
            } else {
                // Nothing queued yet; sleep briefly before polling again.
                thread::sleep(Duration::from_millis(4));
            }
        }
    });

    // `unwrap` re-raises a panic from either worker thread.
    producer.join().unwrap();
    consumer.join().unwrap();
    println!("Data processing complete!");
}
@RandyMcMillan

Copy link
Copy Markdown
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment