11. The AVL Tree and the Sorted Set
11.1 The Sorted Set Data Type
Based on the AVL tree in the last chapter, the sorted set data structure can be easily added. The structure definition:
struct ZSet {
*tree = NULL;
AVLNode ;
HMap hmap};
struct ZNode {
;
AVLNode tree;
HNode hmapdouble score = 0;
size_t len = 0;
char name[0];
};
static ZNode *znode_new(const char *name, size_t len, double score) {
*node = (ZNode *)malloc(sizeof(ZNode) + len);
ZNode assert(node); // not a good idea in real projects
(&node->tree);
avl_init->hmap.next = NULL;
node->hmap.hcode = str_hash((uint8_t *)name, len);
node->score = score;
node->len = len;
node(&node->name[0], name, len);
memcpyreturn node;
}
The sorted set is a sorted list of pairs of
(score, name)
that supports query or update by the sorting
key, or by the name
. It’s a combination of the AVL tree and
hashtable, and the pair node belongs to both, which demonstrates the
flexibility of intrusive data structures. The name
string
is embedded at the end of the pair node, in the hope of saving up some
space overheads.
11.2 Insertion and Deletion
The function for tree insertion is roughly the same as the testing code seen from the previous chapter:
// insert into the AVL tree
static void tree_add(ZSet *zset, ZNode *node) {
if (!zset->tree) {
->tree = &node->tree;
zsetreturn;
}
*cur = zset->tree;
AVLNode while (true) {
**from = zless(&node->tree, cur) ? &cur->left : &cur->right;
AVLNode if (!*from) {
*from = &node->tree;
->tree.parent = cur;
node->tree = avl_fix(&node->tree);
zsetbreak;
}
= *from;
cur }
}
zless
is the helper function for comparing two
pairs:
// compare by the (score, name) tuple
static bool zless(
*lhs, double score, const char *name, size_t len)
AVLNode {
*zl = container_of(lhs, ZNode, tree);
ZNode if (zl->score != score) {
return zl->score < score;
}
int rv = memcmp(zl->name, name, min(zl->len, len));
if (rv != 0) {
return rv < 0;
}
return zl->len < len;
}
static bool zless(AVLNode *lhs, AVLNode *rhs) {
*zr = container_of(rhs, ZNode, tree);
ZNode return zless(lhs, zr->score, zr->name, zr->len);
}
The insertion/update subroutines:
// update the score of an existing node (AVL tree reinsertion)
static void zset_update(ZSet *zset, ZNode *node, double score) {
if (node->score == score) {
return;
}
->tree = avl_del(&node->tree);
zset->score = score;
node(&node->tree);
avl_init(zset, node);
tree_add}
// add a new (score, name) tuple, or update the score of the existing tuple
bool zset_add(ZSet *zset, const char *name, size_t len, double score) {
*node = zset_lookup(zset, name, len);
ZNode if (node) {
(zset, node, score);
zset_updatereturn false;
} else {
= znode_new(name, len, score);
node (&zset->hmap, &node->hmap);
hm_insert(zset, node);
tree_addreturn true;
}
}
// lookup by name
*zset_lookup(ZSet *zset, const char *name, size_t len) {
ZNode // just a hashtable look up
// code omitted...
}
11.3 Rank-Based Queries
Here is the primary use case of sorted sets: the range query.
// find the (score, name) tuple that is greater or equal to the argument,
// then offset relative to it.
*zset_query(
ZNode *zset, double score, const char *name, size_t len, int64_t offset)
ZSet {
*found = NULL;
AVLNode *cur = zset->tree;
AVLNode while (cur) {
if (zless(cur, score, name, len)) {
= cur->right;
cur } else {
= cur; // candidate
found = cur->left;
cur }
}
if (found) {
= avl_offset(found, offset);
found }
return found ? container_of(found, ZNode, tree) : NULL;
}
The range query is just a regular binary tree look-up, followed by an offset operation. The offset operation is what makes the sorted set special, it is not a regular binary tree walk.
Let’s review the AVLNode
:
struct AVLNode {
uint32_t depth = 0;
uint32_t cnt = 0;
*left = NULL;
AVLNode *right = NULL;
AVLNode *parent = NULL;
AVLNode };
It has an extra cnt
field (the size of the tree), which
is not explained in the previous chapter. It is used by the
avl_offset
function:
// offset into the succeeding or preceding node.
// note: the worst-case is O(log(n)) regardless of how long the offset is.
*avl_offset(AVLNode *node, int64_t offset) {
AVLNode int64_t pos = 0; // relative to the starting node
while (offset != pos) {
if (pos < offset && pos + avl_cnt(node->right) >= offset) {
// the target is inside the right subtree
= node->right;
node += avl_cnt(node->left) + 1;
pos } else if (pos > offset && pos - avl_cnt(node->left) <= offset) {
// the target is inside the left subtree
= node->left;
node -= avl_cnt(node->right) + 1;
pos } else {
// go to the parent
*parent = node->parent;
AVLNode if (!parent) {
return NULL;
}
if (parent->right == node) {
-= avl_cnt(node->left) + 1;
pos } else {
+= avl_cnt(node->right) + 1;
pos }
= parent;
node }
}
return node;
}
With the size information embedded in the node, we can determine
whether the offset target is inside a subtree or not. The offset
operation runs in two phases: firstly, it walks up along the tree if the
target is not in a subtree, then it walks down the tree, narrowing the
distance until the target is met. The worst-case is
O(log(n))
regardless of how long the offset is, which is
better than offsetting by walking to the succeeding node one by one
(best-case of O(offset)
). The real Redis project uses a
similar technique for skip lists.
It is a good idea to stop and test the new avl_offset
function now.
static void test_case(uint32_t sz) {
;
Container cfor (uint32_t i = 0; i < sz; ++i) {
(c, i);
add}
*min = c.root;
AVLNode while (min->left) {
= min->left;
min }
for (uint32_t i = 0; i < sz; ++i) {
*node = avl_offset(min, (int64_t)i);
AVLNode assert(container_of(node, Data, node)->val == i);
for (uint32_t j = 0; j < sz; ++j) {
int64_t offset = (int64_t)j - (int64_t)i;
*n2 = avl_offset(node, offset);
AVLNode assert(container_of(n2, Data, node)->val == j);
}
assert(!avl_offset(node, -(int64_t)i - 1));
assert(!avl_offset(node, sz - i));
}
(c.root);
dispose}
11.4 New Commands & Testing
For now, we have implemented major functionalities of the sorted set. Let’s add the sorted set type to our server.
enum {
= 0,
T_STR = 1,
T_ZSET };
// the structure for the key
struct Entry {
struct HNode node;
std::string key;
std::string val;
uint32_t type = 0;
*zset = NULL;
ZSet };
The rest of the code is considered trivial, which will be omitted in the code listing.
Adding a Python script for testing new commands:
= r'''
CASES $ ./client zscore asdf n1
(nil)
$ ./client zquery xxx 1 asdf 1 10
(arr) len=0
(arr) end
# more cases...
'''
import shlex
import subprocess
= []
cmds = []
outputs = CASES.splitlines()
lines for x in lines:
= x.strip()
x if not x:
continue
if x.startswith('$ '):
2:])
cmds.append(x['')
outputs.append(else:
-1] = outputs[-1] + x + '\n'
outputs[
assert len(cmds) == len(outputs)
for cmd, expect in zip(cmds, outputs):
= subprocess.check_output(shlex.split(cmd)).decode('utf-8')
out assert out == expect, f'cmd:{cmd} out:{out}'
Exercises:
- The
avl_offset
function gives us the ability to query sorted set by rank, now do the reverse, given a node in an AVL tree, find its rank, with a worst-case ofO(log(n))
. (This is thezrank
command.) - Another sorted set application: count the number of elements within
a range. (also with a worst-case of
O(log(n))
.) - The
11_server.cpp
file already contains some sorted set commands, try adding more.
Source code:
codecrafters.io offers “Build Your Own X” courses in many programming languages.
Including Redis, Git, SQLite, Docker, and more.