Thread ultimate problem with all the apis.

Thread APIs Used in the Program

Here's a comprehensive breakdown of all the thread APIs demonstrated in the code:

Thread Creation and Management

  1. thread::scope

    • Creates a scope in which threads can borrow non-static data from the enclosing scope
    • Automatically joins all threads when the scope ends
    • Example: thread::scope(|scope| { ... })
  2. scope.spawn

    • Spawns a scoped thread that can access data from the parent scope
    • Returns a ScopedJoinHandle
    • Example: handles.push(scope.spawn(|| { ... }))
  3. thread::Builder

    • Provides more configuration options for thread creation
    • Example: thread::Builder::new().name(format!("Worker-{}", i))
  4. Builder::name

    • Sets a name for the thread being created
    • Example: .name(format!("Worker-{}", i))
  5. Builder::spawn_scoped

    • Creates a configured thread within a scope
    • Example: .spawn_scoped(scope, move || { ... })

Thread Identification

  1. thread::current

    • Returns a handle to the current thread
    • Example: let thread = thread::current()
  2. Thread::id

    • Gets the ID of a thread, which is a unique identifier
    • Example: thread.id()
  3. Thread::name

    • Gets the name of a thread
    • Example: thread.name()

Thread Synchronization

  1. thread::park_timeout

    • Blocks the current thread for a specified duration or until unparked
    • Example: thread::park_timeout(Duration::from_millis(10))
  2. Thread::unpark (indirectly used through coordination)

    • Unblocks a previously parked thread
    • In our implementation, we coordinate through atomic variables instead
  3. thread::yield_now

    • Hints to the scheduler to let other threads run
    • Example: thread::yield_now()
  4. thread::sleep

    • Blocks the current thread for a specified duration
    • Example: thread::sleep(Duration::from_millis(500))

Thread Handles

  1. JoinHandle::is_finished

    • Checks if a thread has completed execution without blocking
    • Example: handle.is_finished()
  2. JoinHandle::join

    • Waits for a thread to finish execution
    • Example: handle.join()
  3. JoinHandle::thread

    • Returns an optional reference to the underlying thread
    • Example: handle.thread()

Thread-Local Storage

  1. thread_local!

    • Declares a thread-local variable
    • Example: thread_local! { static OPERATIONS_COMPLETED: std::cell::Cell<usize> = std::cell::Cell::new(0); }
  2. LocalKey::with

    • Accesses a thread-local variable
    • Example: OPERATIONS_COMPLETED.with(|ops| { ops.set(ops.get() + 1); })

Each of these APIs plays a specific role in thread management, allowing for fine-grained control over thread behavior, synchronization, and data sharing, while the program demonstrates how to build a complete multi-threaded application using atomic operations for synchronization rather than traditional locks.


use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};
use std::time::Duration;
use std::collections::HashMap;

// Thread-local storage for tracking operations within each thread
thread_local! {
    static OPERATIONS_COMPLETED: std::cell::Cell<usize> = std::cell::Cell::new(0);
}

fn main() {
    println!("Main thread ID: {:?}", thread::current().id());
    println!("Main thread name: {:?}", thread::current().name());
    
    // Create shared atomic counters
    let counter = Arc::new(AtomicUsize::new(0));
    let should_stop = Arc::new(AtomicBool::new(false));
    let all_threads_ready = Arc::new(AtomicUsize::new(0));
    let threads_completed = Arc::new(AtomicUsize::new(0));
    
    // Store thread IDs with their respective indexes
    let thread_id_map = Arc::new(Mutex::new(HashMap::<ThreadId, usize>::new()));
    
    // Use thread::scope for borrowing stack data
    thread::scope(|scope| {
        let mut handles = vec![];
        
        // Create a monitoring thread that reports progress
        {
            let counter = Arc::clone(&counter);
            let should_stop = Arc::clone(&should_stop);
            let threads_completed = Arc::clone(&threads_completed);
            
            handles.push(scope.spawn( move || {
                // Set name for the monitoring thread
                let thread = thread::current();
                println!("Monitor thread started: {:?} (ID: {:?})", thread.name(), thread.id());
                
                while !should_stop.load(Ordering::Relaxed) {
                    println!("Progress: {} operations, {} threads completed", 
                             counter.load(Ordering::Relaxed),
                             threads_completed.load(Ordering::Relaxed));
                    thread::sleep(Duration::from_millis(500));
                    thread::yield_now(); // Demonstrate yield_now
                }
                
                println!("Monitor thread finished");
            }));
        }
        
        // Create worker threads with IDs to track them
        let worker_threads = Arc::new(AtomicUsize::new(0));
        
        // Create multiple worker threads using Builder for more control
        for i in 0..5 {
            let counter = Arc::clone(&counter);
            let should_stop = Arc::clone(&should_stop);
            let all_threads_ready = Arc::clone(&all_threads_ready);
            let threads_completed = Arc::clone(&threads_completed);
            let thread_id_map = Arc::clone(&thread_id_map);
            let worker_threads = Arc::clone(&worker_threads);
            
            // Use Builder to configure thread before spawning
            let handle = thread::Builder::new()
                .name(format!("Worker-{}", i))
                .spawn_scoped(scope, move || {
                    let thread = thread::current();
                    println!("Worker thread started: {:?} (ID: {:?})", thread.name(), thread.id());
                    
                    // Store thread ID in the map
                    thread_id_map.lock().unwrap().insert(thread.id(), i);
                    
                    // Signal that this thread is ready
                    all_threads_ready.fetch_add(1, Ordering::SeqCst);
                    worker_threads.fetch_add(1, Ordering::SeqCst);
                    
                    // Wait until all threads are ready
                    while all_threads_ready.load(Ordering::SeqCst) < 5 {
                        thread::park_timeout(Duration::from_millis(10));
                    }
                    
                    // Perform work until signaled to stop
                    let mut local_ops = 0;
                    while !should_stop.load(Ordering::Relaxed) {
                        counter.fetch_add(1, Ordering::Relaxed);
                        local_ops += 1;
                        
                        // Store in thread-local storage
                        OPERATIONS_COMPLETED.with(|ops| {
                            ops.set(ops.get() + 1);
                        });
                        
                        // Sleep briefly to simulate work
                        if local_ops % 100 == 0 {
                            thread::sleep(Duration::from_micros(1));
                        }
                    }
                    
                    // Report final operations from thread-local storage
                    let final_ops = OPERATIONS_COMPLETED.with(|ops| ops.get());
                    println!("Thread {:?} completed {} operations locally", thread.name(), final_ops);
                    
                    // Signal that this thread has completed
                    threads_completed.fetch_add(1, Ordering::SeqCst);
                })
                .expect("Failed to spawn thread");
                
            handles.push(handle);
        }
        
        // Create a thread that will unpark other threads
        {
            // We can't clone ScopedJoinHandle, so we'll use a different approach
            let unparker = scope.spawn(move || {
                thread::sleep(Duration::from_millis(100));
                println!("Unparking worker threads...");
                
                // Wait until all worker threads are ready
                while worker_threads.load(Ordering::SeqCst) < 5 {
                    thread::sleep(Duration::from_millis(10));
                }
                
                // Signal all threads to wake up by changing the all_threads_ready counter
                all_threads_ready.store(5, Ordering::SeqCst);
                
                println!("All threads should now be unparked");
            });
            
            handles.push(unparker);
        }
        
        // Let the threads run for a while
        thread::sleep(Duration::from_secs(2));
        
        // Signal all threads to stop
        should_stop.store(true, Ordering::Relaxed);
        
        // Wait for all worker threads to finish
        println!("Waiting for all threads to complete...");
        
        // Check if threads are finished before joining
        for (i, handle) in handles.iter().enumerate() {
            match handle.is_finished() {
                true => println!("Thread {} already finished", i),
                false => println!("Thread {} still running", i),
            }
        }
        
        // Join all threads
        for handle in handles {
            if let Err(e) = handle.join() {
                println!("Error joining thread: {:?}", e);
            }
        }
    });
    
    println!("Final counter value: {}", counter.load(Ordering::Relaxed));
}