14. Thread Pool & Asynchronous Tasks
14.1 Queues
Since the introduction of the sorted set data type, our server has had a flaw: key deletion. If a sorted set is huge, freeing its nodes one by one can take a long time, and the whole server stalls while the key is being destroyed. This is easily fixed by using multi-threading to move the destructor off the main thread.
First, we introduce the “thread pool”, which is literally a pool of threads. Each thread in the pool consumes tasks from a shared queue and executes them. A multi-producer, multi-consumer queue is straightforward to code with the pthread APIs (although in our case there is only a single producer: the main thread).
The relevant pthread primitives are `pthread_mutex_t` and `pthread_cond_t`, known as the mutex and the condition variable, respectively. If you are unfamiliar with them, consider studying multi-threading further after reading this chapter (for example, the manpages of the pthread APIs, operating systems textbooks, or online courses).
Here is a very short introduction to the two pthread primitives:
- The queue is accessed by multiple threads (both the producer and the consumers), so it must be protected by a mutex.
- The consumer threads should sleep when idle and be woken up only when the queue is non-empty; this is the job of the condition variable.
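The first point can be seen in isolation with a small sketch: several threads increment one shared counter, and every read-modify-write happens while holding a `pthread_mutex_t`. The names `counter`, `add_many`, and `run_counter_demo` are hypothetical and exist only for this illustration.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

// Hypothetical demo state shared by several threads.
static pthread_mutex_t counter_mu = PTHREAD_MUTEX_INITIALIZER;
static long counter = 0;

static void *add_many(void *) {
    for (int i = 0; i < 100000; ++i) {
        pthread_mutex_lock(&counter_mu);
        counter++;  // read-modify-write: safe only while the mutex is held
        pthread_mutex_unlock(&counter_mu);
    }
    return NULL;
}

// Starts 4 threads that all increment `counter`, joins them, and returns
// the total. Without the mutex, concurrent increments could be lost.
long run_counter_demo() {
    counter = 0;
    pthread_t th[4];
    for (int i = 0; i < 4; ++i) {
        int rv = pthread_create(&th[i], NULL, &add_many, NULL);
        assert(rv == 0);
        (void)rv;  // silence the unused warning when NDEBUG disables assert
    }
    for (int i = 0; i < 4; ++i) {
        pthread_join(th[i], NULL);  // join also makes the final value visible
    }
    return counter;
}
```

Calling `run_counter_demo()` should return 4 × 100000 = 400000 every time.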
14.2 The Implementation
The thread pool data type is defined as follows:
```cpp
struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};

struct TheadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};
```
The `thread_pool_init` function initializes the pool and starts the threads. The pthread types are initialized by the `pthread_xxx_init` functions, and `pthread_create` starts each thread with the target function `worker`.
```cpp
static void *worker(void *arg);  // the consumer, defined below

void thread_pool_init(TheadPool *tp, size_t num_threads) {
    assert(num_threads > 0);

    int rv = pthread_mutex_init(&tp->mu, NULL);
    assert(rv == 0);
    rv = pthread_cond_init(&tp->not_empty, NULL);
    assert(rv == 0);

    // create the worker threads
    tp->threads.resize(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
        assert(rv == 0);
    }
}
```
The consumer code:
```cpp
static void *worker(void *arg) {
    TheadPool *tp = (TheadPool *)arg;
    while (true) {
        pthread_mutex_lock(&tp->mu);
        // wait for the condition: a non-empty queue
        while (tp->queue.empty()) {
            pthread_cond_wait(&tp->not_empty, &tp->mu);
        }

        // got the job
        Work w = tp->queue.front();
        tp->queue.pop_front();
        pthread_mutex_unlock(&tp->mu);

        // do the work
        w.f(w.arg);
    }
    return NULL;
}
```
The producer code:
```cpp
void thread_pool_queue(TheadPool *tp, void (*f)(void *), void *arg) {
    Work w;
    w.f = f;
    w.arg = arg;

    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(w);
    pthread_cond_signal(&tp->not_empty);
    pthread_mutex_unlock(&tp->mu);
}
```
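For experimenting, the pieces above can be assembled into one self-contained file. The pool code follows this section; the demo part (`double_slot`, `run_pool_demo`, the `done_*` counters, and the choice of 4 threads and 100 jobs) is a hypothetical harness added only for illustration. Because the workers never exit, the demo deliberately never frees the pool.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};

struct TheadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};

static void *worker(void *arg) {
    TheadPool *tp = (TheadPool *)arg;
    while (true) {
        pthread_mutex_lock(&tp->mu);
        while (tp->queue.empty()) {
            pthread_cond_wait(&tp->not_empty, &tp->mu);
        }
        Work w = tp->queue.front();
        tp->queue.pop_front();
        pthread_mutex_unlock(&tp->mu);
        w.f(w.arg);  // run the job without holding the lock
    }
    return NULL;
}

void thread_pool_init(TheadPool *tp, size_t num_threads) {
    assert(num_threads > 0);
    int rv = pthread_mutex_init(&tp->mu, NULL);
    assert(rv == 0);
    rv = pthread_cond_init(&tp->not_empty, NULL);
    assert(rv == 0);
    tp->threads.resize(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
        assert(rv == 0);
    }
    (void)rv;
}

void thread_pool_queue(TheadPool *tp, void (*f)(void *), void *arg) {
    Work w;
    w.f = f;
    w.arg = arg;
    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(w);
    pthread_cond_signal(&tp->not_empty);
    pthread_mutex_unlock(&tp->mu);
}

// ---- hypothetical demo: 100 jobs, each doubling its own slot ----
static pthread_mutex_t done_mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t done_cv = PTHREAD_COND_INITIALIZER;
static int done_count = 0;
static int slots[100];

static void double_slot(void *arg) {
    int *slot = (int *)arg;
    *slot *= 2;  // each job owns its slot, so this write needs no lock
    pthread_mutex_lock(&done_mu);
    done_count++;
    pthread_cond_signal(&done_cv);
    pthread_mutex_unlock(&done_mu);
}

// Queues all jobs, waits until every one has finished, returns the slot sum.
long run_pool_demo() {
    TheadPool *tp = new TheadPool();  // never freed: workers run forever
    thread_pool_init(tp, 4);
    for (int i = 0; i < 100; ++i) {
        slots[i] = i;
        thread_pool_queue(tp, &double_slot, &slots[i]);
    }
    pthread_mutex_lock(&done_mu);
    while (done_count < 100) {  // the same wait-in-a-loop pattern
        pthread_cond_wait(&done_cv, &done_mu);
    }
    pthread_mutex_unlock(&done_mu);
    long sum = 0;
    for (int i = 0; i < 100; ++i) {
        sum += slots[i];
    }
    return sum;
}
```

`run_pool_demo()` should return 2 × (0 + 1 + … + 99) = 9900. Note that waiting for completion reuses the mutex-plus-condition-variable pattern with a counter instead of a queue.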
14.3 pthread APIs
The explanation:
- For both the producer and the consumers, the code that accesses the queue is surrounded by `pthread_mutex_lock` and `pthread_mutex_unlock`, so only one thread can access the queue at a time.
- After a consumer acquires the mutex, it checks the queue:
  - If the queue is not empty, it grabs a job from the queue, releases the mutex, and does the work.
  - Otherwise, it releases the mutex and goes to sleep; the consumer can later be woken by the condition variable. Both steps happen atomically in a single `pthread_cond_wait` call, so a signal cannot slip in between releasing the mutex and falling asleep.
- After the producer puts a job into the queue, it calls `pthread_cond_signal` to wake up a potentially sleeping consumer.
- When a consumer wakes up from `pthread_cond_wait`, the mutex is automatically held again. The consumer must check the condition again after waking up; if the condition (a non-empty queue) is not satisfied, it goes back to sleep.
The use of the condition variable needs some more explanation. The `pthread_cond_wait` call is always placed inside a loop that checks the condition. This is because other consumers can change the condition before the woken consumer grabs the mutex; the mutex is not handed over from the signaler to the consumer being woken! POSIX also allows spurious wakeups, which the loop handles as well. A condition variable used without a loop is probably a mistake.
A concrete sequence to help you understand the use of condition variables:
- The producer signals.
- The producer releases the mutex.
- Another consumer, one that was not sleeping, grabs the mutex and empties the queue.
- The consumer woken by the producer’s signal then grabs the mutex, but the queue is empty!
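A sketch of why the re-check matters: two consumers share one queue, and the producer uses `pthread_cond_broadcast` (which wakes every sleeper) for each item, so a woken consumer can easily find the queue already empty. With `while`, it simply sleeps again; with `if`, it would call `front()` on an empty `std::deque`, which is undefined behavior. The globals and `run_recheck_demo` are hypothetical names for this demo only.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical demo state: one queue shared by two consumers.
static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
static std::deque<int> queue;

static void *consume_one(void *arg) {
    int *out = (int *)arg;
    pthread_mutex_lock(&mu);
    // `while`, not `if`: by the time this thread re-acquires the mutex,
    // another consumer may already have taken the item it was woken for.
    while (queue.empty()) {
        pthread_cond_wait(&not_empty, &mu);
    }
    *out = queue.front();
    queue.pop_front();
    pthread_mutex_unlock(&mu);
    return NULL;
}

// Two consumers, two items, one broadcast per item.
// Returns true if each item was taken by exactly one consumer.
bool run_recheck_demo() {
    int got[2] = {0, 0};
    pthread_t th[2];
    for (int i = 0; i < 2; ++i) {
        pthread_create(&th[i], NULL, &consume_one, &got[i]);
    }
    for (int item = 1; item <= 2; ++item) {
        pthread_mutex_lock(&mu);
        queue.push_back(item);
        pthread_cond_broadcast(&not_empty);  // wakes all sleepers
        pthread_mutex_unlock(&mu);
    }
    for (int i = 0; i < 2; ++i) {
        pthread_join(th[i], NULL);
    }
    return got[0] + got[1] == 3 && got[0] != got[1];
}
```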
Note that `pthread_cond_signal` doesn’t need to be protected by the mutex; signaling after releasing the mutex is also correct.
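A minimal sketch of that alternative ordering: the producer keeps the same signature but calls `pthread_cond_signal` after `pthread_mutex_unlock`. This is correct because the job is already in the queue when the signal is sent, and the woken consumer re-checks the queue under the mutex anyway. The one-shot consumer `consume_one`, the job `set_flag`, and `run_signal_after_unlock_demo` are hypothetical helpers for this demo only.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};

struct TheadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
};

// Same producer as before, except the signal is sent after unlocking.
void thread_pool_queue(TheadPool *tp, void (*f)(void *), void *arg) {
    Work w;
    w.f = f;
    w.arg = arg;
    pthread_mutex_lock(&tp->mu);
    tp->queue.push_back(w);
    pthread_mutex_unlock(&tp->mu);
    // The job is already queued; a woken consumer re-acquires the mutex
    // itself and re-checks the queue in its `while` loop.
    pthread_cond_signal(&tp->not_empty);
}

// Hypothetical one-shot consumer: takes a single job, runs it, exits.
static void *consume_one(void *arg) {
    TheadPool *tp = (TheadPool *)arg;
    pthread_mutex_lock(&tp->mu);
    while (tp->queue.empty()) {
        pthread_cond_wait(&tp->not_empty, &tp->mu);
    }
    Work w = tp->queue.front();
    tp->queue.pop_front();
    pthread_mutex_unlock(&tp->mu);
    w.f(w.arg);
    return NULL;
}

static void set_flag(void *arg) { *(int *)arg = 1; }

// Returns 1 if the job queued by the signal-after-unlock producer ran.
int run_signal_after_unlock_demo() {
    TheadPool tp;
    pthread_mutex_init(&tp.mu, NULL);
    pthread_cond_init(&tp.not_empty, NULL);
    pthread_t th;
    pthread_create(&th, NULL, &consume_one, &tp);
    int flag = 0;
    thread_pool_queue(&tp, &set_flag, &flag);
    pthread_join(th, NULL);  // after join, the consumer's write is visible
    pthread_cond_destroy(&tp.not_empty);
    pthread_mutex_destroy(&tp.mu);
    return flag;
}
```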
14.4 Integrating with the Server
The thread pool is done. Let’s add that to our server:
```cpp
// global variables
static struct {
    HMap db;
    // a map of all client connections, keyed by fd
    std::vector<Conn *> fd2conn;
    // timers for idle connections
    DList idle_list;
    // timers for TTLs
    std::vector<HeapItem> heap;
    // the thread pool
    TheadPool tp;
} g_data;
```
Inside the `main` function:
```cpp
// some initializations
dlist_init(&g_data.idle_list);
thread_pool_init(&g_data.tp, 4);
```
The `entry_del` function is modified to hand the destruction of large sorted sets to the thread pool. Only large ones are offloaded, since multi-threading has overhead too: for a small set, queueing a job and waking a worker can cost more than simply freeing it in place.
```cpp
// deallocate the key immediately
static void entry_destroy(Entry *ent) {
    switch (ent->type) {
    case T_ZSET:
        zset_dispose(ent->zset);
        delete ent->zset;
        break;
    }
    delete ent;
}

static void entry_del_async(void *arg) {
    entry_destroy((Entry *)arg);
}

// dispose the entry after it got detached from the key space
static void entry_del(Entry *ent) {
    entry_set_ttl(ent, -1);

    const size_t k_large_container_size = 10000;
    bool too_big = false;
    switch (ent->type) {
    case T_ZSET:
        too_big = hm_size(&ent->zset->hmap) > k_large_container_size;
        break;
    }

    if (too_big) {
        thread_pool_queue(&g_data.tp, &entry_del_async, ent);
    } else {
        entry_destroy(ent);
    }
}
```
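Handing an entry to the pool is a transfer of ownership: once `entry_del` queues `ent`, the main thread must not touch it again, which is why the TTL is cleared (`entry_set_ttl(ent, -1)`) beforehand. The sketch below isolates that threshold-and-offload idea. It is not the server's code: `BigValue`, `dispose`, and `dispose_demo` are hypothetical, and a plain `std::thread` stands in for the thread pool so the demo can join it.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a large value such as a sorted set.
struct BigValue {
    std::vector<int> items;
};

// Same threshold as entry_del.
const size_t k_large_container_size = 10000;

// Frees small values on the calling thread. For large values, ownership is
// released to a background thread, which performs the delete there.
std::thread dispose(std::unique_ptr<BigValue> v, bool *offloaded) {
    *offloaded = v->items.size() > k_large_container_size;
    if (*offloaded) {
        // After release(), the caller no longer owns the object.
        return std::thread([](BigValue *p) { delete p; }, v.release());
    }
    v.reset();             // small: free it right here
    return std::thread();  // empty thread object, not joinable
}

// Builds a value with `n` items, disposes it, waits for any background
// free to finish, and reports whether the free was offloaded.
bool dispose_demo(size_t n) {
    std::unique_ptr<BigValue> v(new BigValue());
    v->items.resize(n);
    bool offloaded = false;
    std::thread t = dispose(std::move(v), &offloaded);
    if (t.joinable()) {
        t.join();
    }
    return offloaded;
}
```

A small value (`dispose_demo(10)`) is freed inline, while one above the threshold (`dispose_demo(20000)`) is freed on another thread. A server would not join here; it would queue the pointer and move on.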
Exercises:
- The semaphore is often introduced as a multi-threading primitive instead of the condition variable and the mutex. Try to implement the thread pool using the semaphore.
- Some fun exercises to help you understand these primitives further:
  - Implement the mutex using the semaphore. (Trivial)
  - Implement the semaphore using the condition variable. (Easy)
  - Implement the condition variable using only mutexes. (Intermediate)
  - Now that you know these primitives are somewhat equivalent, why would you prefer one over another?