14. Thread Pool
14.1 The need for multi-threading
The event loop assumes that everything in the loop will be finished quickly, so blocking IO is forbidden. However, there are still problems:
- Third-party code that uses blocking IOs.
- CPU-intensive code that slows down the event loop.
Client-side code
Most client-side code is blocking, such as the DNS client in libc (getaddrinfo) and the HTTP client libcurl. These cannot be used in an event loop, so multi-threading is required. Alternatively, you can find a non-blocking, callback-based API that is compatible with the event loop, such as c-ares for DNS. Libcurl has both blocking and non-blocking APIs.
Even if a non-blocking API exists, it’s much harder to use due to callbacks. So it may still be desirable to use a blocking client. Fortunately, our Redis server doesn’t need any client-side code.
CPU-intensive code
An event loop avoids blocking IO, but it doesn’t help if the code takes too long on the CPU. We already have a few workarounds, such as progressive hashtable resizing and the limit on simultaneously expiring timers. But we missed one thing: when deleting a sorted set, the destructor deallocates each pair. This O(N) operation is problematic for large sorted sets.
The real Redis has added the UNLINK command as a workaround, which deletes the key but runs the destructor in another thread. This is a common use of multi-threading: run a piece of code in another thread (asynchronously).
The producer-consumer problem
Most multi-threading problems can be restated as producer-consumer problems. For example, to run an asynchronous task in a thread, there is a consumer thread, called a “worker”, waiting for the next piece of work (task), and a producer thread with the work that is trying to find and communicate with a consumer. If you can solve the producer-consumer problem, you can solve most multi-threading problems.
14.2 Synchronization primitives
Communicate by sharing data
Producers communicate with consumers via a shared data structure, usually a queue.
// pseudo code
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    queue.push_back(work);  // add the work to the queue
}

void try_to_consume() {
    if (!queue.empty()) {
        Work work = queue.front();
        queue.pop_front();
        // Do something with the work ...
    } else {
        // Nothing to do! How to wait for the queue?
    }
}
But that’s only half the problem. How does the consumer wait for the queue to become non-empty? Without knowing anything about multi-threading, one may just check the queue in a loop:
void consumer() {
    while (true) {
        try_to_consume();
    }
}
This is called busy waiting, or spinning, and it’s not a practical solution because it uses 100% CPU all the time. Some people will try to add a sleep() to the loop. This is how to do multi-threading if you don’t know multi-threading.
Sleep-signal-wakeup mechanisms
In real multi-threaded code, the consumer should sleep when it’s waiting for work, and the producer can signal the consumer after updating the queue so that the consumer thread only executes when it’s needed. This sleep-signal-wakeup mechanism must be provided by the operating system, because the OS scheduler controls which thread should run at what time.
The mechanisms are called synchronization primitives. There are many of them, and they are all sleep-signal-wakeup mechanisms.
Common synchronization primitives
There are multiple levels of synchronization primitives. Low-level primitives are the Linux futex and Windows WaitOnAddress(). They are provided directly by the OS and are the most generic, but they are hard to use. They are used to implement high-level primitives instead of being used directly by programmers.
High-level primitives are mutex, condition variable, and semaphore. They are implemented from the low-level OS primitives, easier to use, but still generic. You can solve any multi-threading problem after mastering them. This is what to learn.
There is higher-level stuff built on top of these primitives, such as concurrent queues, thread pools, readers-writer locks, Go’s channels, etc. They are less generic and only suitable for specific use cases. They are easy to use even without multi-threading knowledge, giving people the illusion that they can do multi-threading.
CPU Atomics
A CPU atomic operation makes a multi-step memory read-write operation appear as if it’s a single step. An instruction set includes a fixed set of atomic operations on fixed-size integers. Common multi-step atomic operations include atomic exchange, atomic compare and exchange, etc.
// pseudo code
T atomic_exchange(T *cur_value, T new_value) {
    T old_value = *cur_value;
    *cur_value = new_value;
    return old_value;
}

T atomic_compare_and_exchange(T *cur_value, T expected, T new_value) {
    T old_value = *cur_value;
    if (*cur_value == expected) {
        *cur_value = new_value;
    }
    return old_value;
}
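For comparison, C++11 exposes these operations through std::atomic. A minimal sketch (not used in our server):

#include <atomic>

std::atomic<int> value(0);

void atomics_demo() {
    // atomic exchange: store 42 and return the old value
    int old_value = value.exchange(42);
    // atomic compare-and-exchange: store 7 only if value == expected;
    // on failure, `expected` is overwritten with the current value
    int expected = 42;
    bool ok = value.compare_exchange_strong(expected, 7);
    (void)old_value; (void)ok;
}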
Atomic values are often mentioned in multi-threading introductions, but they are only for some niche applications because they cannot sleep/wake threads. Atomics are neither necessary nor sufficient for most multi-threading programs.
14.3 Mutex and condition variable
Mutex
When sharing data between threads, there must be a way to prevent consumers from using the partially updated data while a producer is updating. This is usually done with a mutex (mutual exclusion), also called a lock. Only one thread can hold the lock at a time, while the other threads must wait.
How does a thread wait for a lock? There are 2 kinds of locks: spinlocks and regular locks. A spinlock waits by busy waiting using CPU atomics, wasting CPU cycles.
// pseudo code
struct SpinLock {
    bool is_locked = false;
    void lock() {
        while (atomic_swap(&is_locked, true)) {}    // busy waiting
    }
    void unlock() {
        atomic_store(&is_locked, false);
    }
};
A regular lock can put the thread to sleep while waiting. Locks used in userspace programs are hybrids; they spin for a while before sleeping.
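To illustrate the hybrid idea (a sketch only; real lock implementations are more sophisticated), one can spin on pthread_mutex_trylock() a bounded number of times before falling back to the sleeping lock:

#include <pthread.h>

// sketch: spin briefly, then sleep until the lock is available
void hybrid_lock(pthread_mutex_t *mu) {
    for (int i = 0; i < 100; i++) {
        if (pthread_mutex_trylock(mu) == 0) {
            return;             // acquired the lock while spinning
        }
    }
    pthread_mutex_lock(mu);     // give up spinning and sleep
}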
Semaphore
A semaphore is like a lock, but it’s an integer counter instead of a single boolean flag:
- “Lock” decreases the counter by 1.
- “Unlock” increases the counter by 1.
- “Lock” sleeps if the counter becomes negative.
- “Unlock” wakes up a sleeping thread if the counter was negative.
struct Semaphore {
    int counter;
    explicit Semaphore(int init) : counter(init) {}
    void increase();
    void decrease();
};
A semaphore doesn’t require “lock” and “unlock” to be paired, and the counter can be initialized to any value. A regular lock is a special case of a semaphore, where 1 is the unlocked state and 0 is the locked state.
struct Mutex {
    Semaphore sem;
    explicit Mutex() : sem(1) {}
    void lock() { sem.decrease(); }
    void unlock() { sem.increase(); }
};
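For reference, POSIX provides counting semaphores directly as sem_t in <semaphore.h>. A minimal sketch of the API:

#include <semaphore.h>

void semaphore_demo() {
    sem_t sem;
    sem_init(&sem, 0, 1);   // counter = 1, like an unlocked mutex
    sem_wait(&sem);         // "lock": decrease; sleeps if the counter is 0
    sem_post(&sem);         // "unlock": increase; wakes a sleeping thread
    sem_destroy(&sem);
}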
Semaphores are popular in textbooks, but they are tricky to use in practice. We won’t discuss them because there are less tricky options.
Wait for something to happen
Let’s make queue access mutually exclusive in the producer-consumer problem.
// pseudo code
Mutex mu;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    mu.unlock();
}

void try_to_consume() {
    mu.lock();
    if (!queue.empty()) {
        Work work = queue.front();
        queue.pop_front();
        // Do something with the work ...
    }   // else: Nothing to do! How to wait for the queue?
    mu.unlock();
}
The remaining problem is waiting for the queue to become non-empty without busy waiting, which requires a sleep-signal-wakeup mechanism. But can you do this with mutexes, since they are such a mechanism? It is possible, but not obvious. We need a new, easier way to wake a sleeping thread; let’s call it wait() & signal().
// a hypothetical primitive
struct Event {
    void wait();    // put the thread to sleep
    void signal();  // wake up 1 sleeping thread
};

Mutex mu;
Event ev;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    ev.signal();            // wake up a sleeping thread if there is one
    mu.unlock();
}

void consume() {
    mu.lock();
    // wait for the queue to become non-empty
    while (queue.empty()) {
        mu.unlock();    // release the lock before sleeping
        ev.wait();      // wait by sleeping
        mu.lock();      // reclaim the lock
    }
    // update the queue
    Work work = queue.front();
    queue.pop_front();
    // Do something with the work ...
    mu.unlock();
}
The consumer must release the lock before sleeping, so that the producer can update the queue. But the problem is: the producer may update the queue and call signal() between unlock() and wait().
Step | Producer | Consumer |
---|---|---|
1 | | lock |
2 | | queue empty? |
3 | | unlock |
4 | lock | |
5 | update the queue | |
6 | unlock | |
7 | signal (nobody!) | |
8 | | wait (not empty!) |
If signal() is called before wait(), the consumer will just sleep forever!
Condition variable
The hypothetical signal & wait primitive doesn’t work because the consumer cannot sleep unconditionally; it can only sleep when the queue is really empty, which cannot be guaranteed because the lock is released before it sleeps.
To make it work, “unlock” and “sleep” should be a single step that prevents the condition from changing in between. This is called a condition variable; it has 2 operations:
- wait() releases the lock, goes to sleep, and reclaims the lock when it wakes up.
- signal() wakes up 1 sleeping thread, if there are any.
struct Cond {
    void wait(Mutex &mu);
    void signal();
};

Mutex mu;
Cond cond;
std::deque<Work> queue;     // shared between producers and consumers

void produce() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);  // update the queue
    cond.signal();          // wake up a sleeping thread if there is one
    mu.unlock();
}

void consume() {
    mu.lock();
    // wait for the queue to become non-empty
    while (queue.empty()) {
        // release the lock and enter sleep in a single step
        cond.wait(mu);
        // lock reclaimed
    }
    // update the queue
    Work work = queue.front();
    queue.pop_front();
    // Do something with the work ...
    mu.unlock();
}
Now that “unlock” and “wait” are a single step, the producer cannot update the queue before the consumer enters sleep, so the queue is guaranteed to be empty when the consumer sleeps.
A condition variable is always associated with a lock; the lock protects the condition. The condition can be arbitrary, so you can solve any multi-threading problem with condition variables.
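For example, the same pattern works for any condition the lock protects, such as a boolean flag. A sketch using the pseudo primitives above:

Mutex mu;
Cond cond;
bool ready = false;     // the "condition" is any data the mutex protects

void wait_until_ready() {
    mu.lock();
    while (!ready) {
        cond.wait(mu);
    }
    mu.unlock();
}

void set_ready() {
    mu.lock();
    ready = true;
    cond.signal();      // wake up a waiter, if any
    mu.unlock();
}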
Condition variable spurious wakeups
An important note is that the condition is always checked in a loop:
// The condition is always checked in a loop
while (queue.empty()) {
    cond.wait(mu);
}
This is NOT how to use condition variables:
// WRONG way to use condition variables
if (queue.empty()) {
    cond.wait(mu);
}
// This is wrong because the condition may have been changed.
In the producer-consumer scenario, you may reason that the consumer can assume that the queue is not empty after the wakeup. This assumption is only true for a single consumer. If there are multiple consumers, the one that is woken by signal() may not get to consume the queue; other consumers can acquire the lock and steal the item before the woken consumer, leaving the queue empty again!
Step | Producer | Consumer A | Consumer B |
---|---|---|---|
1 | signal (wakes A) | sleeping… | whatever… |
2 | unlock | sleeping… | whatever… |
3 | | sleeping… | lock |
4 | | sleeping… | consume the queue |
5 | | sleeping… | unlock |
6 | | woken by signal | |
7 | | lock | |
8 | | queue empty? | |
9 | | wait again | |
signal() doesn’t pass the lock to the woken thread; the woken thread competes with other consumers for the lock, so the condition may have changed by the time the woken thread reclaims the lock, requiring it to check the condition again. This is called spurious wakeups for condition variables.
signal() doesn’t depend on the lock, unlike wait(), so it can be called from anywhere.
// Both are correct!
void produce_1() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);
    cond.signal();  // signal before releasing the lock
    mu.unlock();
}

void produce_2() {
    Work work = ...;
    mu.lock();
    queue.push_back(work);
    mu.unlock();
    cond.signal();  // signal after releasing the lock
}
14.4 The `pthread` API
Threading in Linux C
Mutexes and condition variables are the most versatile synchronization primitives. They are included in all major multi-threading APIs. On Linux, they are available as the pthread API, implemented with futex as part of libc.
The C++ threading API is a thin wrapper around pthread on Linux. It doesn’t matter which API you use, since they expose the same set of synchronization primitives.
Start a thread
A thread is referred to by a handle of type pthread_t. The handle is initialized by creating a new thread with pthread_create().
int pthread_create(
    pthread_t *thread, const pthread_attr_t *attr,
    void *(*func)(void *arg), void *arg);
Extra options are passed via pthread_attr_t. It can be NULL if not needed.
The new thread starts by executing the specified function, which takes a void * argument. To pass more arguments, pass a pointer to a struct.
All pthread functions return an error code instead of messing with errno.
Join a thread
Waiting for a thread or process to finish is called “joining”, which is done with pthread_join().
int pthread_join(pthread_t thread, void **retval);
“Join” is part of the “fork and join” concurrency model, where “fork” means spawning new processes or threads. In server-side applications, the lifetime of a thread pool is often the same as the lifetime of the server, in which case there is no need for “join”.
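Putting pthread_create() and pthread_join() together, a minimal sketch (the struct and function names are illustrative):

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

struct Args { int x; int y; };      // multiple arguments via a struct

static void *start_func(void *arg) {
    struct Args *args = (struct Args *)arg;
    printf("%d\n", args->x + args->y);
    return NULL;
}

int main() {
    struct Args args = {1, 2};
    pthread_t thread;
    int rv = pthread_create(&thread, NULL, &start_func, &args);
    assert(rv == 0);                // an error code, not errno
    rv = pthread_join(thread, NULL);
    assert(rv == 0);
    return 0;
}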
Terminate a thread
Unlike processes, most threads cannot be legitimately killed, even though there is an API. Threads in a process share all resources; if a thread has allocated some memory or acquired some locks, those resources won’t be released when it is killed.
A thread either never dies, or it dies naturally.
Mutex
These functions are self-explanatory:
int pthread_mutex_init(
    pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutexattr_t holds extra options that are not needed for typical apps. None of these functions are expected to fail, except for unrecoverable programming errors.
Condition variable
Condition variables are explained in the previous section.
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // wake up all threads
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(
    pthread_cond_t *cond, pthread_mutex_t *mutex,
    const struct timespec *abstime);
The new thing is broadcast(): if the producer creates a condition that can be consumed multiple times, such as putting multiple items in a queue, the producer must signal multiple times, or wake up all consumers by broadcast()’ing.
timedwait() is a variant of wait() that can time out. Surprisingly, it doesn’t use monotonic time by default, which can be fixed with an option:
pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
pthread_cond_init(&cond, &cond_attr);
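A sketch of using timedwait(), assuming the condition variable was initialized with the monotonic clock as above:

#include <pthread.h>
#include <time.h>

// wait on `cond` for up to 1 second; the caller must hold `mu`
int wait_up_to_1s(pthread_cond_t *cond, pthread_mutex_t *mu) {
    struct timespec abstime;
    clock_gettime(CLOCK_MONOTONIC, &abstime);   // same clock as the condattr
    abstime.tv_sec += 1;
    return pthread_cond_timedwait(cond, mu, &abstime);  // ETIMEDOUT on timeout
}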
14.5 Code a thread pool
Concurrent queue
A thread pool has a fixed number of consumer threads, called “workers”. An unspecified number of producers can issue tasks to workers via a queue. Consumers sleep when the queue is empty, until they are woken up by a producer, as illustrated by the condition variable introduction.
struct ThreadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};
The “task” is just a function pointer with a void * argument.
struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};
The producer (event loop)
Let’s turn the pseudo code produce() into real code.
void thread_pool_queue(ThreadPool *tp, void (*f)(void *), void *arg) {
    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(Work {f, arg});
    pthread_cond_signal(&tp->not_empty);
    pthread_mutex_unlock(&tp->mu);
}
Then we can do the sorted set destruction in the thread pool.
// previous `entry_del()`
static void entry_del_sync(Entry *ent) {
    if (ent->type == T_ZSET) {
        zset_clear(&ent->zset);
    }
    delete ent;
}

// a wrapper function for the thread pool
static void entry_del_func(void *arg) {
    entry_del_sync((Entry *)arg);
}

// new `entry_del()`
static void entry_del(Entry *ent) {
    // unlink it from any data structures
    entry_set_ttl(ent, -1);     // remove from the heap data structure
    // run the destructor in a thread pool for large data structures
    size_t set_size = (ent->type == T_ZSET) ? hm_size(&ent->zset.hmap) : 0;
    const size_t k_large_container_size = 1000;
    if (set_size > k_large_container_size) {
        thread_pool_queue(&g_data.thread_pool, &entry_del_func, ent);
    } else {
        entry_del_sync(ent);    // small; avoid context switches
    }
}
Deleting a large sorted set won’t freeze the event loop, but frequent context switching can degrade system performance, so we added a threshold for the sorted set size.
The consumers (workers)
The thread start function takes the thread pool as an argument.
void thread_pool_init(ThreadPool *tp, size_t num_threads) {
    pthread_mutex_init(&tp->mu, NULL);
    pthread_cond_init(&tp->not_empty, NULL);
    tp->threads.resize(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        int rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
        assert(rv == 0);
    }
}
worker() is the real code of consumer().
static void *worker(void *arg) {
    ThreadPool *tp = (ThreadPool *)arg;
    while (true) {
        pthread_mutex_lock(&tp->mu);
        // wait for the condition: a non-empty queue
        while (tp->queue.empty()) {
            pthread_cond_wait(&tp->not_empty, &tp->mu);
        }
        // condition met, consume the queue
        Work w = tp->queue.front();
        tp->queue.pop_front();
        pthread_mutex_unlock(&tp->mu);
        // do the work
        w.f(w.arg);
    }
    return NULL;
}
The thread pool is created in main():
int main() {
    thread_pool_init(&g_data.thread_pool, 4);
    // ...
}
14.6 More to learn
Bounded queue
In our case, the consumers (workers) block on an empty queue, and the producer (event loop) never blocks. However, in some use cases, there is a size limit on the queue, so when the queue is full, the producer should either:
- block (sleep), or
- not block and do nothing; in this case, a notification mechanism is needed.
A bounded queue, where both producers and consumers can block, is a common building block for multi-threading programs. It needs 2 condition variables for both the producers and the consumers. Try coding this as an exercise:
struct BlockingQueue {
    std::deque<Work> queue;
    size_t limit;               // maximum size
    pthread_mutex_t mu;
    pthread_cond_t not_empty;   // consumer blocks
    pthread_cond_t not_full;    // producer blocks
    void produce(Work work);
    Work consume();
};
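One possible solution sketch, if you want to check your work against it:

// one way to implement the exercise above
void BlockingQueue::produce(Work work) {
    pthread_mutex_lock(&mu);
    while (queue.size() >= limit) {
        pthread_cond_wait(&not_full, &mu);  // block while full
    }
    queue.push_back(work);
    pthread_cond_signal(&not_empty);        // wake up a consumer
    pthread_mutex_unlock(&mu);
}

Work BlockingQueue::consume() {
    pthread_mutex_lock(&mu);
    while (queue.empty()) {
        pthread_cond_wait(&not_empty, &mu); // block while empty
    }
    Work work = queue.front();
    queue.pop_front();
    pthread_cond_signal(&not_full);         // wake up a producer
    pthread_mutex_unlock(&mu);
    return work;
}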
Send the result back to the event loop
Our thread pool is fire-and-forget, but sometimes the producer needs to do something with the result. This is easily done with any synchronization primitive. An example using a semaphore:
struct Result {
    T value;
    Semaphore done;
    Result() : done(0) {}
};

// application logic
void caller() {
    Result result;
    submit_some_async_task(&result);
    // block on the semaphore
    result.done.decrease();
    // the result is ready
    do_something_with(result.value);
}

// in another thread
void some_async_task(Result *result) {
    // produce the result ...
    result->value = ...;
    // unblock the caller
    result->done.increase();
}
But if the caller is in an event loop, it cannot block. So do_something_with() cannot be done in the event loop unless there is a way to send the result back.
If do_something_with() involves IO, such as replying to the client based on the result, it must be executed in the event loop. We can put the result in a queue, but the consumer (the event loop) cannot block. So there should be a way to wake up the event loop from the other threads.
This is usually done with a Unix pipe. The event loop poll()s for the read end of a pipe, and the other threads write to the write end of the pipe, causing poll() to wake up so that the event loop can check the result. The result data doesn’t need to be transferred over the pipe; the notifier can write 1 byte of junk and the event loop will discard any pipe data.
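A minimal sketch of this pipe trick (names are illustrative; error handling omitted):

#include <fcntl.h>
#include <unistd.h>

int notify_pipe[2];     // [0]: read end (event loop), [1]: write end (workers)

void notify_init(void) {
    pipe(notify_pipe);
    // non-blocking read end so draining the pipe never blocks the event loop
    fcntl(notify_pipe[0], F_SETFL, fcntl(notify_pipe[0], F_GETFL) | O_NONBLOCK);
}

// called from a worker thread after producing a result
void notify_event_loop(void) {
    char junk = 0;
    (void)write(notify_pipe[1], &junk, 1);  // 1 byte of junk wakes up poll()
}

// called by the event loop when poll() reports the read end as readable
void drain_notifications(void) {
    char buf[64];
    while (read(notify_pipe[0], buf, sizeof(buf)) > 0) {}   // discard the junk
    // ... then consume the result queue (with the mutex held)
}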
14.7 What we learned
- Synchronization primitives: mutex & condition variable.
- Concurrent queue as a multi-threading building block.
Source code: