The ultimate thread problem: one program that uses all the thread APIs.
Thread APIs Used in the Program
Here's a comprehensive breakdown of all the thread APIs demonstrated in the code:
Thread Creation and Management
-
thread::scope- Creates a scope in which threads can borrow non-static data from the enclosing scope
- Automatically joins all threads when the scope ends
- Example:
thread::scope(|scope| { ... })
-
scope.spawn- Spawns a scoped thread that can access data from the parent scope
- Returns a ScopedJoinHandle
- Example:
handles.push(scope.spawn(|| { ... }))
-
thread::Builder- Provides more configuration options for thread creation
- Example:
thread::Builder::new().name(format!("Worker-{}", i))
-
Builder::name- Sets a name for the thread being created
- Example:
.name(format!("Worker-{}", i))
-
Builder::spawn_scoped- Creates a configured thread within a scope
- Example:
.spawn_scoped(scope, move || { ... })
Thread Identification
-
thread::current- Returns a handle to the current thread
- Example:
let thread = thread::current()
-
Thread::id- Gets the ID of a thread, which is a unique identifier
- Example:
thread.id()
-
Thread::name- Gets the name of a thread
- Example:
thread.name()
Thread Synchronization
-
thread::park_timeout- Blocks the current thread for a specified duration or until unparked
- Example:
thread::park_timeout(Duration::from_millis(10))
-
Thread::unpark (not called directly in this program)- Unblocks a previously parked thread
- In our implementation, we coordinate through atomic variables instead
-
thread::yield_now- Hints to the scheduler to let other threads run
- Example:
thread::yield_now()
-
thread::sleep- Blocks the current thread for a specified duration
- Example:
thread::sleep(Duration::from_millis(500))
Thread Handles
-
JoinHandle::is_finished- Checks if a thread has completed execution without blocking
- Example:
handle.is_finished()
-
JoinHandle::join- Waits for a thread to finish execution
- Example:
handle.join()
-
JoinHandle::thread- Returns a reference (&Thread, not an Option) to the underlying thread handle
- Example:
handle.thread()
Thread-Local Storage
-
thread_local!- Declares a thread-local variable
- Example:
thread_local! { static OPERATIONS_COMPLETED: std::cell::Cell<usize> = std::cell::Cell::new(0); }
-
LocalKey::with- Accesses a thread-local variable
- Example:
OPERATIONS_COMPLETED.with(|ops| { ops.set(ops.get() + 1); })
Each of these APIs plays a specific role in thread management, allowing fine-grained control over thread behavior, synchronization, and data sharing. The program below shows how to build a complete multi-threaded application that coordinates mainly through atomic operations rather than traditional locks (a Mutex is used only to guard the thread-ID map).
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};
use std::time::Duration;

// Thread-local storage for tracking operations within each thread.
thread_local! {
    static OPERATIONS_COMPLETED: std::cell::Cell<usize> = std::cell::Cell::new(0);
}

/// Demonstrates the `std::thread` APIs with a monitor thread, a pool of named
/// workers and a release thread, all coordinated through atomics.
///
/// The workers increment a shared counter for roughly two seconds, the monitor
/// prints progress every 500 ms, and the main thread then raises the stop flag,
/// reports which threads have already finished, and joins every handle.
///
/// # Panics
///
/// Panics if the monitor thread or any worker thread cannot be spawned. Before
/// panicking on a worker spawn failure the stop flag is raised so that the
/// threads already running can exit and the scope does not block forever.
fn main() {
    // Single source of truth for the pool size: it drives the spawn loop and
    // both readiness checks, which must always agree with each other.
    const WORKER_COUNT: usize = 5;

    println!("Main thread ID: {:?}", thread::current().id());
    println!("Main thread name: {:?}", thread::current().name());

    // Shared atomic state used for coordination between all threads.
    let counter = Arc::new(AtomicUsize::new(0));
    let should_stop = Arc::new(AtomicBool::new(false));
    let all_threads_ready = Arc::new(AtomicUsize::new(0));
    let threads_completed = Arc::new(AtomicUsize::new(0));

    // Maps each worker's ThreadId to its index. It is only written to here;
    // nothing in this program reads it back.
    let thread_id_map = Arc::new(Mutex::new(HashMap::<ThreadId, usize>::new()));

    // Every thread spawned on `scope` is joined before `thread::scope` returns,
    // so the final counter read below observes all worker increments.
    thread::scope(|scope| {
        let mut handles = vec![];

        // Monitoring thread that reports progress until the stop flag is set.
        {
            let counter = Arc::clone(&counter);
            let stop_flag = Arc::clone(&should_stop);
            let threads_completed = Arc::clone(&threads_completed);

            // Spawned through Builder so the thread really carries a name.
            let monitor = thread::Builder::new()
                .name("Monitor".to_string())
                .spawn_scoped(scope, move || {
                    let me = thread::current();
                    println!(
                        "Monitor thread started: {:?} (ID: {:?})",
                        me.name(),
                        me.id()
                    );

                    while !stop_flag.load(Ordering::Relaxed) {
                        println!(
                            "Progress: {} operations, {} threads completed",
                            counter.load(Ordering::Relaxed),
                            threads_completed.load(Ordering::Relaxed)
                        );
                        thread::sleep(Duration::from_millis(500));
                        thread::yield_now(); // Demonstrate yield_now
                    }

                    println!("Monitor thread finished");
                })
                .expect("Failed to spawn monitor thread");
            handles.push(monitor);
        }

        // Counts workers that have passed their "ready" announcement; the
        // release thread below polls it.
        let worker_threads = Arc::new(AtomicUsize::new(0));

        // Worker threads, configured through Builder so each gets a name.
        for i in 0..WORKER_COUNT {
            let counter = Arc::clone(&counter);
            let stop_flag = Arc::clone(&should_stop);
            let ready = Arc::clone(&all_threads_ready);
            let threads_completed = Arc::clone(&threads_completed);
            let thread_id_map = Arc::clone(&thread_id_map);
            let worker_threads = Arc::clone(&worker_threads);

            let spawn_result = thread::Builder::new()
                .name(format!("Worker-{}", i))
                .spawn_scoped(scope, move || {
                    let me = thread::current();
                    println!(
                        "Worker thread started: {:?} (ID: {:?})",
                        me.name(),
                        me.id()
                    );

                    // Record which index this thread ID belongs to.
                    thread_id_map
                        .lock()
                        .expect("thread ID map mutex poisoned")
                        .insert(me.id(), i);

                    // Announce readiness on both counters.
                    ready.fetch_add(1, Ordering::SeqCst);
                    worker_threads.fetch_add(1, Ordering::SeqCst);

                    // Start barrier: wait until every worker has announced
                    // itself. The stop flag is checked as well so a shutdown
                    // requested before the barrier opens cannot strand us here.
                    while ready.load(Ordering::SeqCst) < WORKER_COUNT
                        && !stop_flag.load(Ordering::Relaxed)
                    {
                        thread::park_timeout(Duration::from_millis(10));
                    }

                    // Perform work until signaled to stop.
                    let mut local_ops: usize = 0;
                    while !stop_flag.load(Ordering::Relaxed) {
                        counter.fetch_add(1, Ordering::Relaxed);
                        local_ops += 1;

                        // Mirror the count in thread-local storage.
                        OPERATIONS_COMPLETED.with(|ops| ops.set(ops.get() + 1));

                        // Sleep briefly every 100 operations to simulate work.
                        if local_ops % 100 == 0 {
                            thread::sleep(Duration::from_micros(1));
                        }
                    }

                    // Report the per-thread total from thread-local storage.
                    let final_ops = OPERATIONS_COMPLETED.with(|ops| ops.get());
                    println!(
                        "Thread {:?} completed {} operations locally",
                        me.name(),
                        final_ops
                    );

                    // Signal that this thread has completed.
                    threads_completed.fetch_add(1, Ordering::SeqCst);
                });

            let handle = match spawn_result {
                Ok(handle) => handle,
                Err(err) => {
                    // Let the already-running threads exit; otherwise the scope
                    // would wait on them forever while this panic unwinds.
                    should_stop.store(true, Ordering::Relaxed);
                    panic!("Failed to spawn thread: {:?}", err);
                }
            };
            handles.push(handle);
        }

        // Release thread. It never calls `Thread::unpark`: workers wake from
        // `park_timeout` on their own and re-check the ready counter. Each
        // worker bumps `all_threads_ready` before `worker_threads`, so the store
        // below only re-asserts a value the counter has already reached.
        {
            let stop_flag = Arc::clone(&should_stop);
            let ready = Arc::clone(&all_threads_ready);

            let unparker = scope.spawn(move || {
                thread::sleep(Duration::from_millis(100));
                println!("Unparking worker threads...");

                // Wait until all workers are ready, or until shutdown begins.
                while worker_threads.load(Ordering::SeqCst) < WORKER_COUNT
                    && !stop_flag.load(Ordering::Relaxed)
                {
                    thread::sleep(Duration::from_millis(10));
                }

                ready.store(WORKER_COUNT, Ordering::SeqCst);
                println!("All threads should now be unparked");
            });
            handles.push(unparker);
        }

        // Let the threads run for a while, then ask them all to stop.
        thread::sleep(Duration::from_secs(2));
        should_stop.store(true, Ordering::Relaxed);

        println!("Waiting for all threads to complete...");

        // Non-blocking status check before joining.
        for (i, handle) in handles.iter().enumerate() {
            if handle.is_finished() {
                println!("Thread {} already finished", i);
            } else {
                println!("Thread {} still running", i);
            }
        }

        // Join every thread and report any that panicked.
        for handle in handles {
            if let Err(e) = handle.join() {
                println!("Error joining thread: {:?}", e);
            }
        }
    });

    println!("Final counter value: {}", counter.load(Ordering::Relaxed));
}