Lock free programming.

Atomic Reference Counting

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns five threads that each bump a shared atomic counter once and report
/// the value they produced, then prints the total once every thread is joined.
fn main() {
    let ref_count = Arc::new(AtomicUsize::new(0));

    // Launch all workers eagerly; each closure owns its own `Arc` clone.
    let workers: Vec<_> = (0..5)
        .map(|id| {
            let shared = Arc::clone(&ref_count);
            thread::spawn(move || {
                // `fetch_add` hands back the value from *before* the increment.
                let before = shared.fetch_add(1, Ordering::Relaxed);
                println!("Thread {} incremented count to {}", id, before + 1);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let total = ref_count.load(Ordering::Relaxed);
    println!("Final reference count: {}", total);
}

Atomic Reference Counting

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Five threads each add one to a shared `AtomicUsize`; the main thread joins
/// them all and prints the resulting count.
fn main() {
    let ref_count = Arc::new(AtomicUsize::new(0));
    let mut workers = Vec::new();

    for id in 0..5 {
        let shared = Arc::clone(&ref_count);
        let worker = thread::spawn(move || {
            // The return value is the count as it was before this thread's add.
            let seen = shared.fetch_add(1, Ordering::Relaxed);
            let updated = seen + 1;
            println!("Thread {} incremented count to {}", id, updated);
        });
        workers.push(worker);
    }

    // Wait for every worker in spawn order before reading the final value.
    workers
        .drain(..)
        .for_each(|worker| worker.join().unwrap());

    println!("Final reference count: {}", ref_count.load(Ordering::Relaxed));
}

Multi writer atomic counter


use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;


/// Runs five writer threads that each add 1 to a shared counter 1000 times,
/// plus one reader thread that polls until the counter reaches 4000.
fn main() {
    let counter = Arc::new(AtomicI32::new(0));

    // Writers: spawned first, each with its own handle to the counter.
    let writers: Vec<_> = (0..5)
        .map(|id| {
            let shared = Arc::clone(&counter);
            thread::spawn(move || {
                (0..1000).for_each(|_| {
                    shared.fetch_add(1, Ordering::Relaxed);
                });
                println!("Writer {} finished", id);
            })
        })
        .collect();

    // Reader: polls every 10 ms until the threshold has been reached.
    let reader = {
        let observed = Arc::clone(&counter);
        thread::spawn(move || {
            loop {
                if observed.load(Ordering::Acquire) >= 4000 {
                    break;
                }
                thread::sleep(Duration::from_millis(10));
            }
            println!("Reader detected counter >= 4000");
        })
    };

    writers
        .into_iter()
        .for_each(|writer| writer.join().unwrap());
    reader.join().unwrap();

    let total = counter.load(Ordering::Relaxed);
    println!("Final counter: {}", total);
}

Lock-free singleton initialization

use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

/// Payload type used by the one-time-initialization example.
struct Singleton {
    /// Free-form text carried by the instance.
    data: String,
}

impl Singleton {
    /// Builds an instance whose `data` is the placeholder text `"Initialized"`.
    fn new() -> Self {
        let data = String::from("Initialized");
        Self { data }
    }
}

/// Three threads race to install their own `Singleton` into a shared
/// `AtomicPtr` using a single compare-and-swap from null.
///
/// Exactly one thread wins. Each losing thread frees the instance it built,
/// so no allocation is leaked; the winning instance is freed by the main
/// thread after all workers have been joined.
fn main() {
    let singleton_ptr = Arc::new(AtomicPtr::<Singleton>::new(std::ptr::null_mut()));
    let mut handles = vec![];

    for i in 0..3 {
        let ptr = Arc::clone(&singleton_ptr);
        handles.push(thread::spawn(move || {
            let mut instance = Box::new(Singleton::new());
            instance.data = format!("Thread {}'s instance", i);

            // Keep the raw pointer so ownership can be reclaimed if the CAS fails.
            let candidate = Box::into_raw(instance);

            match ptr.compare_exchange(
                std::ptr::null_mut(),
                candidate,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => println!("Thread {} initialized singleton", i),
                Err(_) => {
                    // SAFETY: `candidate` came from `Box::into_raw` just above and
                    // the failed CAS never published it, so this thread is still
                    // its sole owner and may free it exactly once.
                    unsafe { drop(Box::from_raw(candidate)) };
                    println!("Thread {} found already initialized", i);
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Cleanup (in real code, you'd need proper memory management).
    // Swapping in null takes ownership of the winner and leaves no dangling
    // pointer behind in the atomic.
    let ptr = singleton_ptr.swap(std::ptr::null_mut(), Ordering::Acquire);
    if !ptr.is_null() {
        // SAFETY: every worker has been joined, so nothing else can touch the
        // pointer; it originated from `Box::into_raw` in the winning thread.
        unsafe { drop(Box::from_raw(ptr)) };
    }
}

Producer consumer with atomic flag


use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// One producer thread raises an atomic "ready" flag after a one-second delay
/// while a consumer thread polls that flag every 100 ms.
fn main() {
    let data_ready = Arc::new(AtomicBool::new(false));

    // Producer: gets its own handle to the flag and sets it with `Release`.
    let producer = {
        let ready = Arc::clone(&data_ready);
        thread::spawn(move || {
            println!("[Producer] Preparing data...");
            thread::sleep(Duration::from_secs(1));
            ready.store(true, Ordering::Release);
            println!("[Producer] Data ready!");
        })
    };

    // Consumer: takes over the original handle and polls with `Acquire`.
    let consumer = thread::spawn(move || {
        println!("[Consumer] Waiting for data...");
        loop {
            if data_ready.load(Ordering::Acquire) {
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
        println!("[Consumer] Processing data!");
    });

    for handle in [producer, consumer] {
        handle.join().unwrap();
    }
}

Producer Consumer Block


use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Producer/consumer hand-off through a single `AtomicBool`: the producer
/// sleeps for a second and then sets the flag, the consumer checks it every
/// 100 ms until it is set.
fn main() {
    let ready_flag = Arc::new(AtomicBool::new(false));
    let (for_producer, for_consumer) = (Arc::clone(&ready_flag), ready_flag);

    // Producer thread
    let producer = thread::spawn(move || {
        println!("[Producer] Preparing data...");
        thread::sleep(Duration::from_secs(1));
        for_producer.store(true, Ordering::Release);
        println!("[Producer] Data ready!");
    });

    // Consumer thread
    let consumer = thread::spawn(move || {
        let poll_interval = Duration::from_millis(100);
        println!("[Consumer] Waiting for data...");
        while !for_consumer.load(Ordering::Acquire) {
            thread::sleep(poll_interval);
        }
        println!("[Consumer] Processing data!");
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

Spinlock


use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// A minimal busy-waiting lock built on a single `AtomicBool`.
struct Spinlock {
    /// `true` while some caller holds the lock.
    locked: AtomicBool,
}

impl Spinlock {
    /// Creates an unlocked spinlock already wrapped in an `Arc`.
    fn new() -> Arc<Self> {
        let unlocked = Spinlock {
            locked: AtomicBool::new(false),
        };
        Arc::new(unlocked)
    }

    /// Busy-waits until the flag can be flipped from `false` to `true`.
    fn lock(&self) {
        loop {
            let attempt = self.locked.compare_exchange_weak(
                false,
                true,
                Ordering::Acquire,
                Ordering::Relaxed,
            );
            match attempt {
                Ok(_) => return,
                // Either the lock is held or the weak CAS failed spuriously;
                // hint the CPU that we are spinning and try again.
                Err(_) => std::hint::spin_loop(),
            }
        }
    }

    /// Releases the lock by storing `false` with `Release` ordering.
    fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
    }
}

/// Five threads take turns holding the same spinlock for 100 ms each,
/// printing when they acquire and when they are about to release it.
fn main() {
    let lock = Spinlock::new();

    let workers: Vec<_> = (0..5)
        .map(|id| {
            let shared_lock = Arc::clone(&lock);
            thread::spawn(move || {
                shared_lock.lock();
                println!("Thread {} acquired lock", id);
                // Hold the lock for a while so the other threads have to spin.
                thread::sleep(std::time::Duration::from_millis(100));
                println!("Thread {} releasing lock", id);
                shared_lock.unlock();
            })
        })
        .collect();

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}

CAS Operation


use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Five threads each increment a shared value exactly once using a
/// load + compare-and-swap retry loop, logging every attempt.
fn main() {
    let shared_val = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..5)
        .map(|id| {
            let cell = Arc::clone(&shared_val);
            thread::spawn(move || loop {
                let current = cell.load(Ordering::Acquire);
                let new = current + 1;

                // The swap only succeeds if nobody changed the value since the load.
                let outcome =
                    cell.compare_exchange(current, new, Ordering::Release, Ordering::Relaxed);
                let success = outcome.is_ok();

                let verdict = if success { "success" } else { "retry" };
                println!("Thread {}: CAS {} -> {}: {}", id, current, new, verdict);

                if success {
                    break;
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let final_value = shared_val.load(Ordering::Relaxed);
    println!("Final value: {}", final_value);
}

Example: Atomic Fetch-and-Add (Counter)


use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Ten threads each perform 100 relaxed `fetch_add(1)` operations on a shared
/// counter; the total is printed after all threads have been joined.
fn main() {
    let counter = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..10)
        .map(|_| {
            let shared = Arc::clone(&counter);
            thread::spawn(move || {
                (0..100).for_each(|_| {
                    shared.fetch_add(1, Ordering::Relaxed);
                });
            })
        })
        .collect();

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());

    let total = counter.load(Ordering::Relaxed);
    println!("Final counter value: {}", total);
}

Example: Basic Atomic Load and Store


use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// One thread sets a shared `AtomicBool` with `Release`; a second thread
/// spin-waits on it with `Acquire` and reports once it observes `true`.
fn main() {
    let flag = Arc::new(AtomicBool::new(false));

    // Setter thread works on its own clone of the `Arc`.
    let setter = {
        let flag = Arc::clone(&flag);
        thread::spawn(move || {
            flag.store(true, Ordering::Release);
            println!("Thread 1 set flag to true");
        })
    };

    // Watcher thread takes over the original handle.
    let watcher = thread::spawn(move || {
        loop {
            if flag.load(Ordering::Acquire) {
                break;
            }
            // Tell the CPU this is a busy-wait loop.
            std::hint::spin_loop();
        }
        println!("Thread 2 detected flag is true");
    });

    setter.join().unwrap();
    watcher.join().unwrap();
}


// Without using Arc (less flexible)

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Same flag hand-off as the `Arc` version, but both threads borrow a flag
/// that lives on `main`'s stack.
///
/// `thread::spawn` requires `'static` closures, so it cannot capture a
/// reference to a local variable. Scoped threads are guaranteed to be joined
/// before `thread::scope` returns, which is what makes borrowing `flag` sound.
/// A panic in either thread is propagated out of `thread::scope`.
fn main() {
    let flag = AtomicBool::new(false);

    thread::scope(|scope| {
        scope.spawn(|| {
            flag.store(true, Ordering::Release);
            println!("Thread 1 set flag to true");
        });

        scope.spawn(|| {
            while !flag.load(Ordering::Acquire) {
                // Busy-wait politely until the other thread sets the flag.
                std::hint::spin_loop();
            }
            println!("Thread 2 detected flag is true");
        });
    });
}

Advanced Concepts

The examples above are a solid starting point for understanding basic lock-free programming in Rust. They cover fundamental atomic operations, common patterns, and important safety considerations. Here’s why they’re sufficient for the basics and what you might explore next:


Why These Examples Are Good for Basics

  1. Core Atomic Operations:

    • Demonstrated load, store, compare_exchange (CAS), fetch_add, etc.
    • Covered the Relaxed, Acquire, Release, and AcqRel memory orderings (SeqCst is not used in these examples).
  2. Practical Patterns:

    • Spinlocks, atomic counters, singletons, producer-consumer.
    • Proper use of Arc for thread-safe sharing.
  3. Safety Awareness:

    • Highlighted unsafe requirements for raw pointers.
    • Emphasized memory ordering and dangling pointer risks.
  4. Gradual Complexity:

    • Started with simple flags and counters, progressed to CAS loops, a spinlock, and lock-free singleton initialization.

Where to Go Next

If you’ve mastered these, consider exploring:

1. More Advanced Lock-Free Structures

  • Lock-free queues (e.g., Michael-Scott or MPMC designs).
  • Hazard pointers for safe memory reclamation.
  • RCU (Read-Copy-Update) patterns.

2. Real-World Crates

3. Deep Dives

  • Memory Models: Understand x86/ARM differences.
  • Benchmarking: Measure contention and throughput.
  • Formal Verification: Tools like loom for testing concurrency.

Example: Next-Step Challenge (MPSC Queue)

Here’s a teaser for a multi-producer, single-consumer queue (more advanced but buildable after mastering the basics):

#![allow(unused)]
fn main() {
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

/// One link of the queue's singly linked list.
///
/// The node currently at the head of the list acts as a sentinel: its `value`
/// is always `None` (either it is the initial dummy or its value has already
/// been popped).
struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    /// Heap-allocates an unlinked node and returns it as a raw pointer.
    /// Ownership passes to the queue, whose `Drop` impl frees it.
    fn alloc(value: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            value,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}

/// A lock-free FIFO queue in the style of Michael & Scott.
///
/// Memory reclamation is deliberately simple: nodes are never freed while the
/// queue is alive, only when it is dropped. That keeps every raw pointer a
/// thread may still be holding valid (no use-after-free, no ABA), at the cost
/// of retaining one small node per pushed element for the queue's lifetime.
/// Bounded-memory reclamation needs hazard pointers or epochs.
pub struct Queue<T> {
    /// Sentinel node; the next element to pop is `head.next`.
    head: AtomicPtr<Node<T>>,
    /// Last node, or a node close to it (other threads help it catch up).
    tail: AtomicPtr<Node<T>>,
    /// The very first node ever allocated. Never changes; `Drop` walks the
    /// whole chain starting here.
    first: *mut Node<T>,
}

// SAFETY: values of type `T` are moved into the queue by `push` and moved out
// by `pop`, possibly on different threads, so `T: Send` is required. No `&T`
// is ever handed out, so `T: Sync` is not needed. All shared node state is
// accessed through atomics, and `first` is only read in `Drop` (`&mut self`).
unsafe impl<T: Send> Send for Queue<T> {}
// SAFETY: see the `Send` impl above.
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// Creates an empty queue containing only the sentinel node.
    pub fn new() -> Self {
        let dummy = Node::alloc(None);
        Queue {
            head: AtomicPtr::new(dummy),
            tail: AtomicPtr::new(dummy),
            first: dummy,
        }
    }

    /// Appends `value` to the back of the queue.
    pub fn push(&self, value: T) {
        let new_node = Node::alloc(Some(value));

        loop {
            let tail = self.tail.load(Ordering::Acquire);
            // SAFETY: `tail` is never null and nodes are only freed in `Drop`,
            // which cannot run while `&self` is borrowed.
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };

            if next.is_null() {
                // `tail` really is the last node: try to link the new node after it.
                // SAFETY: same liveness argument as above.
                let linked = unsafe {
                    (*tail).next.compare_exchange(
                        ptr::null_mut(),
                        new_node,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    )
                };
                if linked.is_ok() {
                    // Best effort: if this fails another thread already
                    // advanced `tail` on our behalf.
                    self.tail
                        .compare_exchange(tail, new_node, Ordering::AcqRel, Ordering::Relaxed)
                        .ok();
                    break;
                }
            } else {
                // `tail` is lagging behind; help move it forward and retry.
                self.tail
                    .compare_exchange(tail, next, Ordering::AcqRel, Ordering::Relaxed)
                    .ok();
            }
        }
    }

    /// Removes and returns the element at the front of the queue, or `None`
    /// if the queue is currently empty.
    pub fn pop(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            // SAFETY: `head` is never null and nodes are only freed in `Drop`.
            let next = unsafe { (*head).next.load(Ordering::Acquire) };

            if next.is_null() {
                return None;
            }

            let advanced = self
                .head
                .compare_exchange(head, next, Ordering::AcqRel, Ordering::Relaxed);
            if advanced.is_ok() {
                // `next` is now the sentinel and must stay allocated: `head`
                // points at it. Move only the value out.
                // SAFETY: the successful CAS makes this thread the only one
                // that will ever read `next`'s value; the producer's write to
                // it happened-before our `Acquire` load of the `next` pointer.
                return unsafe { (*next).value.take() };
            }
        }
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Drop for Queue<T> {
    /// Frees every node ever allocated, dropping any values not yet popped.
    fn drop(&mut self) {
        let mut current = self.first;
        while !current.is_null() {
            // SAFETY: `&mut self` guarantees exclusive access. Every node was
            // created by `Box::into_raw`, is reachable from `first` exactly
            // once via `next`, and is not freed anywhere else.
            let node = unsafe { Box::from_raw(current) };
            current = node.next.load(Ordering::Relaxed);
        }
    }
}
}

Final Advice

  • Master the basics first: Ensure you’re comfortable with all the initial examples.
  • Read widely: Study implementations in crossbeam or the Linux kernel.
  • Write tests: Concurrency bugs are subtle—use loom or randomized testing.

These examples give you a rock-solid foundation—now go build something awesome! 🚀