14. Thread Pool

14.1 The need for multi-threading

The event loop assumes that everything in the loop will be finished quickly, so blocking IO is forbidden. However, there are still problems:

  • Third-party code that uses blocking IOs.
  • CPU-intensive code that slows down the event loop.

Client-side code

Most client-side code is blocking, such as the DNS client in libc (getaddrinfo) and the HTTP client libcurl. They cannot be used in an event loop, so they require multi-threading. Alternatively, you can find a non-blocking, callback-based API that is compatible with the event loop, such as c-ares for DNS. libcurl has both blocking and non-blocking APIs.

Even if a non-blocking API exists, it’s much harder to use due to callbacks. So it may still be desirable to use a blocking client. Fortunately, our Redis server doesn’t need any client-side code.

CPU-intensive code

An event loop avoids blocking IO, but it doesn’t help if the code takes too long on the CPU. We already have a few workarounds, such as progressive hashtable resizing and the limit on simultaneously expiring timers. But we missed one thing: when deleting a sorted set, the destructor deallocates each pair. This O(N) operation is problematic for large sorted sets.

The real Redis has added the UNLINK command as a workaround, which deletes the key but runs the destructor in another thread. This is a common use of multi-threading: Run a piece of code in another thread (asynchronously).

The producer-consumer problem

Most multi-threading problems can be restated as producer-consumer problems. For example, to run an asynchronous task in a thread, there is a consumer thread, called a “worker”, waiting for the next piece of work (task), and a producer thread that has work to hand off to a consumer. If you can solve the producer-consumer problem, you can solve most multi-threading problems.

14.2 Synchronization primitives

Communicate by sharing data

Producers communicate with consumers via a shared data structure, usually a queue.

// pseudo code
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    queue.push_back(work);  // add the work to the queue
}

void try_to_consume() {
    if (!queue.empty()) {
        Work work = queue.front();
        queue.pop_front();
        // Do something with the work ...
    } else {
        // Nothing to do! How to wait for the queue?
    }
}

But that’s only half the problem. How does the consumer wait for the queue to become non-empty? Without knowing anything about multi-threading, one may just check the queue in a loop:

void consumer() {
    while (true) {
        try_to_consume();
    }
}

This is called busy waiting, or spinning, and it’s not a practical solution because it uses 100% CPU all the time. Some people will try to add a sleep() to the loop, which trades wasted CPU for added latency. This is how to do multi-threading if you don’t know multi-threading.

Sleep-signal-wakeup mechanisms

In real multi-threaded code, the consumer should sleep when it’s waiting for work, and the producer can signal the consumer after updating the queue so that the consumer thread only executes when it’s needed. This sleep-signal-wakeup mechanism must be provided by the operating system, because the OS scheduler controls which thread should run at what time.

The mechanisms are called synchronization primitives. There are many of them, and they are all sleep-signal-wakeup mechanisms.

Common synchronization primitives

There are multiple levels of synchronization primitives. Low-level primitives include the Linux futex and Windows WaitOnAddress(). They are provided directly by the OS and are the most generic, but they are hard to use. They are used to implement high-level primitives rather than being used directly by programmers.

High-level primitives are mutex, condition variable, and semaphore. They are implemented from the low-level OS primitives, easier to use, but still generic. You can solve any multi-threading problem after mastering them. This is what to learn.

There are higher-level constructs built on top of these primitives, such as concurrent queues, thread pools, readers-writer locks, Go’s channels, etc. They are less generic and only suitable for specific use cases. They are easy to use even without multi-threading knowledge, giving people the illusion that they can do multi-threading.

CPU Atomics

A CPU atomic operation makes a multi-step memory read-write operation appear as if it’s a single step. An instruction set includes a fixed set of atomic operations on fixed-size integers. Common multi-step atomic operations include atomic exchange, atomic compare and exchange, etc.

// pseudo code
T atomic_exchange(T *cur_value, T new_value) {
    T old_value = *cur_value;
    *cur_value = new_value;
    return old_value;
}

T atomic_compare_and_exchange(T *cur_value, T expected, T new_value) {
    T old_value = *cur_value;
    if (old_value == expected) {
        *cur_value = new_value;
    }
    return old_value;
}

Atomic values are often mentioned in multi-threading introductions, but they are only for some niche applications because they cannot sleep/wake threads. Atomics are neither necessary nor sufficient for most multi-threading programs.
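
For reference, real C++ exposes these operations via std::atomic. A minimal standalone sketch (not part of our server code):

#include <atomic>

std::atomic<bool> flag{false};

void atomics_demo() {
    // atomic exchange: store the new value and return the old one, as one step
    bool old_value = flag.exchange(true);
    // atomic compare and exchange: store the new value only if the current
    // value equals `expected`; on failure, `expected` is updated instead
    bool expected = true;
    bool swapped = flag.compare_exchange_strong(expected, false);
    (void)old_value; (void)swapped;
}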

14.3 Mutex and condition variable

Mutex

When sharing data between threads, there must be a way to prevent consumers from using the partially updated data while a producer is updating. This is usually done with a mutex (mutual exclusion), also called a lock. Only one thread can hold the lock at a time, while the other threads must wait.

How does a thread wait for a lock? There are 2 kinds of locks: spinlocks and regular locks. A spinlock waits by busy waiting using CPU atomics, wasting CPU cycles.

// pseudo code
struct SpinLock {
    bool is_locked = false;
    void lock() {
        while (atomic_exchange(&is_locked, true)) {}    // busy waiting
    }
    void unlock() {
        atomic_store(&is_locked, false);
    }
};

A regular lock can put the thread to sleep while waiting. Locks used in userspace programs are hybrids; they spin for a while before sleeping.
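
To illustrate the structure of a hybrid lock, here is a toy sketch built on the pthread calls from section 14.4. Real hybrid locks spin on an atomic flag in userspace before making a futex syscall; this version merely spins on trylock before falling back to a sleeping lock:

#include <pthread.h>

struct HybridLock {
    pthread_mutex_t mu;
    HybridLock() { pthread_mutex_init(&mu, NULL); }
    ~HybridLock() { pthread_mutex_destroy(&mu); }
    void lock() {
        // spin briefly: the holder may release the lock within a few cycles
        for (int i = 0; i < 1000; ++i) {
            if (pthread_mutex_trylock(&mu) == 0) {
                return;             // acquired while spinning
            }
        }
        pthread_mutex_lock(&mu);    // give up spinning; sleep instead
    }
    void unlock() { pthread_mutex_unlock(&mu); }
};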

Semaphore

A semaphore is like a lock, but it’s an integer counter instead of a single boolean flag:

  • “Lock” decreases the counter by 1.
  • “Unlock” increases the counter by 1.
  • “Lock” sleeps if the counter becomes negative.
  • “Unlock” wakes up a sleeping thread if the counter was negative.

struct Semaphore {
    int counter;
    explicit Semaphore(int init) : counter(init) { }
    void increase();
    void decrease();
};

A semaphore doesn’t require “lock” and “unlock” to be paired, and the counter can be initialized to any value. A regular lock is a special case of a semaphore, where 1 is the unlocked state and 0 is the locked state.

struct Mutex {
    Semaphore sem;
    explicit Mutex() : sem(1) {}
    void lock()   { sem.decrease(); }
    void unlock() { sem.increase(); }
};

Semaphores are popular in textbooks, but they are tricky to use in practice. We won’t discuss them because there are less tricky options.

Wait for something to happen

Let’s make queue access mutually exclusive in the producer-consumer problem.

// pseudo code
Mutex mu;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    mu.unlock();
}

void try_to_consume() {
    mu.lock();
    if (!queue.empty()) {
        Work work = queue.front();
        queue.pop_front();
        // Do something with the work ...
    }   // else: Nothing to do! How to wait for the queue?
    mu.unlock();
}

The remaining problem is waiting for the queue to become non-empty without busy waiting, which requires a sleep-signal-wakeup mechanism. But can you do this with mutexes, since they are such a mechanism? It is possible, but not obvious. We need a new, easier way to wake a sleeping thread; let’s call it wait() & signal().

// a hypothetical primitive
struct Event {
    void wait();    // put the thread to sleep
    void signal();  // wake up 1 sleeping thread
};

Mutex mu;
Event ev;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    ev.signal();            // wake up a sleeping thread if there is one
    mu.unlock();
}

void consume() {
    mu.lock();
    // wait for the queue to become non-empty
    while (queue.empty()) {
        mu.unlock();    // release the lock before sleeping
        ev.wait();      // wait by sleeping
        mu.lock();      // reclaim the lock
    }
    // update the queue
    Work work = queue.front();
    queue.pop_front();
    // Do something with the work ...
    mu.unlock();
}

The consumer must release the lock before sleeping, so that the producer can update the queue. But the problem is: The producer may update the queue and call signal() between unlock() and wait().

Step  Producer          Consumer
1                       lock
2                       queue empty?
3                       unlock
4     lock
5     update the queue
6     unlock
7     signal (nobody!)
8                       wait (not empty!)

If signal() is called before wait(), the consumer will just sleep forever!

Condition variable

The hypothetical signal & wait primitive doesn’t work because the consumer cannot sleep unconditionally; it can only sleep when the queue is really empty, which cannot be guaranteed because the lock is released before it sleeps.

To make it work, “unlock” and “sleep” must be a single step that prevents the condition from changing in between. This is called a condition variable; it has 2 operations:

  • wait() releases the lock, goes to sleep, and reclaims the lock when it wakes up.
  • signal() wakes up 1 sleeping thread, if there are any.

struct Cond {
    void wait(Mutex &mu);
    void signal();
};

Mutex mu;
Cond cond;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    cond.signal();          // wake up a sleeping thread if there is one
    mu.unlock();
}

void consume() {
    mu.lock();
    // wait for the queue to become non-empty
    while (queue.empty()) {
        // release the lock and enter sleep in a single step
        cond.wait(mu);
        // lock reclaimed
    }
    // update the queue
    Work work = queue.front();
    queue.pop_front();
    // Do something with the work ...
    mu.unlock();
}

Now that “unlock” and “wait” are a single step, the producer cannot update the queue before the consumer enters sleep, so the queue is guaranteed to be empty when the consumer sleeps.

A condition variable is always associated with a lock; the lock protects the condition. The condition can be arbitrary, so you can solve any multi-threading problem with condition variables.

Condition variable spurious wakeups

An important note is that the condition is always checked in a loop:

    // The condition is always checked in a loop
    while (queue.empty()) {
        cond.wait(mu);
    }

This is NOT how to use condition variables:

    // WRONG way to use condition variables
    if (queue.empty()) {
        cond.wait(mu);
    }
    // This is wrong because the condition may have been changed.

In the producer-consumer scenario, you may reason that the consumer can assume that the queue is not empty after the wakeup. This assumption is only true for a single consumer. If there are multiple consumers, the one that is woken by signal() may not get to consume the queue; other consumers can acquire the lock and steal the item before the woken consumer, leaving the queue empty again!

Step  Producer          Consumer A         Consumer B
1     signal (wakes A)  sleeping…          whatever…
2     unlock            sleeping…          whatever…
3                       sleeping…          lock
4                       sleeping…          consume the queue
5                       sleeping…          unlock
6                       woken by signal
7                       lock
8                       queue empty?
9                       wait again

signal() doesn’t pass the lock to the woken thread; the woken thread competes with other consumers for the lock, so the condition may have changed by the time it reclaims the lock, requiring it to check the condition again. Wakeups like this, where the condition no longer holds, are called spurious wakeups.

signal() doesn’t depend on the lock, unlike wait(), so it can be called from anywhere.

// Both are correct!
void produce_1() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);
    cond.signal();  // signal before releasing the lock
    mu.unlock();
}
void produce_2() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);
    mu.unlock();
    cond.signal();  // signal after releasing the lock
}
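
As an exercise, we can close the circle: just as a mutex is a special case of a semaphore, a semaphore can be built from a mutex and a condition variable. A sketch in the same pseudo code style, with the condition being “the counter is positive” (unlike the earlier description, the counter never goes negative here while threads sleep; the observable behavior is the same):

// pseudo code
struct Semaphore {
    Mutex mu;
    Cond positive;      // condition: counter > 0
    int counter;
    explicit Semaphore(int init) : counter(init) {}
    void decrease() {   // "lock"
        mu.lock();
        while (counter == 0) {
            positive.wait(mu);  // sleep until the counter is positive
        }
        counter--;
        mu.unlock();
    }
    void increase() {   // "unlock"
        mu.lock();
        counter++;
        positive.signal();      // wake up 1 sleeping thread if there is one
        mu.unlock();
    }
};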

14.4 The `pthread` API

Threading in Linux C

Mutexes and condition variables are the most versatile synchronization primitives. They are included in all major multi-threading APIs. On Linux, they are available as the pthread API, implemented with futex as part of libc.

The C++ threading API is a thin wrapper around pthread on Linux. It doesn’t matter which API to use, since they are the same set of synchronization primitives.
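
For comparison, here is the consumer from the previous section written against the C++ standard library; this is a sketch only, as the rest of this chapter uses pthread directly. Note that the predicate overload of wait() does the condition-checking loop for you:

#include <condition_variable>
#include <deque>
#include <mutex>

std::mutex mu;
std::condition_variable cond;
std::deque<Work> queue;     // `Work` as defined later in this chapter

void consume() {
    std::unique_lock<std::mutex> lock(mu);
    // equivalent to: while (queue.empty()) { cond.wait(lock); }
    cond.wait(lock, [] { return !queue.empty(); });
    Work work = queue.front();
    queue.pop_front();
    lock.unlock();
    // Do something with the work ...
}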

Start a thread

A thread is referred to by a handle of type pthread_t. The handle is initialized by creating a new thread with pthread_create().

int pthread_create(
    pthread_t *thread, const pthread_attr_t *attr,
    void *(*func)(void *arg), void *arg);

Extra options are passed via pthread_attr_t. It can be NULL if not needed.

The new thread starts by executing the specified function, which takes a void * argument. To pass more than one argument, pass a pointer to a struct.

All pthread functions return an error code instead of messing with errno.

Join a thread

Waiting for a thread or process to finish is called “joining”, which is done with pthread_join().

int pthread_join(pthread_t thread, void **retval);

“Join” is part of the “fork and join” concurrency model, where “fork” means spawning new processes or threads. In server-side applications, the lifetime of a thread pool is often the same as the lifetime of the server, in which case there is no need for “join”.
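
A minimal, hypothetical example of both pthread_create() and pthread_join(). The arguments (and here, the result too) travel through a struct owned by the caller, which is safe because the caller join()s before the struct goes out of scope:

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

struct SumArgs {
    int from;   // inputs
    int to;
    long sum;   // output
};

static void *sum_func(void *arg) {
    SumArgs *args = (SumArgs *)arg;
    for (int i = args->from; i < args->to; ++i) {
        args->sum += i;
    }
    return NULL;
}

int main() {
    SumArgs args = {0, 100, 0};
    pthread_t thread;
    int rv = pthread_create(&thread, NULL, &sum_func, &args);
    assert(rv == 0);
    rv = pthread_join(thread, NULL);    // wait for the thread to finish
    assert(rv == 0);
    printf("sum = %ld\n", args.sum);    // 4950
    return 0;
}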

Terminate a thread

Unlike processes, most threads cannot be legitimately killed, even though there is an API. Threads in a process share all resources; if a thread has allocated some memory or acquired some locks, those resources won’t be released when it is killed.

A thread either never dies, or it dies naturally.

Mutex

These functions are self-explanatory:

int pthread_mutex_init(
    pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutexattr_t holds extra options that are not needed for typical apps. None of these functions are expected to fail, except for unrecoverable programming errors.

Condition variable

Condition variables are explained in the previous section.

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);   // wake up all threads
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(
    pthread_cond_t *cond, pthread_mutex_t *mutex,
    const struct timespec *abstime);

The new thing is broadcast(): if the producer creates a condition that can be consumed multiple times, such as putting multiple items in the queue, it must either signal() multiple times or wake up all consumers with a single broadcast().
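
For example, a producer that adds a whole batch of items can wake everyone at once. A sketch, assuming a shared pthread_mutex_t mu, pthread_cond_t not_empty, and std::deque<Work> queue:

void produce_batch(const std::vector<Work> &batch) {
    pthread_mutex_lock(&mu);
    for (const Work &w : batch) {
        queue.push_back(w);
    }
    pthread_cond_broadcast(&not_empty);     // all waiting consumers recheck
    pthread_mutex_unlock(&mu);
}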

timedwait() is a variant of wait() that can timeout. Surprisingly, it doesn’t use monotonic time by default, which can be fixed with an option:

pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
pthread_cond_init(&cond, &cond_attr);
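
A sketch of waiting with a 1-second timeout, mirroring the earlier consume(). It assumes the shared mu, cond (initialized with CLOCK_MONOTONIC as above), and queue; ETIMEDOUT is from <errno.h>:

void consume_with_timeout() {
    // absolute deadline: now + 1 second, on the monotonic clock
    struct timespec deadline;
    clock_gettime(CLOCK_MONOTONIC, &deadline);
    deadline.tv_sec += 1;

    pthread_mutex_lock(&mu);
    while (queue.empty()) {
        int rv = pthread_cond_timedwait(&cond, &mu, &deadline);
        if (rv == ETIMEDOUT) {
            pthread_mutex_unlock(&mu);
            return;                 // timed out; give up
        }
    }
    Work work = queue.front();      // condition met before the deadline
    queue.pop_front();
    pthread_mutex_unlock(&mu);
    // Do something with the work ...
}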

14.5 Code a thread pool

Concurrent queue

A thread pool has a fixed number of consumer threads, called “workers”. An unspecified number of producers can issue tasks to workers via a queue. Consumers sleep when the queue is empty, until they are woken up by a producer, as illustrated by the condition variable introduction.

struct ThreadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};

The “task” is just a function pointer with a void * argument.

struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};

The producer (event loop)

Let’s turn the pseudo code produce() into real code.

void thread_pool_queue(ThreadPool *tp, void (*f)(void *), void *arg) {
    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(Work {f, arg});
    pthread_cond_signal(&tp->not_empty);
    pthread_mutex_unlock(&tp->mu);
}

Then we can do the sorted set destruction in the thread pool.

// previous `entry_del()`
static void entry_del_sync(Entry *ent) {
    if (ent->type == T_ZSET) {
        zset_clear(&ent->zset);
    }
    delete ent;
}
// a wrapper function for the thread pool
static void entry_del_func(void *arg) {
    entry_del_sync((Entry *)arg);
}
// new `entry_del()`
static void entry_del(Entry *ent) {
    // unlink it from any data structures
    entry_set_ttl(ent, -1); // remove from the heap data structure
    // run the destructor in a thread pool for large data structures
    size_t set_size = (ent->type == T_ZSET) ? hm_size(&ent->zset.hmap) : 0;
    const size_t k_large_container_size = 1000;
    if (set_size > k_large_container_size) {
        thread_pool_queue(&g_data.thread_pool, &entry_del_func, ent);
    } else {
        entry_del_sync(ent);    // small; avoid context switches
    }
}

Deleting a large sorted set won’t freeze the event loop, but frequent context switching can degrade system performance, so we added a threshold for the sorted set size.

The consumers (workers)

The thread start function takes the thread pool as an argument.

void thread_pool_init(ThreadPool *tp, size_t num_threads) {
    pthread_mutex_init(&tp->mu, NULL);
    pthread_cond_init(&tp->not_empty, NULL);
    tp->threads.resize(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        int rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
        assert(rv == 0);
    }
}

worker() is the real version of the pseudo code consume().

static void *worker(void *arg) {
    ThreadPool *tp = (ThreadPool *)arg;
    while (true) {
        pthread_mutex_lock(&tp->mu);
        // wait for the condition: a non-empty queue
        while (tp->queue.empty()) {
            pthread_cond_wait(&tp->not_empty, &tp->mu);
        }
        // condition met, consume the queue
        Work w = tp->queue.front();
        tp->queue.pop_front();
        pthread_mutex_unlock(&tp->mu);
        // do the work
        w.f(w.arg);
    }
    return NULL;
}

The thread pool is created in main():

int main() {
    thread_pool_init(&g_data.thread_pool, 4);
    // ...
}

14.6 More to learn

Bounded queue

In our case, the consumers (workers) block on an empty queue, and the producer (event loop) never blocks. However, in some use cases, there is a size limit on the queue, so when the queue is full, the producer should either:

  • block (sleep), or
  • not block and do nothing for now, which requires a notification mechanism so the producer can retry later.

A bounded queue, where both producers and consumers can block, is a common building block for multi-threaded programs. It needs 2 condition variables: one for the producers and one for the consumers. Try coding this as an exercise (a solution sketch follows the struct):

struct BlockingQueue {
    std::deque<Work> queue;
    size_t limit;               // maximum size
    pthread_mutex_t mu;
    pthread_cond_t not_empty;   // consumer blocks
    pthread_cond_t not_full;    // producer blocks

    void produce(Work work);
    Work consume();
};
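
One possible solution sketch (spoiler: try it yourself first). Each side waits on its own condition variable and signals the other side after updating the queue:

void BlockingQueue::produce(Work work) {
    pthread_mutex_lock(&mu);
    while (queue.size() >= limit) {
        pthread_cond_wait(&not_full, &mu);  // block while the queue is full
    }
    queue.push_back(work);
    pthread_cond_signal(&not_empty);        // wake up 1 consumer
    pthread_mutex_unlock(&mu);
}

Work BlockingQueue::consume() {
    pthread_mutex_lock(&mu);
    while (queue.empty()) {
        pthread_cond_wait(&not_empty, &mu); // block while the queue is empty
    }
    Work work = queue.front();
    queue.pop_front();
    pthread_cond_signal(&not_full);         // wake up 1 producer
    pthread_mutex_unlock(&mu);
    return work;
}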

Send the result back to the event loop

Our thread pool is fire-and-forget, but sometimes the producer needs to do something with the result. This is easily done with any synchronization primitive. An example using a semaphore:

struct Result {
    T value;
    Semaphore done;
    Result() : done(0) {}
};

// application logic
void caller() {
    Result result;
    submit_some_async_task(&result);
    // block on the semaphore
    result.done.decrease();
    // the result is ready
    do_something_with(result.value);
}

// in another thread
void some_async_task(Result *result) {
    // produce the result ...
    result->value = ...;
    // unblock the caller
    result->done.increase();
}

But if the caller is in an event loop, it cannot block. So do_something_with() cannot be done in the event loop unless there is a way to send the result back.

If do_something_with() involves IO, such as replying to the client based on the result, it must be executed in the event loop. We can put the result in a queue, but the consumer (the event loop) cannot block. So there should be a way to wake up the event loop from the other threads.

This is usually done with a Unix pipe. The event loop poll()s on the read end of the pipe, and the other threads write to the write end, causing poll() to wake up so that the event loop can check the results. The result data doesn’t need to be transferred over the pipe; the notifier can write 1 byte of junk, and the event loop simply drains and discards whatever is in the pipe.
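
A sketch of the pipe trick with hypothetical names. The read end should be set to non-blocking (e.g., via fcntl()) so that draining the pipe never blocks the event loop:

#include <unistd.h>

int notify_pipe[2];     // [0]: read end (event loop), [1]: write end (workers)

// worker thread: called after putting a result in the result queue
static void notify_event_loop() {
    char junk = 0;
    (void)write(notify_pipe[1], &junk, 1);      // makes poll() wake up
}

// event loop: notify_pipe[0] is in the poll() set; called when it's readable
static void handle_notification() {
    char buf[64];
    while (read(notify_pipe[0], buf, sizeof(buf)) > 0) {}   // drain the junk
    // then consume the result queue (while holding its mutex) ...
}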

14.7 What we learned

  • Synchronization primitives: mutex & condition variable.
  • Concurrent queue as a multi-threading building block.

Source code: