-
-
Save RandyMcMillan/299050754a3a263c14b03ff3528c1998 to your computer and use it in GitHub Desktop.
ring_buffer.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use crossbeam_utils::CachePadded; | |
| use std::cell::UnsafeCell; | |
| use std::mem::MaybeUninit; | |
| use std::sync::atomic::{AtomicUsize, Ordering}; | |
| use std::sync::Arc; | |
| use std::thread; | |
| pub struct RingBuffer<T, const N: usize> { | |
| pub head: CachePadded<AtomicUsize>, | |
| pub tail: CachePadded<AtomicUsize>, | |
| buffer: UnsafeCell<[MaybeUninit<T>; N]>, | |
| } | |
| // Explicitly implement Sync because UnsafeCell inhibits it by default. | |
| // This is safe because we will manage synchronization via head and tail atomics. | |
| unsafe impl<T: Send, const N: usize> Sync for RingBuffer<T, N> {} | |
| impl<T, const N: usize> RingBuffer<T, N> { | |
| pub fn new() -> Self { | |
| // Ensure N is a power of two for efficient masking, | |
| // or ensure N > 0 to avoid division by zero. | |
| assert!(N > 0, "Buffer capacity must be greater than 0"); | |
| // Initialize the UnsafeCell with uninitialized memory safely | |
| let buffer = unsafe { MaybeUninit::uninit().assume_init() }; | |
| Self { | |
| head: CachePadded::new(AtomicUsize::new(0)), | |
| tail: CachePadded::new(AtomicUsize::new(0)), | |
| buffer: UnsafeCell::new(buffer), | |
| } | |
| } | |
| /// Enqueues an item into the buffer. Returns `Err(item)` if full. | |
| pub fn push(&self, item: T) -> Result<(), T> { | |
| let head = self.head.load(Ordering::Relaxed); | |
| let tail = self.tail.load(Ordering::Acquire); | |
| // Check if the buffer is full | |
| if tail - head == N { | |
| return Err(item); | |
| } | |
| // Write the item into the buffer slot | |
| let index = tail % N; | |
| unsafe { | |
| let buffer_ptr = self.buffer.get() as *mut MaybeUninit<T>; | |
| buffer_ptr.add(index).write(MaybeUninit::new(item)); | |
| } | |
| // Release the item to the consumer | |
| self.tail.store(tail + 1, Ordering::Release); | |
| Ok(()) | |
| } | |
| /// Dequeues an item from the buffer. Returns `None` if empty. | |
| pub fn pop(&self) -> Option<T> { | |
| let head = self.head.load(Ordering::Acquire); | |
| let tail = self.tail.load(Ordering::Relaxed); | |
| // Check if the buffer is empty | |
| if head == tail { | |
| return None; | |
| } | |
| // Read the item from the buffer slot | |
| let index = head % N; | |
| let item = unsafe { | |
| let buffer_ptr = self.buffer.get() as *const MaybeUninit<T>; | |
| buffer_ptr.add(index).read().assume_init() | |
| }; | |
| // Release the slot back to the producer | |
| self.head.store(head + 1, Ordering::Release); | |
| Some(item) | |
| } | |
| } | |
| // Clean up any remaining elements left in the buffer when it goes out of scope | |
| impl<T, const N: usize> Drop for RingBuffer<T, N> { | |
| fn drop(&mut self) { | |
| while self.pop().is_some() {} | |
| } | |
| } | |
| fn main() { | |
| // Wrap the ring buffer in an Arc to share it safely between threads | |
| let ring_buffer = Arc::new(RingBuffer::<i32, 8>::new()); | |
| let producer_buffer = Arc::clone(&ring_buffer); | |
| let producer = thread::spawn(move || { | |
| for i in 1..=20 { | |
| // Spin-lock retry loop if the buffer happens to be full | |
| while let Err(val) = producer_buffer.push(i) { | |
| print!("[Producer] Buffer full! Retrying item: {}\n", val); | |
| thread::sleep(std::time::Duration::from_millis(5)); | |
| } | |
| println!("[Producer] Pushed: {}", i); | |
| thread::sleep(std::time::Duration::from_millis(2)); | |
| } | |
| }); | |
| let consumer_buffer = Arc::clone(&ring_buffer); | |
| let consumer = thread::spawn(move || { | |
| let mut count = 0; | |
| while count < 20 { | |
| if let Some(val) = consumer_buffer.pop() { | |
| println!("[Consumer] Popped: {}", val); | |
| count += 1; | |
| } else { | |
| // Spin-lock retry loop if the buffer happens to be empty | |
| thread::sleep(std::time::Duration::from_millis(4)); | |
| } | |
| } | |
| }); | |
| producer.join().unwrap(); | |
| consumer.join().unwrap(); | |
| println!("Data processing complete!"); | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=4ed48d9fe3488291d2e329e8e1f36893