Rustling Up Fun: A Simple Dive into Thread-Pool Patterns with Rust!

Photo by Ezequiel Da Silva: https://www.pexels.com/photo/white-concrete-framed-swimming-pool-near-benches-with-gate-surrounded-by-trees-832975/

Introduction

In some situations you have many short-lived tasks that need to be performed concurrently. Spawning a fresh thread for every task can be expensive: the overhead of creating and destroying threads can easily dominate the actual work. The solution is to keep a pool of worker threads alive and hand tasks to them as they come in. These workers are all quite similar, by the very nature of this pattern. That is why this pattern is also, though less frequently, called the replicated worker or the worker-crew model.

Another advantage of this pattern is that the number of available threads can be tuned to the available compute resources, i.e. the number of processor cores or the amount of memory.
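
As an aside, the standard library can report a sensible starting point for that number. Below is a minimal sketch using std::thread::available_parallelism (stable since Rust 1.59); the fallback value of 4 is an arbitrary choice:

use std::thread;

fn main() {
    // available_parallelism() estimates how many threads can usefully
    // run in parallel on this machine.
    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4); // arbitrary fallback when no estimate is available
    println!("Sizing the pool with {num_threads} worker threads");
}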

One of the constraints on these tasks is that they must not be interdependent: the result of one task may not depend on the result of a previous task, nor may a later task depend on the result of the current one. This keeps tasks isolated and easy to queue up.

Typical use-cases include:

  • Web serving and API serving. Requests are normally small and short-lived and therefore ideally suited for a thread pool; indeed, many web servers implement this pattern.
  • Batch processing of images, video files or audio files. Resizing an image, for example, is a small, well-defined task, ideally suited for a thread pool.
  • Data-processing pipelines: each stage in a data-processing pipeline can be handled by a thread pool. As mentioned before, the tasks should not be interdependent, which keeps the thread pool effective.

Implementation in Rust

In our example we will build a fake web server, although it could easily be extended into a real one.

Before we start, we need to add a crate to our Cargo.toml file:

cargo add fstrings

We will use this crate to format our strings in a Python-like way.
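
As a quick illustration, a minimal sketch of the f! macro: it interpolates variables from the surrounding scope, Python-style, and returns a String:

#[macro_use]
extern crate fstrings;

fn main() {
    let name = "world";
    // f!("...{name}...") substitutes the local variable name.
    println!("{}", f!("Hello, {name}!"));
}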

Next in our main.rs add the following lines:

use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;

Line by line:

  • We will use Arc and Mutex to make our thread pool thread-safe.
  • Using std::thread we can spawn new threads.
  • The #[macro_use] extern crate fstrings; lines pull the f! macro from the fstrings crate into scope.

The WebRequest struct

The WebRequest struct looks like this:

struct WebRequest {
    work: Box<dyn FnOnce(&str) + Send + Sync>,
}

impl WebRequest {
    fn new<F>(f: F) -> WebRequest
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        WebRequest { work: Box::new(f) }
    }
}

Some notes:

  1. The WebRequest in our simplified example contains one field, work, which is a boxed closure. Why is it boxed? Well, the closure is dynamically sized; in other words, its size is not known at compile time, so we need to store it in a heap-allocated container like Box. The Send and Sync bounds indicate to us and the compiler that this particular closure can be safely sent to and accessed from multiple threads.
  2. The constructor takes a closure as its single argument. Of course it must satisfy the same constraints as the field in our struct. The 'static lifetime is needed because the closure could outlive the scope in which it was defined. A short standalone sketch follows below.
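
To make this concrete, here is a minimal sketch that wraps a closure in a WebRequest and invokes it once, assuming the struct above is in scope:

fn main() {
    let request = WebRequest::new(|message| {
        println!("Handling: {}", message);
    });
    // Calling a Box<dyn FnOnce> moves the closure out of the box,
    // which is why each request can only be executed once.
    (request.work)("Request 0");
}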

The ThreadPool struct

The ThreadPool struct is the heart of our pattern. Let’s have a look at the struct first:

struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    queue: Arc<Mutex<Vec<WebRequest>>>,
}

This struct has two fields:

  1. The workers vector, which holds the worker threads. Each element is a JoinHandle to a spawned thread; we keep these handles so that we can later wait for each thread to finish.
  2. The queue. This holds the tasks to be executed by the worker threads. The Arc allows the queue to be shared across threads, and the Mutex ensures that only one thread accesses it at a time; see the standalone sketch below.
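
The Arc plus Mutex combination deserves a closer look, since it is what makes the whole pattern thread-safe. A minimal sketch, independent of the pool itself:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // One counter shared by three threads: Arc shares ownership,
    // Mutex ensures only one thread mutates it at a time.
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::new();

    for _ in 0..3 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            *counter.lock().unwrap() += 1;
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final count: {}", counter.lock().unwrap()); // prints 3
}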

Now we can have a look at the implementation. First the constructor:

    fn new(num_threads: usize) -> ThreadPool {
        let mut workers = Vec::with_capacity(num_threads);
        let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));

        for i in 0..num_threads {
            // A label for this worker, built with the f! macro.
            let number = f!("Request {i}");
            // Each worker thread gets its own handle to the shared queue.
            let queue_clone = Arc::clone(&queue);
            workers.push(thread::spawn(move || loop {
                // Lock the queue just long enough to pop one task;
                // the guard is dropped at the end of this statement.
                let task = queue_clone.lock().unwrap().pop();
                if let Some(task) = task {
                    (task.work)(&number);
                } else {
                    // Queue empty: this worker shuts down.
                    break;
                }
            }));
        }
        ThreadPool { workers, queue }
    }

This method initializes the pool with the specified number of threads. It first creates the queue, after which it spawns the worker threads. Each thread enters a loop, popping tasks off the queue and performing them; when a thread finds the queue empty, the loop breaks and the thread exits. Two consequences of this simplified design are worth noting: tasks should be enqueued promptly, or a worker may find an empty queue and exit before any work arrives, and because Vec::pop takes from the back of the vector, tasks run in last-in, first-out order rather than true queue order.
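
A small sketch of that ordering difference; if true first-in, first-out behaviour matters, a VecDeque is the natural replacement for the Vec:

use std::collections::VecDeque;

fn main() {
    // Vec::pop removes from the back: last in, first out.
    let mut stack = vec![1, 2, 3];
    assert_eq!(stack.pop(), Some(3));

    // VecDeque::pop_front removes from the front: first in, first out.
    let mut fifo: VecDeque<i32> = (1..=3).collect();
    assert_eq!(fifo.pop_front(), Some(1));
}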

The execute() method

The execute() method looks like this:

    fn execute<F>(&self, f: F)
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        let task = WebRequest::new(f);
        self.queue.lock().unwrap().push(task);
    }

This method simply creates a new WebRequest with the specified closure, and pushes it onto the queue.

The join() method

Let’s have a look at the join() method:

    fn join(self) {
        for worker in self.workers {
            worker.join().unwrap();
        }
    }

This is a blocking operation, as it waits for all worker threads to finish. Note that join() takes self by value, so it consumes the pool.

Testing time

Now we can test our setup:

fn main() {
    let pool = ThreadPool::new(6);
    for i in 0..6 {
        pool.execute(move |message| {
            println!("Task: {} prints  {}",i, message);
        });
    }
    pool.join();
}

Line by line:

  1. We create a ThreadPool struct with 6 worker threads.
  2. Next we add tasks by repeatedly calling the execute() method. All these tasks do is print a message; you can make the logic as complicated as you want, of course. It also becomes clear why the tasks cannot depend on each other: in our current, very strict yet simple setup there is no way to communicate between the different threads.
  3. After all the tasks have been added to the pool, we wait for each thread to end by calling the join() method. A sample run is shown below.
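
For reference, one possible output; the interleaving differs from run to run, since which worker picks up which task depends entirely on scheduling:

Task: 5 prints Request 0
Task: 4 prints Request 2
Task: 3 prints Request 1
Task: 2 prints Request 4
Task: 1 prints Request 3
Task: 0 prints Request 5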

Conclusion

As you can see, this pattern is very flexible within its limitations. However, consider the following factors that influence performance and resource usage:

  1. If the program destroys threads too slowly, or if destruction is blocked, this can starve other threads of resources.
  2. If you create too many threads, you waste resources, as well as the time spent creating threads that go unused.
  3. If thread creation takes too long, it can inhibit performance.
  4. Destroying too many threads costs time later, when you need to create them again.

All in all, the right number of threads is sometimes obvious, and sometimes you will need some trial and error to find the optimum. A more advanced algorithm could dynamically increase and decrease the number of available threads based on demand; in a next article I will try to implement such an algorithm.
