Mastering Rust’s Simple Join Pattern: Effortless Concurrency for Parallelism

Photo by Tim Gouw: https://www.pexels.com/photo/traffic-light-under-blue-sky-147430/

Introduction

In some applications it is useful, and sometimes necessary, for the main thread (or any other thread) to wait for several worker threads to complete.

Examples of these kinds of problems are:

  1. Web scraping: when fetching data from several sites, or from different parts of the same site, it is often efficient to have separate threads handle the different sources. We will see a very basic implementation of this later on.
  2. File downloads: again, you could start a separate thread for each download and wait for each thread to complete.
  3. Database operations: you can have threads handle queries or transactions from different users, and wait in the main thread for each of them to complete.

There are probably many other use cases. What they have in common is that we delegate part of the work to separate threads, and one thread waits for each of them to complete.

One constraint is that the worker threads must represent independent tasks, so there are no dependencies between them.

Implementation in Rust

In this example we will build a simple web scraper, using the reqwest crate. You can start by adding this to your Cargo.toml in the [dependencies] section:

reqwest = { version = "0.11.24", features = ["blocking"] }

We will start by defining the scrape() function. All this function does is fetch the contents of the given url and return them, or return an error if something went wrong. That is why it has a Result return type:

fn scrape(url: &str) -> Result<String, reqwest::Error> {
    reqwest::blocking::get(url)?.text()
}

You may have noticed two things:

  1. We call get(url)? with the ?-operator, which propagates errors upward: if the request fails, the Err variant is returned immediately.
  2. Why do we use a blocking request in a multi-threaded context? Because, as you will see, scrape() runs in its own thread, so the function does not need to be asynchronous itself. The async API would require an async runtime, which this example does not set up.
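To make the first point a bit more concrete, here is a minimal sketch of what the ?-operator roughly expands to. It uses only the standard library, and the two helper functions are hypothetical names made up for this illustration; they are not part of the scraper. Note that the real ?-operator also converts the error type via From, which the hand-written match below leaves out.

```rust
use std::num::ParseIntError;

// Hypothetical helper: parse a number and double it, using `?`.
fn double_with_question_mark(input: &str) -> Result<i32, ParseIntError> {
    // On Err, `?` returns early from the function with that error.
    let n = input.trim().parse::<i32>()?;
    Ok(n * 2)
}

// Roughly the same logic, written out by hand with `match`.
fn double_with_match(input: &str) -> Result<i32, ParseIntError> {
    let n = match input.trim().parse::<i32>() {
        Ok(value) => value,
        Err(e) => return Err(e),
    };
    Ok(n * 2)
}

fn main() {
    println!("{:?}", double_with_question_mark("21"));
    println!("{:?}", double_with_match("21"));
    println!("{:?}", double_with_question_mark("abc").is_err());
}
```

Both functions behave the same for valid and invalid input; the ?-operator just saves us the boilerplate.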

Next we implement the join() function. This function returns a vector of JoinHandle. In it we iterate over the supplied vector of url strings, and create handles by spawning threads. We push these handles onto a vector which is returned.

fn join(urls: Vec<&'static str>) -> Vec<std::thread::JoinHandle<()>> {
    let mut handles = vec![];
    for url in urls {
        let handle = std::thread::spawn(move || {
            let body_result = scrape(url);
            match body_result {
                Ok(body) => {
                    println!("Url {} returned {} bytes", url, body.len());
                }
                Err(e) => {
                    println!("Error is {}", e);
                }
            }
        });
        handles.push(handle);
    }
    handles
}

One thing to note is the 'static lifetime on the parameter. In Rust, 'static is a special lifetime: it means the data can live for the entire duration of the program. std::thread::spawn requires everything moved into the new thread to be 'static, because the thread may outlive the function that spawned it, so the url strings must not be dropped or invalidated while a thread is still using them. String literals are &'static str, which is why our urls satisfy this requirement. In practice, the 'static lifetime comes up often when data needs to be handed to threads.
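String literals are not the only way to satisfy the 'static requirement. The sketch below shows two alternatives I am fairly sure about: moving owned String values into the threads, and using std::thread::scope (available since Rust 1.63) so the threads can borrow data. To keep it runnable without a network connection, fake_fetch() is a hypothetical stand-in for scrape() that just returns the length of the url; the function names are made up for this example.

```rust
use std::thread;

// Hypothetical stand-in for scrape(): no network access, just the url length.
fn fake_fetch(url: &str) -> usize {
    url.len()
}

// Variant 1: each thread takes ownership of its String, so no borrow
// has to outlive the calling function.
fn lengths_owned(urls: Vec<String>) -> Vec<usize> {
    // Collect first, so all threads are spawned before we start joining.
    let handles: Vec<thread::JoinHandle<usize>> = urls
        .into_iter()
        .map(|url| thread::spawn(move || fake_fetch(&url)))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}

// Variant 2: scoped threads may borrow from the caller, because the scope
// guarantees that every thread has finished before it returns.
fn lengths_scoped(urls: &[String]) -> Vec<usize> {
    thread::scope(|s| {
        let handles: Vec<_> = urls
            .iter()
            .map(|url| s.spawn(move || fake_fetch(url)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}

fn main() {
    let urls = vec![
        "https://www.rust-lang.org".to_string(),
        "https://www.google.com".to_string(),
    ];
    println!("{:?}", lengths_scoped(&urls));
    println!("{:?}", lengths_owned(urls));
}
```

Which variant fits best depends on where the urls come from: owned strings are convenient when they are read from a file or user input, while scoped threads avoid cloning when the caller keeps the data.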

Time to test

Now we can test our setup:

fn main() {
    let urls = vec![
        "https://www.rust-lang.org",
        "https://www.google.com",
        "https://hackingwithrust.net",
    ];
    let handles = join(urls);
    for handle in handles {
        match handle.join() {
            Ok(_) => {}
            Err(_) => println!("Thread failed"),
        }
    }
}

Line by line:

  1. We initialize a vector of urls.
  2. Next we call the join() function to get our vector of JoinHandle.
  3. Then we iterate over the handles and wait for each thread to finish.
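It may help to see when handle.join() actually returns an Err: that happens when the thread panicked. The following is a minimal sketch, using only the standard library, in which the threads also return a value through JoinHandle<T> instead of printing it. The worker() and run_all() functions are hypothetical and exist only for this illustration; the downcast assumes the panic was raised with a plain string literal.

```rust
use std::thread;

// Hypothetical worker: returns the input length, panics on empty input.
fn worker(input: &'static str) -> usize {
    if input.is_empty() {
        panic!("empty input");
    }
    input.len()
}

// Spawn one thread per input, then join them all and turn a panic
// into an error message instead of ignoring it.
fn run_all(inputs: Vec<&'static str>) -> Vec<Result<usize, String>> {
    let handles: Vec<thread::JoinHandle<usize>> = inputs
        .into_iter()
        .map(|input| thread::spawn(move || worker(input)))
        .collect();

    handles
        .into_iter()
        .map(|handle| {
            handle.join().map_err(|payload| {
                // The panic payload is a Box<dyn Any + Send>; a panic with a
                // string literal carries a &'static str.
                payload
                    .downcast_ref::<&str>()
                    .map(|msg| msg.to_string())
                    .unwrap_or_else(|| "unknown panic".to_string())
            })
        })
        .collect()
}

fn main() {
    for result in run_all(vec!["abc", "", "hello"]) {
        match result {
            Ok(len) => println!("Worker returned {}", len),
            Err(msg) => println!("Thread failed: {}", msg),
        }
    }
}
```

A panic in one worker does not affect the others: the remaining threads still run to completion and their results are collected as usual.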

Conclusion

Implementing this pattern made me realize that it is basically built in, since the standard library provides most or all of the functionality you might need. However, I learned several things from implementing this simple web scraper:

  1. A function that runs in its own thread, like our scrape() function, can make blocking calls, like our blocking::get call, without holding up the other threads.
  2. The ?-operator is a concise way to propagate errors to the caller.
  3. When moving values into threads, you often need the 'static lifetime, and it is quite handy.
  4. Handle possible error conditions explicitly wherever you can, especially in production code.

Implementing this version of the pattern was quite easy, since all threads are independent of each other. What happens if this is not the case? Well, that is a question for a future post.
