06. Event Loop
6.1 Introduction
Rewrite the echo server from chapter 04 into an event loop. In pseudocode:
while running:
    want_read = [...]   # socket fds
    want_write = [...]  # socket fds
    can_read, can_write = wait_for_readiness(want_read, want_write)    # blocks!
    for fd in can_read:
        data = read_nb(fd)      # non-blocking, only consume from the buffer
        handle_data(fd, data)   # application logic without IO
    for fd in can_write:
        data = pending_data(fd) # produced by the application
        n = write_nb(fd, data)  # non-blocking, only append to the buffer
        data_written(fd, n)     # n <= len(data), limited by the available space
Application code vs. event loop code
Some libraries can abstract away the event loop: the event loop code interacts with the application code via callbacks, and the application code interacts with the event loop via a well-defined API. We are not writing a library, but there is still an implicit boundary between the event loop code and the application code.
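In this chapter, that boundary is just a few functions that the event loop invokes on the application’s behalf. As a rough sketch (these are defined later in this chapter; Conn is the per-connection state from the next section):

static Conn *handle_accept(int fd);     // a new connection is ready to accept
static void handle_read(Conn *conn);    // the socket is ready to read
static void handle_write(Conn *conn);   // the socket is ready to write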
6.2 Per-connection state
With an event loop, an application task can span multiple loop iterations, so its state must be explicitly stored somewhere. Here is our per-connection state:
struct Conn {
int fd = -1;
// application's intention, for the event loop
bool want_read = false;
bool want_write = false;
bool want_close = false;
// buffered input and output
std::vector<uint8_t> incoming; // data to be parsed by the application
std::vector<uint8_t> outgoing; // responses generated by the application
};
- Conn::want_read & Conn::want_write represent the fd lists for the readiness API.
- Conn::want_close tells the event loop to destroy the connection.
- Conn::incoming buffers data from the socket for the protocol parser to work on.
- Conn::outgoing buffers generated responses that are written to the socket.
The need for input buffers
Since reads are now non-blocking, we cannot just wait for n bytes while parsing the protocol; the read_full() function is now irrelevant. We’ll do this instead:
At each loop iteration, if the socket is ready to read:
- Do a non-blocking read.
- Add new data to the Conn::incoming buffer.
- Try to parse the accumulated buffer.
- If there is not enough data, do nothing in that iteration.
- Process the parsed message.
- Remove the message from Conn::incoming.
Why buffer output data?
Since writes are now non-blocking, we cannot write to sockets at will; data is written only when the socket is ready to write. A large response may take multiple loop iterations to complete, so the response data must be stored in a buffer (Conn::outgoing).
6.3 The event loop code
Map from fd to connection state
poll() returns an fd list. We need to map each fd to the Conn object.
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
On Unix, an fd is allocated as the smallest available non-negative
integer, so the mapping from fd to Conn
can be a flat array
indexed by fd, and the array will be densely packed. Nothing can be more
efficient. Sometimes simple arrays can replace complex data structures
like hashtables.
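As a minimal sketch, a lookup is then just a bounds-checked index (conn_of is a hypothetical helper, not part of the server code):

// look up the connection state for an fd; NULL if absent
static Conn *conn_of(std::vector<Conn *> &fd2conn, int fd) {
    return (0 <= fd && (size_t)fd < fd2conn.size()) ? fd2conn[fd] : NULL;
}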
The `poll()` syscall
The readiness API takes a list of fds that the program wants to do IO on, then returns a list of fds ready for IO. There are 2 types of readiness: read and write.
can_read, can_write = wait_for_readiness(want_read, want_write)
We’ll use poll(); it uses the same fd list for both input and output.
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd;
short events; // request: want to read, write, or both?
short revents; // returned: can read? can write?
};
- The nfds argument is the size of the fds array.
- The timeout argument is set to -1, which means no timeout.
- pollfd::events is a combination of POLLIN, POLLOUT, POLLERR: POLLIN & POLLOUT correspond to the want_read & want_write fd lists. POLLERR indicates a socket error that we always want to be notified about.
- pollfd::revents is returned by poll(). It uses the same set of flags to indicate whether the fd is in the can_read or can_write list.
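As a standalone sketch (not part of the server), polling a single fd for readability looks like this:

// block until fd 0 (stdin) is readable
struct pollfd pfd = {0, POLLIN, 0};     // events: want to read
int rv = poll(&pfd, 1, -1);             // 1 fd, no timeout
if (rv > 0 && (pfd.revents & POLLIN)) {
    // a read() on fd 0 will not block now
}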
Step 1: Construct the fd list for `poll()`
The application code decides the type of readiness notifications. It communicates with the event loop via the want_read & want_write flags in Conn; the fds argument is then constructed from these flags:
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
// the event loop
std::vector<struct pollfd> poll_args;
while (true) {
    // prepare the arguments of the poll()
    poll_args.clear();
    // put the listening sockets in the first position
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // the rest are connection sockets
    for (Conn *conn : fd2conn) {
        if (!conn) {
            continue;
        }
        struct pollfd pfd = {conn->fd, POLLERR, 0};
        // poll() flags from the application's intent
        if (conn->want_read) {
            pfd.events |= POLLIN;
        }
        if (conn->want_write) {
            pfd.events |= POLLOUT;
        }
        poll_args.push_back(pfd);
    }
    // more ...
}
Step 2: Call `poll()`
// the event loop
while (true) {
    // prepare the arguments of the poll()
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    if (rv < 0 && errno == EINTR) {
        continue;   // not an error
    }
    if (rv < 0) {
        die("poll");
    }
    // ...
}
poll() is the only blocking syscall in the entire program. Normally it returns when at least one of the fds is ready. However, it may occasionally return with errno = EINTR even if nothing is ready.
If a process receives a Unix signal during a blocking syscall, the syscall returns immediately with EINTR to give the process a chance to handle the signal. EINTR is not expected for non-blocking syscalls.
EINTR is not an error; the syscall should be retried. Even if you do not use signals, you should still handle EINTR, because there may be unexpected sources of signals.
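An equivalent way to handle this is a retry loop around the syscall; a sketch, assuming the signal itself needs no handling:

// retry poll() until it is not interrupted by a signal
int rv = 0;
do {
    rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
} while (rv < 0 && errno == EINTR);
if (rv < 0) {
    die("poll");
}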
Step 3: Accept new connections
We put the listening socket at position 0 on the fd list.
// the event loop
while (true) {
    // prepare the arguments of the poll()
    poll_args.clear();
    // put the listening sockets in the first position
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...
    // handle the listening socket
    if (poll_args[0].revents) {
        if (Conn *conn = handle_accept(fd)) {
            // put it into the map
            if (fd2conn.size() <= (size_t)conn->fd) {
                fd2conn.resize(conn->fd + 1);
            }
            fd2conn[conn->fd] = conn;
        }
    }
    // ...
}   // the event loop
accept() is treated as read() in readiness notifications, so it uses POLLIN. After poll() returns, check the 1st fd to see if we can accept().
handle_accept() creates the Conn object for the new connection. We’ll code this later.
Step 4: Invoke application callbacks
The rest of the fd list is for connection sockets. Call the application code if they are ready for IO.
while (true) {
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...
    // handle connection sockets
    for (size_t i = 1; i < poll_args.size(); ++i) { // note: skip the 1st
        uint32_t ready = poll_args[i].revents;
        Conn *conn = fd2conn[poll_args[i].fd];
        if (ready & POLLIN) {
            handle_read(conn);  // application logic
        }
        if (ready & POLLOUT) {
            handle_write(conn); // application logic
        }
    }
}
Step 5: Terminate connections
We always poll() for POLLERR on connection sockets, so we can destroy the connection on error. Application code can also set Conn::want_close to request the event loop to destroy the connection.
// handle connection sockets
for (size_t i = 1; i < poll_args.size(); ++i) {
    uint32_t ready = poll_args[i].revents;
    Conn *conn = fd2conn[poll_args[i].fd];
    // read & write ...
    // close the socket from socket error or application logic
    if ((ready & POLLERR) || conn->want_close) {
        (void)close(conn->fd);
        fd2conn[conn->fd] = NULL;
        delete conn;
    }
}
You can add a callback handle_err()
to let the
application code handle the error, but there is nothing to do in our
application, so we just close the socket here.
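If you did want such a callback, a hypothetical sketch (handle_err() is not part of this chapter’s code) could look like this, invoked just before the close():

// let the application react to the error before the fd is closed
static void handle_err(Conn *conn) {
    fprintf(stderr, "socket error on fd %d\n", conn->fd);   // e.g. just log it
}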
6.4 Application code with non-blocking IO
Non-blocking `accept()`
Before entering the event loop, make the listening socket non-blocking with fcntl().
static void fd_set_nb(int fd) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}
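Note that fcntl() can fail; a more careful sketch would check for errors, reusing the die() helper from the event loop code:

// fd_set_nb() with error checking
static void fd_set_nb(int fd) {
    errno = 0;
    int flags = fcntl(fd, F_GETFL, 0);
    if (errno) {
        die("fcntl error");
    }
    flags |= O_NONBLOCK;
    errno = 0;
    (void)fcntl(fd, F_SETFL, flags);
    if (errno) {
        die("fcntl error");
    }
}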
Then the event loop calls back the application code to do accept().
static Conn *handle_accept(int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t socklen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen);
    if (connfd < 0) {
        return NULL;
    }
    // set the new connection fd to nonblocking mode
    fd_set_nb(connfd);
    // create a `struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true;     // read the 1st request
    return conn;
}
The connection socket is also made non-blocking, waiting for its 1st read.
Protocol parser with non-blocking read
See the comments for each sub-step.
static void handle_read(Conn *conn) {
    // 1. Do a non-blocking read.
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv <= 0) {  // handle IO error (rv < 0) or EOF (rv == 0)
        conn->want_close = true;
        return;
    }
    // 2. Add new data to the `Conn::incoming` buffer.
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. Try to parse the accumulated buffer.
    // 4. Process the parsed message.
    // 5. Remove the message from `Conn::incoming`.
    try_one_request(conn);
    // ...
}
The handling is split into try_one_request(). If there is not enough data, it will do nothing until a future loop iteration with more data.
// process 1 request if there is enough data
static bool try_one_request(Conn *conn) {
    // 3. Try to parse the accumulated buffer.
    // Protocol: message header
    if (conn->incoming.size() < 4) {
        return false;   // want read
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) {  // protocol error
        conn->want_close = true;
        return false;   // want close
    }
    // Protocol: message body
    if (4 + len > conn->incoming.size()) {
        return false;   // want read
    }
    const uint8_t *request = &conn->incoming[4];
    // 4. Process the parsed message.
    // ...
    // generate the response (echo)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);
    // 5. Remove the message from `Conn::incoming`.
    buf_consume(conn->incoming, 4 + len);
    return true;    // success
}
We use std::vector
as the buffer type, which is just a
dynamic array.
// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}
// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}
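Note that buf_consume() erases from the front of the vector, which moves all remaining bytes and costs O(n) per call. That’s acceptable for this toy server; a ring buffer could avoid the copying at the cost of more complex code.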
Non-blocking write
There is no application logic; just write some data and remove it from the buffer. write() can return fewer bytes than requested, and that’s OK because the event loop will call it again.
static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
    if (rv < 0) {
        conn->want_close = true;    // error handling
        return;
    }
    // remove written data from `outgoing`
    buf_consume(conn->outgoing, (size_t)rv);
    // ...
}
State transitions between request and response
In a request-response protocol, the program is either reading a request or writing a response. At the end of handle_read() and handle_write(), we need to switch between the 2 states.
static void handle_read(Conn *conn) {
    // ...
    // update the readiness intention
    if (conn->outgoing.size() > 0) {    // has a response
        conn->want_read = false;
        conn->want_write = true;
    }   // else: want read
}

static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {   // all data written
        conn->want_read = true;
        conn->want_write = false;
    }   // else: want write
}
This is not universally true. For example, some proxies and messaging protocols are not request-response and can read and write simultaneously.
That’s all
The protocol is the same as in chapter 04, so you can reuse the test client. The server is the bare minimum that resembles something production grade, but it’s still a toy; go to the next chapter for more advanced stuff.
Source code: