11. Sorted Set

11.1 Sorted set interfaces

Three types of queries

A sorted set is a collection of sorted (score, name) pairs. The real Redis has many sorted set commands, but they are just interfaces to 3 types of queries:

  1. Point query: Find a single pair by name.
  2. Range query:
    • Seek to the nearest pair by (score, name).
    • Iterate from a pair in sorted order.
  3. Rank query:
    • Find the nth pair in sort order.
    • Find the position of a given pair in sort order.

Hashtables can only do point queries. Databases have both point and range queries because they are based on trees. Rank queries are a unique Redis feature, requiring augmented tree data structures, which will be explained later.

Two types of updates

  1. Insert a new pair.
  2. Query existing pairs, then update or delete them.

Sorted set commands in real Redis

Point query commands in real Redis:

  • Zadd key score name: Add a new pair or update an existing pair by name.
  • Zscore key name: Find a pair by name, return the score.
  • Zrem key name: Find a pair by name, then delete it.

Range query commands in real Redis:

  • Zscan: Iterate through the whole collection.
  • ZrangeByScore: Range query by score without name.
  • ZrangeByLex: Assume same score, range query by name only.
  • Zrange: Another syntax for ZrangeByScore or ZrangeByLex.
  • Zrev*: Range query in descending order.
  • ZremRangeByScore, ZremRangeByLex: Range query, then delete the results.

Rank queries in real Redis:

  • Zrank: Find a pair by name, return its rank (position in sort order).
  • The offset argument in range queries is based on finding the pair by its rank.

Simplified sorted set interface

Although there are only 3 types of queries, there are many sorted set commands in real Redis, none of which uses the full functionality of sorted sets. The pairs are ordered by (score, name), but ZrangeByScore uses only score, ZrangeByLex uses only name, and Zscan uses none.

Let’s replace them with a simpler, more generic interface:

ZQUERY key score name offset limit

It returns pairs that are greater than or equal to (score, name) using tuple comparison.

Time complexity

The complexity of the user interface is not worth copying. What’s worth copying is the optimal time complexity in real Redis.

Operation Average Worst Data structure
Add or remove a pair O(log N) O(N) Hashtable & tree insert
Point query by name O(1) O(N) Hashtable lookup
Seek by (score, name) O(log N) O(log N) Tree lookup
Iterate in sort order O(1) O(log N) Tree iteration
Find the rank of a pair O(log N) O(log N) Augmented tree
Seek by rank O(log N) O(log N) Augmented tree

Hashtables are optimal for unordered indexes; nothing is faster than O(1). Comparison-based sorting is O(log N) per item, which is achieved by balanced trees.

The new thing is the rank query, which is used for offsetting. In a database, offsetting a range query by d is at least O(d), because it walks to the next d-th item one by one. That’s why pagination with offsets is a bad idea in SQL. Redis augments the data structure so that it can find the d-th item in O(log N); offsetting by a huge number is not a problem for sorted sets.

11.2 Sorted set data types

The sorted set API

A sorted set is a collection of sorted (score, name) pairs indexed in 2 ways.

// pseudo code
struct SortedSet {
    std::set<std::pair<double, std::string>> by_score;
    std::unordered_map<std::string, double>  by_name;
};

With intrusive data structures, there is only 1 copy of the data:

struct ZSet {
    AVLNode *root = NULL;   // index by (score, name)
    HMap hmap;              // index by name
};

struct ZNode {
    // data structure nodes
    AVLNode tree;
    HNode   hmap;
    // data
    double  score = 0;
    size_t  len = 0;
    char    name[0];        // flexible array
};

We’ll implement the following point queries and updates:

bool   zset_insert(ZSet *zset, const char *name, size_t len, double score);
ZNode *zset_lookup(ZSet *zset, const char *name, size_t len);
void   zset_delete(ZSet *zset, ZNode *node);

Flexible arrays

In C, a zero-length array at the end of the struct can use any extra space.

struct ZNode {
    // ...
    size_t  len = 0;
    char    name[0];        // flexible array
};

This is used to embed the string into the node to reduce memory allocations.

static ZNode *znode_new(const char *name, size_t len, double score) {
    ZNode *node = (ZNode *)malloc(sizeof(ZNode) + len); // struct + array
    avl_init(&node->tree);
    node->hmap.next = NULL;
    node->hmap.hcode = str_hash((uint8_t *)name, len);
    node->score = score;
    node->len = len;
    memcpy(&node->name[0], name, len);
    return node;
}

C++ doesn’t know about flexible arrays, so you can’t new the struct. Thus we have an allocation function using malloc(). We should also add the pairing deallocation function so that the details of the allocation method do not leak.

static void znode_del(ZNode *node) {
    free(node);
}

Multiple value types

There are 2 types of values: string and sorted set. Let’s add them to Entry.

enum {
    T_INIT  = 0,
    T_STR   = 1,    // string
    T_ZSET  = 2,    // sorted set
};

struct Entry {
    struct HNode node;  // hashtable node
    std::string key;
    // value
    uint32_t type = 0;
    // one of the following
    std::string str;
    ZSet zset;
};

Side node: Tagged union

It’s a waste of space to have both values in Entry; we can use a tagged union.

struct Entry {
    // ...
    uint32_t type = 0;
    union {
        std::string str;
        ZSet zset;
    };
};

But the problem is that std::string has a constructor and a destructor, and C++ doesn’t know how to construct or destruct a union. So we have to call the constructor and destructor manually.

struct Entry {
    // ...
    explicit Entry(uint32_t type) : type(type) {
        if (type == T_STR) {
            new (&str) std::string;     // call the constructor
        } else if (type == T_ZSET) {
            new (&zset) ZSet;
        }
    }
    ~Entry() {
        if (type == T_STR) {
            str.~basic_string();        // call the destructor
        } else if (type == T_ZSET) {
            zset_clear(&zset);
        }
    }
};

The syntax for these is a bit crazy. I prefer C-style code with plain functions.

Side node: Runtime polymorphism

An alternative to a tagged union is a class hierarchy with runtime type info.

// base class without the value
struct Entry {
    // common fields ...
    virtual ~Entry() {}     // must present
};
// subclasses with a value
struct EntryKV : Entry {
    std::string str;
};
struct EntryZSet : Entry {
    ZSet zset;
    virtual ~EntryZSet() {
        zset_clear(&zset);
    }
};

Entry is created as one of its subclasses when adding new keys. But for lookups, there must be a way to tell which one it is.

Entry *ent = lookup();  // base class
EntryZSet *ent_zset = dynamic_cast<EntryZSet *>(ent);   // runtime type info
if (ent_zset) {
    // a certain subclass
}

dynamic_cast() can determine which subclass it is iff virtual methods are present, because the type info is associated with the vtable. This is satisfied by the virtual destructor. Destructors must be virtual when runtime polymorphism is used, because they can be called from pointers to the base class.

Runtime polymorphism can be done in plain C with tags or function pointers. You don’t need to practice these C++-isms in real projects.

11.3 Sorted set lookup, insert, and delete

Lookup by name

Lookup by name is just a hashtable lookup.

ZNode *zset_lookup(ZSet *zset, const char *name, size_t len) {
    if (!zset->tree) {
        return NULL;
    }
    HKey key;
    key.node.hcode = str_hash((uint8_t *)name, len);
    key.name = name;
    key.len = len;
    HNode *found = hm_lookup(&zset->hmap, &key.node, &hcmp);
    return found ? container_of(found, ZNode, hmap) : NULL;
}

HKey is a helper struct for the hashtable key compare function.

struct HKey {
    HNode node;
    const char *name = NULL;
    size_t len = 0;
};

static bool hcmp(HNode *node, HNode *key) {
    ZNode *znode = container_of(node, ZNode, hmap);
    HKey *hkey = container_of(key, HKey, node);
    if (znode->len != hkey->len) {
        return false;
    }
    return 0 == memcmp(znode->name, hkey->name, znode->len);
}

AVL tree insert

Tree insert is the same as search_and_insert() from the last chapter.

static void tree_insert(ZSet *zset, ZNode *node) {
    AVLNode *parent = NULL;         // insert under this node
    AVLNode **from = &zset->root;   // the incoming pointer to the next node
    while (*from) {                 // tree search
        parent = *from;
        from = zless(&node->tree, parent) ? &parent->left : &parent->right;
    }
    *from = &node->tree;            // attach the new node
    node->tree.parent = parent;
    zset->root = avl_fix(&node->tree);
}

zless() is the tuple comparison function.

// (lhs.score, lhs.name) < (rhs.score, rhs.name)
static bool zless(AVLNode *lhs, AVLNode *rhs) {
    ZNode *zl = container_of(lhs, ZNode, tree);
    ZNode *zr = container_of(rhs, ZNode, tree);
    if (zl->score != zr->score) {
        return zl->score < zl->score;
    }
    int rv = memcmp(zl->name, zr->name, min(zl->len, zr->len));
    return (rv != 0) ? (rv < 0) : (zl->len < zr->len);
}

Insert or update

The Zadd command must handle the case where the pair already exists.

bool zset_insert(ZSet *zset, const char *name, size_t len, double score) {
    if (ZNode *node = zset_lookup(zset, name, len)) {
        zset_update(zset, node, score);
        return false;
    }
    ZNode *node = znode_new(name, len, score);
    hm_insert(&zset->hmap, &node->hmap);
    tree_insert(zset, node);
    return true;
}

Detaching and re-inserting the AVL node will fix the order if the score changes.

static void zset_update(ZSet *zset, ZNode *node, double score) {
    // detach the tree node
    zset->root = avl_del(&node->tree);
    avl_init(&node->tree);
    // reinsert the tree node
    node->score = score;
    tree_insert(zset, node);
}

Delete a pair

We don’t have a search-and-delete function because delete can be done with a node reference. Having search and delete in separate functions is more flexible because there are multiple ways to search.

void zset_delete(ZSet *zset, ZNode *node) {
    // remove from the hashtable
    HKey key;
    key.node.hcode = node->hmap.hcode;
    key.name = node->name;
    key.len = node->len;
    HNode *found = hm_delete(&zset->hmap, &key.node, &hcmp);
    assert(found);
    // remove from the tree
    zset->root = avl_del(&node->tree);
    // deallocate the node
    znode_del(node);
}

11.4 Sorted set range query

The range query command: ZQUERY key score name offset limit.

  1. Seek to the first pair where pair >= (score, name).
  2. Walk to the n-th successor/predecessor (offset).
  3. Iterate and output.
ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len);
ZNode *znode_offset(ZNode *node, int64_t offset);
ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len) {
    AVLNode *found = NULL;
    for (AVLNode *node = zset->root; node; ) {
        if (zless(node, score, name, len)) {
            node = node->right; // node < key
        } else {
            found = node;       // candidate
            node = node->left;
        }
    }
    return found ? container_of(found, ZNode, tree) : NULL;
}

Offset and iterate

Iterate is just offset by ±1, which is just walking the AVL tree.

// offset into the succeeding or preceding node.
AVLNode *avl_offset(AVLNode *node, int64_t offset);

ZNode *znode_offset(ZNode *node, int64_t offset) {
    AVLNode *tnode = node ? avl_offset(&node->tree, offset) : NULL;
    return tnode ? container_of(tnode, ZNode, tree) : NULL;
}

avl_offset() will be coded later. We can code ZQUERY now.

static void do_zquery(std::vector<std::string> &cmd, Buffer &out) {
    // parse the arguments and lookup the KV pair
    // ...
    // 1. seek to the key
    ZNode *znode = zset_seekge(zset, score, name.data(), name.size());
    // 2. offset
    znode = znode_offset(znode, offset);
    // 3. iterate and output
    size_t ctx = out_begin_arr(out);
    int64_t n = 0;
    while (znode && n < limit) {
        out_str(out, znode->name, znode->len);
        out_dbl(out, znode->score);
        znode = znode_offset(znode, +1);
        n += 2;
    }
    out_end_arr(out, ctx, (uint32_t)n);
}

The number of pairs is not known until the end of the iteration, so the serialized array size is a dummy value that will be patched later.

static size_t out_begin_arr(Buffer &out) {
    out.push_back(TAG_ARR);
    buf_append_u32(out, 0);     // filled by out_end_arr()
    return out.size() - 4;      // the `ctx` arg
}
static void out_end_arr(Buffer &out, size_t ctx, uint32_t n) {
    assert(out[ctx - 1] == TAG_ARR);
    memcpy(&out[ctx], &n, 4);
}

Exercise: Add a reversed version that does seek and iterate in descending order.

11.5 Sorted set rank query

Regular tree traversal

The naive way to offset a node is to traverse the tree one step at a time.

AVLNode *avl_offset(AVLNode *node, int64_t offset) {
    for (; offset > 0 && node; offset--) {
        node = successor(node);
    }
    for (; offset < 0 && node; offset++) {
        node = predecessor(node);
    }
    return node;
}

Finding the successor is O(log N) in the worst case but O(1) on average. So offsetting a node by d one by one is O(d) on average.

static AVLNode *successor(AVLNode *node) {
    // find the leftmost node in the right subtree
    if (node->right) {
        for (node = node->right; node->left; node = node->left) {}
        return node;
    }
    // find the ancestor where I'm the rightmost node in the left subtree
    while (AVLNode *parent = node->parent) {
        if (node == parent->left) {
            return parent;
        }
        node = parent;
    }
    return NULL;
}
static AVLNode *predecessor(AVLNode *node);     // mirror

The subtree size augmentation

We’ll augment the tree to do rank queries:

  • Find the nth pair in sort order.
  • Find the rank of a given pair in sort order.

To offset a node by d, find the current rank r, then find the r + d ranked node. Both steps are O(log N) in the worst case regardless of d.

This is done by augmenting the tree node with the subtree size.

struct AVLNode {
    AVLNode *parent = NULL;
    AVLNode *left = NULL;
    AVLNode *right = NULL;
    uint32_t height = 0;
    uint32_t cnt = 0;       // augmented: subtree size
};

inline uint32_t avl_cnt(AVLNode *node) { return node ? node->cnt : 0; }

This subtree size is propagated along with the subtree height.

static void avl_update(AVLNode *node) {
    node->height = 1 + max(avl_height(node->left), avl_height(node->right));
    node->cnt = 1 + avl_cnt(node->left) + avl_cnt(node->right);     // added
}

Rank-based tree traversal

The rank between a parent and a child (B & D) has the following relationship:

             D (rank=r+s+1)        B (rank=r)
         ┌───┴───┐             ┌───┴───┐
(rank=r) B       E             A       D (rank=r+s+1)
       ┌─┴─┐                         ┌─┴─┐
       A   C (size=s)       (size=s) C   E
  • Observation 1: The rank difference between a parent and a child is the subtree size between them plus 1. So the rank difference to any node can be calculated by following the path.
  • Observation 2: The maximum rank difference within a subtree plus 1 is the subtree size. So we can know whether a target rank is in a subtree or not.

This means that offsetting a node by d is just going up or down until the rank difference is d.

AVLNode *avl_offset(AVLNode *node, int64_t offset) {
    int64_t pos = 0;    // the rank difference from the starting node
    while (offset != pos) {
        if (pos < offset && pos + avl_cnt(node->right) >= offset) {
            // the target is inside the right subtree
            node = node->right;
            pos += avl_cnt(node->left) + 1;
        } else if (pos > offset && pos - avl_cnt(node->left) <= offset) {
            // the target is inside the left subtree
            node = node->left;
            pos -= avl_cnt(node->right) + 1;
        } else {
            // go to the parent
            AVLNode *parent = node->parent;
            if (!parent) {
                return NULL;
            }
            if (parent->right == node) {
                pos -= avl_cnt(node->left) + 1;
            } else {
                pos += avl_cnt(node->right) + 1;
            }
            node = parent;
        }
    }
    return node;
}

The algorithm finds a path to the target node via the lowest common ancestor. It goes up at most once and goes down at most once. So it’s O(log N) in the worst case.

Absolute rank

avl_offset() uses rank differences to offset, we can also use rank differences to find the absolute rank of a node, which implements the ZRank command.

// returns the rank of node in sorted order
int64_t avl_rank(AVLNode *node);

This is left to the reader as an exercise. You can also implement the ZCount command, which counts the number of pairs in a range without iterating through it.

Order statistic tree

The idea of rank differences is applicable to any tree-like data structures, including the Skip list used in real Redis. This is called order statistic tree in textbooks.

If applied to the B+tree, databases can also offset or count in O(log N). Unfortunately, databases are very conservative about data structures.

An application is the Rope data structure used by text editors. It’s a tree that stores small pieces of strings. It’s used like a byte array, but it can be indexed and inserted at any position in O(log N).

11.6 What we learned

  • Multi-indexed data with intrusive data structures.
  • Order statistic tree: A fast way to offset a tree node.

Source code: