Lock-Free Programming
Atomic Reference Counting
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let ref_count = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for i in 0..5 {
        let rc = Arc::clone(&ref_count);
        handles.push(thread::spawn(move || {
            let prev = rc.fetch_add(1, Ordering::Relaxed);
            println!("Thread {} incremented count to {}", i, prev + 1);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final reference count: {}", ref_count.load(Ordering::Relaxed));
}
```
Multi-Writer Atomic Counter
```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let counter = Arc::new(AtomicI32::new(0));
    let mut writers = vec![];

    // Create 5 writer threads
    for i in 0..5 {
        let cnt = Arc::clone(&counter);
        writers.push(thread::spawn(move || {
            for _ in 0..1000 {
                cnt.fetch_add(1, Ordering::Relaxed);
            }
            println!("Writer {} finished", i);
        }));
    }

    // Create reader thread
    let reader_cnt = Arc::clone(&counter);
    let reader = thread::spawn(move || {
        while reader_cnt.load(Ordering::Acquire) < 4000 {
            thread::sleep(Duration::from_millis(10));
        }
        println!("Reader detected counter >= 4000");
    });

    for writer in writers {
        writer.join().unwrap();
    }
    reader.join().unwrap();

    println!("Final counter: {}", counter.load(Ordering::Relaxed));
}
```
Lock-Free Singleton Initialization
```rust
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

struct Singleton {
    data: String,
}

impl Singleton {
    fn new() -> Self {
        Singleton {
            data: "Initialized".to_string(),
        }
    }
}

fn main() {
    let singleton_ptr = Arc::new(AtomicPtr::<Singleton>::new(std::ptr::null_mut()));
    let mut handles = vec![];

    for i in 0..3 {
        let ptr = Arc::clone(&singleton_ptr);
        handles.push(thread::spawn(move || {
            let mut instance = Box::new(Singleton::new());
            instance.data = format!("Thread {}'s instance", i);
            let raw = Box::into_raw(instance);

            match ptr.compare_exchange(
                std::ptr::null_mut(),
                raw,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => println!("Thread {} initialized singleton", i),
                Err(_) => {
                    // Lost the race: reclaim our unused allocation so it doesn't leak.
                    unsafe { drop(Box::from_raw(raw)) };
                    println!("Thread {} found already initialized", i);
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Cleanup (in real code, you'd need proper memory management)
    let ptr = singleton_ptr.load(Ordering::Acquire);
    if !ptr.is_null() {
        unsafe { drop(Box::from_raw(ptr)) };
    }
}
```
Producer-Consumer with Atomic Flag
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let data_ready = Arc::new(AtomicBool::new(false));
    let data_ready_consumer = Arc::clone(&data_ready);

    // Producer thread
    let producer = thread::spawn(move || {
        println!("[Producer] Preparing data...");
        thread::sleep(Duration::from_secs(1));
        data_ready.store(true, Ordering::Release);
        println!("[Producer] Data ready!");
    });

    // Consumer thread
    let consumer = thread::spawn(move || {
        println!("[Consumer] Waiting for data...");
        while !data_ready_consumer.load(Ordering::Acquire) {
            thread::sleep(Duration::from_millis(100));
        }
        println!("[Consumer] Processing data!");
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
```
Spinlock
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Spinlock {
    locked: AtomicBool,
}

impl Spinlock {
    fn new() -> Arc<Self> {
        Arc::new(Spinlock {
            locked: AtomicBool::new(false),
        })
    }

    fn lock(&self) {
        // Spin until we atomically flip `locked` from false to true.
        while self
            .locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
    }

    fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
    }
}

fn main() {
    let lock = Spinlock::new();
    let mut handles = vec![];

    for i in 0..5 {
        let lock = Arc::clone(&lock);
        handles.push(thread::spawn(move || {
            lock.lock();
            println!("Thread {} acquired lock", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("Thread {} releasing lock", i);
            lock.unlock();
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```
CAS Operation
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let shared_val = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for i in 0..5 {
        let shared_val = Arc::clone(&shared_val);
        handles.push(thread::spawn(move || {
            let mut success = false;
            while !success {
                let current = shared_val.load(Ordering::Acquire);
                let new = current + 1;
                success = shared_val
                    .compare_exchange(current, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok();
                println!(
                    "Thread {}: CAS {} -> {}: {}",
                    i,
                    current,
                    new,
                    if success { "success" } else { "retry" }
                );
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final value: {}", shared_val.load(Ordering::Relaxed));
}
```
Example: Atomic Fetch-and-Add (Counter)
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..100 {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final counter value: {}", counter.load(Ordering::Relaxed));
}
```
Example: Basic Atomic Load and Store
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let flag2 = Arc::clone(&flag);

    let handle1 = thread::spawn(move || {
        flag.store(true, Ordering::Release);
        println!("Thread 1 set flag to true");
    });

    let handle2 = thread::spawn(move || {
        while !flag2.load(Ordering::Acquire) {
            // Optional: Improve spin-wait performance
            std::hint::spin_loop();
        }
        println!("Thread 2 detected flag is true");
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}
```
```rust
// Without Arc: scoped threads (std::thread::scope) can borrow the flag directly,
// at the cost of some flexibility (the threads cannot outlive the scope).
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    let flag = AtomicBool::new(false);

    thread::scope(|s| {
        s.spawn(|| {
            flag.store(true, Ordering::Release);
            println!("Thread 1 set flag to true");
        });

        s.spawn(|| {
            while !flag.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            println!("Thread 2 detected flag is true");
        });
        // Scoped threads are joined automatically when the scope ends.
    });
}
```
Advanced Concepts
Yes, the examples provided are excellent for understanding basic lock-free programming in Rust! They cover fundamental atomic operations, common patterns, and important safety considerations. Here’s why they’re sufficient for basics and what you might explore next:
Why These Examples Are Good for Basics
- Core Atomic Operations:
  - Demonstrated `load`, `store`, `compare_exchange` (CAS), `fetch_add`, etc.
  - Covered the major memory orderings (`Relaxed`, `Acquire`/`Release`, `SeqCst`); a short `SeqCst` sketch follows this list.
- Practical Patterns:
  - Spinlocks, atomic counters, singletons, producer-consumer.
  - Proper use of `Arc` for thread-safe sharing.
- Safety Awareness:
  - Highlighted `unsafe` requirements for raw pointers.
  - Emphasized memory ordering and dangling-pointer risks.
- Gradual Complexity:
  - Started with simple flags, progressed to lock-free stacks and state machines.
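The examples above lean on `Relaxed` and `Acquire`/`Release`; none of them needs `SeqCst` directly. As a small illustrative addition (not one of the original examples), the classic store-buffering test below shows what `SeqCst` rules out:

```rust
// Illustrative sketch: store-buffering under SeqCst.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let x = Arc::new(AtomicBool::new(false));
    let y = Arc::new(AtomicBool::new(false));

    let (x1, y1) = (Arc::clone(&x), Arc::clone(&y));
    let t1 = thread::spawn(move || {
        x1.store(true, Ordering::SeqCst);
        y1.load(Ordering::SeqCst) // r1
    });

    let (x2, y2) = (Arc::clone(&x), Arc::clone(&y));
    let t2 = thread::spawn(move || {
        y2.store(true, Ordering::SeqCst);
        x2.load(Ordering::SeqCst) // r2
    });

    let r1 = t1.join().unwrap();
    let r2 = t2.join().unwrap();
    // With SeqCst, r1 and r2 can never both be false.
    println!("r1 = {}, r2 = {}", r1, r2);
}
```

Under `SeqCst` the two loads can never both return `false`, because all threads agree on a single total order of the four operations. With `Relaxed` (or even `Acquire`/`Release`) that outcome is allowed by the memory model, even though it is hard to observe on strongly ordered hardware such as x86.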
Where to Go Next
If you’ve mastered these, consider exploring:
1. More Advanced Lock-Free Structures
   - Lock-free queues (e.g., Michael-Scott or MPMC designs).
   - Hazard pointers for safe memory reclamation.
   - RCU (Read-Copy-Update) patterns.
2. Real-World Crates
   - `crossbeam`: Production-grade lock-free data structures (a minimal usage sketch follows this list).
   - `parking_lot`: Optimized synchronization primitives.
   - `atomic-rs`: Extended atomic types.
3. Deep Dives
   - Memory Models: Understand x86/ARM differences.
   - Benchmarking: Measure contention and throughput.
   - Formal Verification: Tools like `loom` for testing concurrency.
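To make the crates suggestion concrete, here is a minimal sketch using `crossbeam`'s `SegQueue`, an unbounded lock-free MPMC queue (assumes `crossbeam = "0.8"` in `Cargo.toml`):

```rust
// Sketch only: requires the `crossbeam` crate as a dependency.
use crossbeam::queue::SegQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    let queue = Arc::new(SegQueue::new());
    let mut producers = vec![];

    for i in 0..4 {
        let q = Arc::clone(&queue);
        producers.push(thread::spawn(move || {
            for j in 0..100 {
                q.push((i, j)); // lock-free push, no unsafe needed
            }
        }));
    }

    for p in producers {
        p.join().unwrap();
    }

    let mut count = 0;
    while queue.pop().is_some() {
        count += 1;
    }
    println!("Drained {} items", count);
}
```

`crossbeam::queue::ArrayQueue` offers a bounded alternative with a similar feel.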
Example: Next-Step Challenge (MPSC Queue)
Here’s a teaser for a multi-producer, single-consumer queue (more advanced but buildable after mastering the basics):
```rust
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}

pub struct Queue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

impl<T> Queue<T> {
    pub fn new() -> Self {
        // The queue always contains a dummy node; head points at it and
        // tail points at the last node.
        let dummy = Box::into_raw(Box::new(Node {
            value: None,
            next: AtomicPtr::new(ptr::null_mut()),
        }));
        Queue {
            head: AtomicPtr::new(dummy),
            tail: AtomicPtr::new(dummy),
        }
    }

    pub fn push(&self, value: T) {
        let new_node = Box::into_raw(Box::new(Node {
            value: Some(value),
            next: AtomicPtr::new(ptr::null_mut()),
        }));
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Try to link the new node after the current tail.
                if unsafe {
                    (*tail)
                        .next
                        .compare_exchange(
                            ptr::null_mut(),
                            new_node,
                            Ordering::AcqRel,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                } {
                    // Swing the tail forward; ignore failure (another thread may help).
                    self.tail
                        .compare_exchange(tail, new_node, Ordering::AcqRel, Ordering::Relaxed)
                        .ok();
                    break;
                }
            } else {
                // Tail is lagging; help advance it.
                self.tail
                    .compare_exchange(tail, next, Ordering::AcqRel, Ordering::Relaxed)
                    .ok();
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Only the dummy node remains: the queue is empty.
                return None;
            }
            // If tail still points at the old dummy, help it forward first.
            let tail = self.tail.load(Ordering::Acquire);
            if head == tail {
                self.tail
                    .compare_exchange(tail, next, Ordering::AcqRel, Ordering::Relaxed)
                    .ok();
            }
            if self
                .head
                .compare_exchange(head, next, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // `next` becomes the new dummy; take its value and free the old dummy.
                // Freeing immediately is only safe if no other thread can still hold a
                // pointer to `head`; real implementations use hazard pointers or
                // epoch-based reclamation (e.g., crossbeam-epoch).
                let value = unsafe { (*next).value.take() };
                unsafe { drop(Box::from_raw(head)) };
                return value;
            }
        }
    }
}
```
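A short usage sketch for the queue above (illustrative only, assuming the definitions above are in the same file; producers are joined before draining, which sidesteps the reclamation caveat noted in `pop`, and the queue still leaks any nodes left when it is dropped):

```rust
fn main() {
    use std::sync::Arc;
    use std::thread;

    let queue = Arc::new(Queue::new());
    let mut producers = vec![];

    // Multiple producers push concurrently.
    for i in 0..4 {
        let q = Arc::clone(&queue);
        producers.push(thread::spawn(move || {
            for j in 0..100 {
                q.push(i * 100 + j);
            }
        }));
    }
    for p in producers {
        p.join().unwrap();
    }

    // Single consumer drains the queue.
    let mut drained = 0;
    while queue.pop().is_some() {
        drained += 1;
    }
    println!("Drained {} items", drained);
}
```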
Final Advice
- Master the basics first: Ensure you're comfortable with all the initial examples.
- Read widely: Study implementations in `crossbeam` or the Linux kernel.
- Write tests: Concurrency bugs are subtle; use `loom` or randomized testing (a minimal `loom` sketch follows).
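To make the `loom` suggestion concrete, here is a minimal, illustrative sketch of a loom test (assumes `loom` as a dev-dependency; loom's shadow atomics and threads let it exhaustively explore thread interleavings):

```rust
// Sketch only: requires `loom` as a dev-dependency; run with `cargo test`.
#[test]
fn concurrent_increments_are_not_lost() {
    loom::model(|| {
        use loom::sync::atomic::{AtomicUsize, Ordering};
        use loom::sync::Arc;
        use loom::thread;

        let counter = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..2)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    c.fetch_add(1, Ordering::Relaxed);
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // loom checks this assertion across every explored interleaving.
        assert_eq!(counter.load(Ordering::Relaxed), 2);
    });
}
```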
These examples give you a rock-solid foundation—now go build something awesome! 🚀