06. Event Loop

6.1 Introduction

Rewrite the echo server from chapter 04 into an event loop.

while running:
    want_read = [...]           # socket fds
    want_write = [...]          # socket fds
    can_read, can_write = wait_for_readiness(want_read, want_write) # blocks!
    for fd in can_read:
        data = read_nb(fd)      # non-blocking, only consume from the buffer
        handle_data(fd, data)   # application logic without IO
    for fd in can_write:
        data = pending_data(fd) # produced by the application
        n = write_nb(fd, data)  # non-blocking, only append to the buffer
        data_written(fd, n)     # n <= len(data), limited by the available space

Application code vs. event loop code

Some libraries can abstract away the event loop: the event loop code interacts with the application code via callbacks, and the application code interacts with the event loop via a well-defined API. We are not writing a library, but there is still an implicit boundary between the event loop code and the application code.

6.2 Per-connection state

With an even loop, an application task can span multiple loop iterations, so the state must be explicitly stored somewhere. Here is our per-connection state:

struct Conn {
    int fd = -1;
    // application's intention, for the event loop
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // buffered input and output
    std::vector<uint8_t> incoming;  // data to be parsed by the application
    std::vector<uint8_t> outgoing;  // responses generated by the application
};
  • Conn::want_read & Conn::want_write represents the fd list for the readiness API.
  • Conn::want_close tells the event loop to destroy the connection.
  • Conn::incoming buffers data from the socket for the protocol parser to work on.
  • Conn::outgoing buffers generated responses that are written to the socket.

The need for input buffers

Since reads are now non-blocking, we cannot just wait for n bytes while parsing the protocol; the read_full() function is now irrelevant. We’ll do this instead:

At each loop iteration, if the socket is ready to read:

  1. Do a non-blocking read.
  2. Add new data to the Conn::incoming buffer.
  3. Try to parse the accumulated buffer.
    • If there is not enough data, do nothing in that iteration.
  4. Process the parsed message.
  5. Remove the message from Conn::incoming.

Why buffer output data?

Since writes are now non-blocking, we cannot write to sockets at will; data is written iff the socket is ready to write. A large response may take multiple loop iterations to complete. So the response data must be stored in a buffer (Conn::outgoing).

6.3 The event loop code

Map from fd to connection state

poll() returns a fd list. We need to map each fd to the Conn object.

    // a map of all client connections, keyed by fd
    std::vector<Conn *> fd2conn;

On Unix, an fd is allocated as the smallest available non-negative integer, so the mapping from fd to Conn can be a flat array indexed by fd, and the array will be densely packed. Nothing can be more efficient. Sometimes simple arrays can replace complex data structures like hashtables.

The `poll()` syscall

The readiness API takes a list of fds that the program wants to do IO on, then returns a list of fds ready for IO. There are 2 types of readiness: read and write.

    can_read, can_write = wait_for_readiness(want_read, want_write)

We’ll use poll(), it uses the same fd list for both input and output.

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int   fd;
    short events;   // request: want to read, write, or both?
    short revents;  // returned: can read? can write?
};
  • The nfds argument is the size of the fds array.
  • The timeout argument is set to -1, which means no timeout.
  • pollfd::events is a combination of POLLIN, POLLOUT, POLLERR:
    • POLLIN & POLLOUT correspond to the want_read & want_write fd list.
    • POLLERR indicates a socket error that we always want to be notified about.
  • pollfd::revents is returned by poll(). It uses the same set of flags to indicate whether the fd is in the can_read or can_write list.

Step 1: Construct the fd list for `poll()`

The application code decides the type of readiness notifications. It communicates with the event loop via the want_read & want_write flags in Conn, the fds argument is then constructed from these flags:

    // a map of all client connections, keyed by fd
    std::vector<Conn *> fd2conn;
    // the event loop
    std::vector<struct pollfd> poll_args;
    while (true) {
        // prepare the arguments of the poll()
        poll_args.clear();
        // put the listening sockets in the first position
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // the rest are connection sockets
        for (Conn *conn : fd2conn) {
            if (!conn) {
                continue;
            }
            struct pollfd pfd = {conn->fd, POLLERR, 0};
            // poll() flags from the application's intent
            if (conn->want_read) {
                pfd.events |= POLLIN;
            }
            if (conn->want_write) {
                pfd.events |= POLLOUT;
            }
            poll_args.push_back(pfd);
        }

        // more ...
    }

Step 2: Call `poll()`

    // the event loop
    while (true) {
        // prepare the arguments of the poll()
        // ...

        // wait for readiness
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        if (rv < 0 && errno == EINTR) {
            continue;   // not an error
        }
        if (rv < 0) {
            die("poll");
        }

        // ...
    }

poll() is the only blocking syscall in the entire program. Normally it returns when at least one of the fds is ready. However, it may occasionally return with errno = EINTR even if nothing is ready.

If a process receives a Unix signal during a blocking syscall, the syscall is immediately returned with EINTR to give the process a chance to handle the signal. EINTR is not expected for non-blocking syscalls.

EINTR is not an error, the syscall should be retried. Even if you do not use signals, you should still handle EINTR, because there may be unexpected sources of signals.

Step 3: Accept new connections

We put the listening socket at position 0 on the fd list.

    // the event loop
    while (true) {
        // prepare the arguments of the poll()
        poll_args.clear();
        // put the listening sockets in the first position
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // ...

        // wait for readiness
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        // ...

        // handle the listening socket
        if (poll_args[0].revents) {
            if (Conn *conn = handle_accept(fd)) {
                // put it into the map
                if (fd2conn.size() <= (size_t)conn->fd) {
                    fd2conn.resize(conn->fd + 1);
                }
                fd2conn[conn->fd] = conn;
            }
        }
        // ...
    }   // the event loop

accept() is treated as read() in readiness notifications, so it uses POLLIN. After poll() returns, check the 1st fd to see if we can accept().

handle_accept() creates the Conn object for the new connection. We’ll code this later.

Step 4: Invoke application callbacks

The rest of the fd list is for connection sockets. Call the application code if they are ready for IO.

    while (true) {
        // ...
        // wait for readiness
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        // ...

        // handle connection sockets
        for (size_t i = 1; i < poll_args.size(); ++i) { // note: skip the 1st
            uint32_t ready = poll_args[i].revents;
            Conn *conn = fd2conn[poll_args[i].fd];
            if (ready & POLLIN) {
                handle_read(conn);  // application logic
            }
            if (ready & POLLOUT) {
                handle_write(conn); // application logic
            }
        }
    }

Step 5: Terminate connections

We always poll() for POLLERR on connection sockets, so we can destroy the connection on error. Application code can also set Conn::want_close to request the event loop to destroy the connection.

        // handle connection sockets
        for (size_t i = 1; i < poll_args.size(); ++i) {
            uint32_t ready = poll_args[i].revents;
            Conn *conn = fd2conn[poll_args[i].fd];
            // read & write ...

            // close the socket from socket error or application logic
            if ((ready & POLLERR) || conn->want_close) {
                (void)close(conn->fd);
                fd2conn[conn->fd] = NULL;
                delete conn;
            }
        }

You can add a callback handle_err() to let the application code handle the error, but there is nothing to do in our application, so we just close the socket here.

6.4 Application code with non-blocking IO

Non-blocking `accept()`

Before entering the event loop, make the listening socket non-blocking with fcntl.

static void fd_set_nb(int fd) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}

Then the event loop calls back the application code to do accept().

static Conn *handle_accept(int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t socklen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen);
    if (connfd < 0) {
        return NULL;
    }
    // set the new connection fd to nonblocking mode
    fd_set_nb(connfd);
    // create a `struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true; // read the 1st request
    return conn;
}

The connection socket is also made non-blocking, waiting for its 1st read.

Protocol parser with non-blocking read

See the comments for each sub-step.

static void handle_read(Conn *conn) {
    // 1. Do a non-blocking read.
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv <= 0) {  // handle IO error (rv < 0) or EOF (rv == 0)
        conn->want_close = true;
        return;
    }
    // 2. Add new data to the `Conn::incoming` buffer.
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. Try to parse the accumulated buffer.
    // 4. Process the parsed message.
    // 5. Remove the message from `Conn::incoming`.
    try_one_request(conn)
    // ...
}

The handling is split into try_one_request(). If there is not enough data, it will do nothing until a future loop iteration with more data.

// process 1 request if there is enough data
static bool try_one_request(Conn *conn) {
    // 3. Try to parse the accumulated buffer.
    // Protocol: message header
    if (conn->incoming.size() < 4) {
        return false;   // want read
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) {  // protocol error
        conn->want_close = true;
        return false;   // want close
    }
    // Protocol: message body
    if (4 + len > conn->incoming.size()) {
        return false;   // want read
    }
    const uint8_t *request = &conn->incoming[4];
    // 4. Process the parsed message.
    // ...
    // generate the response (echo)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);
    // 5. Remove the message from `Conn::incoming`.
    buf_consume(conn->incoming, 4 + len);
    return true;        // success
}

We use std::vector as the buffer type, which is just a dynamic array.

// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}
// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

Non-blocking write

There is no application logic, just write some data and remove it from the buffer. write() can return less bytes and that’s OK because the event loop will call it again.

static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
    if (rv < 0) {
        conn->want_close = true;    // error handling
        return;
    }
    // remove written data from `outgoing`
    buf_consume(conn->outgoing, (size_t)rv);
    // ...
}

State transitions between request and response

In a request-response protocol, the program is either reading a request or writing a response. At the end of handle_read() and handle_write(), we need to switch between the 2 states.

static void handle_read(Conn *conn) {
    // ...
    // update the readiness intention
    if (conn->outgoing.size() > 0) {    // has a response
        conn->want_read = false;
        conn->want_write = true;
    }   // else: want read
}
static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {   // all data written
        conn->want_read = true;
        conn->want_write = false;
    } // else: want write
}

This is not universally true. For example, some proxies and messaging protocols are not request-response and can read and write simultaneously.

That’s all

The protocol is the same as in chapter 04. So you can reuse the test client. The server is the bare minimum that resembles something production grade, but it’s still a toy, go to the next chapter for more advanced stuff.

Source code: