A Trio of Concurrency in Rust: Join, Arc, and mpsc Channel Synchronization in Practice

Concurrency is one of Rust’s core strengths, but handling shared state and communication between threads has always been a challenge. Rust’s ownership system and its synchronization primitives make multithreaded programming both safe and efficient: data races are ruled out at compile time, although logical problems such as deadlocks are still possible and remain the programmer’s responsibility.

Through three practical examples, progressing from basic to advanced, this article covers three core aspects of Rust concurrency: starting tasks and synchronizing on their results, safely updating shared mutable state, and passing messages over MPSC channels with a clean shutdown. Along the way, we look at how thread::spawn, JoinHandle, Arc, Mutex, and mpsc::channel work together, to help you build robust concurrent systems in Rust.

Practical Operations

Example One

// threads1.rs
//
// This program spawns multiple threads that each run for at least 250ms, and
// each thread returns how much time they took to complete. The program should
// wait until all the spawned threads have finished and should collect their
// return values into a vector.

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let mut handles = vec![];
    for i in 0..10 {
        handles.push(thread::spawn(move || {
            let start = Instant::now();
            thread::sleep(Duration::from_millis(250));
            println!("thread {} is complete", i);
            start.elapsed().as_millis()
        }));
    }

    let mut results: Vec<u128> = vec![]; // Duration::as_millis returns u128
    for handle in handles {
        results.push(handle.join().unwrap());
    }

    if results.len() != 10 {
        panic!("Oh no! All the spawned threads did not finish!");
    }

    println!();
    for (i, result) in results.into_iter().enumerate() {
        println!("thread {} took {}ms", i, result);
    }
}

This Rust code shows how to run tasks in parallel on several threads and wait for all of them to finish before collecting their results. The program first spawns ten independent threads in a loop with thread::spawn and stores each thread’s JoinHandle in the vector handles. Inside each thread, it records the start time, sleeps for 250 milliseconds, and then returns the elapsed time in milliseconds (a u128, which is what Duration::as_millis returns). In the main thread, the code iterates over handles and calls handle.join().unwrap() on each one. join blocks the main thread until the corresponding child thread has finished and returns a Result: Ok with the closure’s return value, or Err if the thread panicked, which unwrap turns into a panic in the main thread. Finally, the program collects all the timings into the results vector, checks that there are ten of them, and prints each thread’s running time. Because the threads sleep concurrently, the whole program takes roughly 250ms rather than 2.5 seconds, even though each thread reports at least 250ms.
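Example One calls unwrap() on every join(), so a single panicking worker would take down the main thread. As a minimal sketch of the alternative, the hypothetical helper run_workers below matches on the Result instead; the rule that a worker panics on an input of 0 is an assumption made purely to trigger the Err case.

```rust
use std::thread;

/// Hypothetical helper: spawns one worker per input and joins them all.
/// A worker panics on purpose when its input is 0, to show what join() returns then.
fn run_workers(inputs: Vec<u32>) -> (Vec<u32>, usize) {
    let mut handles = Vec::new();
    for n in inputs {
        handles.push(thread::spawn(move || {
            if n == 0 {
                panic!("worker received 0");
            }
            n * 10
        }));
    }

    let mut results = Vec::new();
    let mut panicked = 0;
    for handle in handles {
        // join() returns Ok(value) on normal completion, Err(payload) if the thread panicked.
        match handle.join() {
            Ok(value) => results.push(value),
            Err(_) => panicked += 1,
        }
    }
    (results, panicked)
}

fn main() {
    let (results, panicked) = run_workers(vec![1, 0, 3]);
    // The failed worker's panic message is printed to stderr by the default panic hook.
    println!("results = {:?}, panicked workers = {}", results, panicked);
}
```

Because the handles are joined in spawn order, the successful results keep the order of their inputs.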

Example Two

// threads2.rs
//
// Building on the last exercise, we want all of the threads to complete their
// work but this time the spawned threads need to be in charge of updating a
// shared value: JobStatus.jobs_completed

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            let mut status = status_shared.lock().unwrap();
            status.jobs_completed += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
        let status = status.lock().unwrap();
        println!("jobs completed {}", status.jobs_completed);
    }
}

This Rust code shows how to update shared mutable data safely from multiple threads, relying on two types: the reference-counted smart pointer Arc<T> and the lock Mutex<T>. The program defines a JobStatus struct holding a jobs_completed counter, wraps it in a Mutex to guarantee mutual exclusion, and then wraps that in an Arc so the locked data can be shared among threads. In the loop, the code spawns ten threads; before each spawn, Arc::clone creates a new handle to the same data (incrementing the reference count rather than copying JobStatus), and that handle is moved into the thread. In each child thread, the code must call .lock().unwrap() before modifying jobs_completed: lock blocks until the mutex is free and returns a MutexGuard that grants exclusive mutable access. The lock is released automatically when the guard goes out of scope, here at the end of the closure. Finally, the main thread calls .join() on each handle and, after each join, locks the mutex and prints the current count. Because other threads may already have finished, a printed value can be larger than the number of handles joined so far, but after the last join it is guaranteed to be 10.
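To make the guard’s lifetime and the reference counting visible, here is a sketch built on the same Arc<Mutex<JobStatus>> pattern. run_jobs is a hypothetical helper: it scopes the MutexGuard to an explicit block so the lock is released before any further work, and it checks that all Arc clones are gone once every thread has been joined.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct JobStatus {
    jobs_completed: u32,
}

/// Hypothetical helper: spawns `workers` threads that each bump the shared
/// counter once, joins them all, and returns the final count.
fn run_jobs(workers: u32) -> u32 {
    let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let status_shared = Arc::clone(&status); // new handle, same data
            thread::spawn(move || {
                {
                    let mut guard = status_shared.lock().unwrap();
                    guard.jobs_completed += 1;
                } // guard dropped here, so the lock is released
                // Work placed here would run without holding the lock.
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    // Each clone was dropped when its thread finished, so only `status` remains.
    assert_eq!(Arc::strong_count(&status), 1);

    // Bind to a local so the temporary MutexGuard is dropped before `status`.
    let final_count = status.lock().unwrap().jobs_completed;
    final_count
}

fn main() {
    println!("jobs completed {}", run_jobs(10));
}
```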

Example Three

// threads3.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Queue {
    length: u32,
    first_half: Vec<u32>,
    second_half: Vec<u32>,
}

impl Queue {
    fn new() -> Self {
        Queue {
            length: 10,
            first_half: vec![1, 2, 3, 4, 5],
            second_half: vec![6, 7, 8, 9, 10],
        }
    }
}

// Method One
fn send_tx(q: Queue, tx: mpsc::Sender<u32>) {
    let qc = Arc::new(q);

    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    let tx1 = tx.clone();

    let handle1 = thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx1.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    let handle2 = thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let queue = Queue::new();
    let queue_length = queue.length;

    send_tx(queue, tx);

    let mut total_received: u32 = 0;
    for received in rx {
        println!("Got: {}", received);
        total_received += 1;
    }

    println!("total numbers received: {}", total_received);
    assert_eq!(total_received, queue_length)
}

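// Method Two (an alternative to Method One: keep only one of the two send_tx
// definitions in the file, since two functions with the same name will not compile)
fn send_tx(q: Queue, tx: mpsc::Sender<u32>) {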
    let qc = Arc::new(q);

    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    let tx1 = tx.clone();

    thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx1.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    let tx2 = tx.clone();
    thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx2.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

The core of this Rust code is the use of mpsc::channel (a multi-producer, single-consumer channel) and Arc (an atomically reference-counted pointer) to transfer data safely between threads and keep them synchronized.

Core Functions and Mechanisms

The program first creates a Queue struct containing two halves of a list of numbers. An Arc lets both child threads share the Queue data so they can act as concurrent producers. The main thread acts as the consumer, receiving all sent data through a for received in rx loop, which keeps running until every Sender has been dropped.
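The rule that the receive loop ends only when every Sender is gone is what both methods below are built around. A minimal sketch of that rule, independent of Queue: fan_in is a hypothetical helper in which each producer thread owns a cloned Sender, and the caller must also drop its own original Sender before draining the Receiver.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: `producers` threads each send `per_producer` values through
/// their own clone of one Sender; the caller drains the Receiver with a for loop.
fn fan_in(producers: u32, per_producer: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for p in 0..producers {
        let tx = tx.clone(); // each producer thread owns its own Sender
        thread::spawn(move || {
            for i in 0..per_producer {
                tx.send(p * 100 + i).unwrap();
            }
            // This thread's Sender is dropped when the closure returns.
        });
    }
    // Drop the original Sender too; otherwise the loop below would never end,
    // because the channel stays open while any Sender is alive.
    drop(tx);

    let mut received = Vec::new();
    for value in rx {
        received.push(value);
    }
    received.sort(); // arrival order across producers is not deterministic
    received
}

fn main() {
    println!("{:?}", fan_in(2, 3));
}
```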

To make sure the receiving end rx can leave its loop cleanly once all data has been sent (that is, once it observes that the channel is closed), the code offers two ways of managing the senders:

  1. Method One: Use handle.join() for explicit synchronization
    • Principle: Inside send_tx, calling join() on both thread handles blocks send_tx until both child threads have finished sending and exited. By the time send_tx returns to the main thread, every mpsc::Sender (tx1 and the original tx, both moved into the threads) has been dropped, so the channel is already closed. This is the most explicit approach, but it has two costs: the main thread cannot receive anything until all ten values have been sent (about five seconds here), and it only works because mpsc::channel is unbounded, so send never waits for a receiver.
  2. Method Two: Rely on the original tx being dropped automatically
    • Principle: Inside send_tx, two sender clones (tx1 and tx2) are moved into the two child threads, while the original tx passed in is never used. When send_tx returns, that original tx is dropped automatically. Once all three senders (tx, tx1, and tx2) have been dropped, the for received in rx loop sees that the channel is closed and ends. Because send_tx does not block, the main thread receives values as they are produced. The threads are never joined, but the receive loop still waits for them, since it cannot end until each thread has dropped its sender.
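Method One’s join-before-receive approach depends on mpsc::channel having an unbounded buffer. The sketch below uses the standard library’s bounded mpsc::sync_channel and its non-blocking try_send to show what changes with a bounded buffer: once the buffer is full and nobody is receiving, try_send reports Full, and a blocking send would wait at that point forever, so a send_tx that joined its threads first would never return. fill_bounded is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: tries to push `attempts` values into a bounded channel of
/// `capacity` slots while no one is receiving, and returns how many were accepted.
fn fill_bounded(capacity: usize, attempts: u32) -> usize {
    // `_rx` (unlike `_`) keeps the Receiver alive, so the channel stays connected.
    let (tx, _rx) = mpsc::sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for value in 0..attempts {
        match tx.try_send(value) {
            Ok(()) => accepted += 1,
            // A blocking tx.send(value) would wait here until someone calls recv().
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("the receiver is still alive"),
        }
    }
    accepted
}

fn main() {
    // Ten values, as in Example Three, offered to a buffer with 2 slots.
    println!("accepted {} of 10", fill_bounded(2, 10));
}
```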

Whichever method is used, the main thread receives all 10 numbers, exits the loop, and passes the final assert_eq! check on the total count, which shows how thread cooperation and channel closure work together.
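If you want both properties, values reaching the receiver right away and an explicit check that every sender thread finished cleanly, one option is to let the sending function return its JoinHandles and join them after the receive loop. This is a sketch, not part of the original exercise: spawn_senders is a hypothetical name, and it takes the halves as plain vectors rather than through Arc<Queue>, which also removes the need for Arc because each thread owns its own chunk.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical variant of send_tx: starts one sender thread per chunk and
/// returns the JoinHandles instead of joining them itself.
fn spawn_senders(chunks: Vec<Vec<u32>>, tx: mpsc::Sender<u32>) -> Vec<thread::JoinHandle<()>> {
    let mut handles = Vec::new();
    for chunk in chunks {
        let tx = tx.clone(); // one Sender per thread
        handles.push(thread::spawn(move || {
            for val in chunk {
                tx.send(val).unwrap();
            }
        }));
    }
    // The original `tx` is dropped when this function returns, as in Method Two.
    handles
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handles = spawn_senders(vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]], tx);

    // Values arrive while the senders are still running, unlike Method One.
    let mut total_received = 0;
    for received in rx {
        println!("Got: {}", received);
        total_received += 1;
    }

    // Joining afterwards still surfaces a panic from any sender thread.
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(total_received, 10);
}
```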

Conclusion

These three examples cover the three key pillars of Rust concurrency programming: thread management, state sharing, and message passing.

1. Task synchronization: handle.join() lets the main thread wait for each child thread to finish and safely collect its return value.
2. State sharing: for shared mutable data, Arc<Mutex<T>> is the standard Rust pattern; Arc provides shared ownership across threads, and Mutex ensures that only one thread can access the data at a time.
3. Channel synchronization: mpsc::channel decouples threads that communicate by passing messages. By carefully managing the lifetime of every mpsc::Sender (making sure each one is eventually dropped), the receiver is notified that the channel is closed and its loop exits cleanly.

Mastering these primitives gives you the core tools Rust offers for building high-performance, reliable concurrent systems.
