06. The Event Loop Implementation
This chapter walks through the real C++ code of an echo server.
6.1 Overview
The definition of struct Conn:
enum {
    STATE_REQ = 0,
    STATE_RES = 1,
    STATE_END = 2,  // mark the connection for deletion
};
struct Conn {
    int fd = -1;
    uint32_t state = 0;     // either STATE_REQ or STATE_RES
    // buffer for reading
    size_t rbuf_size = 0;
    uint8_t rbuf[4 + k_max_msg];
    // buffer for writing
    size_t wbuf_size = 0;
    size_t wbuf_sent = 0;
    uint8_t wbuf[4 + k_max_msg];
};
We need buffers for reading/writing, since in nonblocking mode, IO operations are often deferred.
The state is used to decide what to do with the connection. There are two states for an ongoing connection: STATE_REQ is for reading requests and STATE_RES is for sending responses.
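Nonblocking mode itself is enabled per fd by the fd_set_nb() helper used in the code below. As a minimal sketch, assuming the usual fcntl() approach (not necessarily the book's exact version), such a helper could look like this:

#include <fcntl.h>

// sketch: switch a fd to nonblocking mode via fcntl()
static void fd_set_nb(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        die("fcntl() error");
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        die("fcntl() error");
    }
}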
The code for the event loop:
int main() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        die("socket()");
    }

    // bind, listen and etc
    // code omitted...

    // a map of all client connections, keyed by fd
    std::vector<Conn *> fd2conn;

    // set the listen fd to nonblocking mode
    fd_set_nb(fd);

    // the event loop
    std::vector<struct pollfd> poll_args;
    while (true) {
        // prepare the arguments of the poll()
        poll_args.clear();
        // for convenience, the listening fd is put in the first position
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // connection fds
        for (Conn *conn : fd2conn) {
            if (!conn) {
                continue;
            }
            struct pollfd pfd = {};
            pfd.fd = conn->fd;
            pfd.events = (conn->state == STATE_REQ) ? POLLIN : POLLOUT;
            pfd.events = pfd.events | POLLERR;
            poll_args.push_back(pfd);
        }

        // poll for active fds
        // the timeout argument doesn't matter here
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), 1000);
        if (rv < 0) {
            die("poll");
        }

        // process active connections
        for (size_t i = 1; i < poll_args.size(); ++i) {
            if (poll_args[i].revents) {
                Conn *conn = fd2conn[poll_args[i].fd];
                connection_io(conn);
                if (conn->state == STATE_END) {
                    // client closed normally, or something bad happened.
                    // destroy this connection
                    fd2conn[conn->fd] = NULL;
                    (void)close(conn->fd);
                    free(conn);
                }
            }
        }

        // try to accept a new connection if the listening fd is active
        if (poll_args[0].revents) {
            (void)accept_new_conn(fd2conn, fd);
        }
    }

    return 0;
}
The first thing in our event loop is setting up the arguments of poll(). The listening fd is polled with the POLLIN flag. For a connection fd, the state of the struct Conn determines the poll flag. In this particular case, the poll flag is either reading (POLLIN) or writing (POLLOUT), never both. If using epoll, the first thing in an event loop is usually updating the fd set with epoll_ctl.

poll() also takes a timeout argument, which can be used to implement timers in later chapters. The timeout doesn't matter at this point, so we just set it to a big number.

After poll() returns, we are notified of which fds are ready for reading/writing and act accordingly.
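To make the epoll remark above concrete, here is a rough sketch (my own illustration, not the book's code) of how the kernel-side interest list could be kept up to date with epoll_ctl(), reusing the Conn struct from this chapter; the function name epoll_update_conn and the epfd parameter are assumptions for the example:

#include <errno.h>
#include <sys/epoll.h>

// sketch: register or update one connection in the epoll interest list
// according to its state, instead of rebuilding a pollfd array every iteration
static void epoll_update_conn(int epfd, Conn *conn) {
    struct epoll_event ev = {};
    ev.events = (conn->state == STATE_REQ) ? EPOLLIN : EPOLLOUT;
    ev.data.fd = conn->fd;
    // try to modify first; ENOENT means the fd is not registered yet
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev) < 0 && errno == ENOENT) {
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn->fd, &ev) < 0) {
            die("epoll_ctl()");
        }
    }
}

The loop would then call epoll_wait() instead of poll() to get the ready fds.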
6.2 New Connections
The accept_new_conn() function accepts a new connection and creates the struct Conn object:
static void conn_put(std::vector<Conn *> &fd2conn, struct Conn *conn) {
    if (fd2conn.size() <= (size_t)conn->fd) {
        fd2conn.resize(conn->fd + 1);
    }
    fd2conn[conn->fd] = conn;
}

static int32_t accept_new_conn(std::vector<Conn *> &fd2conn, int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t socklen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen);
    if (connfd < 0) {
        msg("accept() error");
        return -1;  // error
    }

    // set the new connection fd to nonblocking mode
    fd_set_nb(connfd);
    // creating the struct Conn
    struct Conn *conn = (struct Conn *)malloc(sizeof(struct Conn));
    if (!conn) {
        close(connfd);
        return -1;
    }
    conn->fd = connfd;
    conn->state = STATE_REQ;
    conn->rbuf_size = 0;
    conn->wbuf_size = 0;
    conn->wbuf_sent = 0;
    conn_put(fd2conn, conn);
    return 0;
}
The connection_io() function is the state machine for client connections:
static void connection_io(Conn *conn) {
    if (conn->state == STATE_REQ) {
        state_req(conn);
    } else if (conn->state == STATE_RES) {
        state_res(conn);
    } else {
        assert(0);  // not expected
    }
}
6.3 The State Machine: Reader
The STATE_REQ state is for reading:
static void state_req(Conn *conn) {
    while (try_fill_buffer(conn)) {}
}

static bool try_fill_buffer(Conn *conn) {
    // try to fill the buffer
    assert(conn->rbuf_size < sizeof(conn->rbuf));
    ssize_t rv = 0;
    do {
        size_t cap = sizeof(conn->rbuf) - conn->rbuf_size;
        rv = read(conn->fd, &conn->rbuf[conn->rbuf_size], cap);
    } while (rv < 0 && errno == EINTR);
    if (rv < 0 && errno == EAGAIN) {
        // got EAGAIN, stop.
        return false;
    }
    if (rv < 0) {
        msg("read() error");
        conn->state = STATE_END;
        return false;
    }
    if (rv == 0) {
        if (conn->rbuf_size > 0) {
            msg("unexpected EOF");
        } else {
            msg("EOF");
        }
        conn->state = STATE_END;
        return false;
    }

    conn->rbuf_size += (size_t)rv;
    assert(conn->rbuf_size <= sizeof(conn->rbuf));

    // Try to process requests one by one.
    // Why is there a loop? Please read the explanation of "pipelining".
    while (try_one_request(conn)) {}
    return (conn->state == STATE_REQ);
}
There are lots of things to unpack here. To understand this function, let’s review the pseudo-code from the last chapter:
def do_something_to_client(fd):
    if should_read_from(fd):
        data = read_until_EAGAIN(fd)
        process_incoming_data(data)
    # code omitted...
The try_fill_buffer() function fills the read buffer with data. Since the size of the read buffer is limited, the buffer could become full before we hit EAGAIN, so we need to process data immediately after reading to free up buffer space; then try_fill_buffer() is looped until we hit EAGAIN.
The read() syscall (and any other syscall) needs to be retried after getting the errno EINTR. EINTR means the syscall was interrupted by a signal; the retry is needed even if our application does not make use of signals.
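The same retry idiom used inside try_fill_buffer() can also be written as a tiny standalone wrapper; this is only an illustration of the pattern (read_retry_eintr is a hypothetical name, not used by the book's code):

#include <errno.h>
#include <unistd.h>

// sketch: keep retrying read() as long as it fails with EINTR
static ssize_t read_retry_eintr(int fd, void *buf, size_t n) {
    ssize_t rv;
    do {
        rv = read(fd, buf, n);
    } while (rv < 0 && errno == EINTR);
    return rv;
}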
6.4 Parsing the Protocol
The try_one_request function handles the incoming data, but why is it in a loop? Could there be more than one request in the read buffer? The answer is yes. For a request/response protocol, clients are not limited to sending one request and waiting for the response at a time; clients can save some latency by sending multiple requests without waiting for responses in between. This mode of operation is called "pipelining". Thus we can't assume that the read buffer contains at most one request.
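To see what this means on the wire, a pipelining client can concatenate several length-prefixed requests into one buffer and send them with a single write(); the resulting bytes are exactly what may show up in the server's rbuf after one read(). The helper below is only an illustration (make_pipelined is a hypothetical name, not part of the book's code):

#include <cstdint>
#include <cstring>
#include <string>

// sketch: build several length-prefixed requests back to back in one buffer
static std::string make_pipelined(const char *reqs[], size_t n) {
    std::string out;
    for (size_t i = 0; i < n; ++i) {
        uint32_t len = (uint32_t)strlen(reqs[i]);
        out.append((const char *)&len, 4);  // 4-byte length header, host byte order
        out.append(reqs[i], len);           // payload
    }
    return out;
}

A single read() on the server may therefore return two or more complete requests, or a complete request followed by a fraction of the next one, which is why try_one_request() is called in a loop.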
Listing the try_one_request function:
static bool try_one_request(Conn *conn) {
    // try to parse a request from the buffer
    if (conn->rbuf_size < 4) {
        // not enough data in the buffer. Will retry in the next iteration
        return false;
    }
    uint32_t len = 0;
    memcpy(&len, &conn->rbuf[0], 4);
    if (len > k_max_msg) {
        msg("too long");
        conn->state = STATE_END;
        return false;
    }
    if (4 + len > conn->rbuf_size) {
        // not enough data in the buffer. Will retry in the next iteration
        return false;
    }

    // got one request, do something with it
    printf("client says: %.*s\n", (int)len, &conn->rbuf[4]);

    // generating echoing response
    memcpy(&conn->wbuf[0], &len, 4);
    memcpy(&conn->wbuf[4], &conn->rbuf[4], len);
    conn->wbuf_size = 4 + len;

    // remove the request from the buffer.
    // note: frequent memmove is inefficient.
    // note: need better handling for production code.
    size_t remain = conn->rbuf_size - 4 - len;
    if (remain) {
        memmove(conn->rbuf, &conn->rbuf[4 + len], remain);
    }
    conn->rbuf_size = remain;

    // change state
    conn->state = STATE_RES;
    state_res(conn);

    // continue the outer loop if the request was fully processed
    return (conn->state == STATE_REQ);
}
The try_one_request function takes one request from the read buffer, generates a response, then transitions to the STATE_RES state.
6.5 The State Machine: Writer
The code for the STATE_RES state:
static void state_res(Conn *conn) {
    while (try_flush_buffer(conn)) {}
}

static bool try_flush_buffer(Conn *conn) {
    ssize_t rv = 0;
    do {
        size_t remain = conn->wbuf_size - conn->wbuf_sent;
        rv = write(conn->fd, &conn->wbuf[conn->wbuf_sent], remain);
    } while (rv < 0 && errno == EINTR);
    if (rv < 0 && errno == EAGAIN) {
        // got EAGAIN, stop.
        return false;
    }
    if (rv < 0) {
        msg("write() error");
        conn->state = STATE_END;
        return false;
    }
    conn->wbuf_sent += (size_t)rv;
    assert(conn->wbuf_sent <= conn->wbuf_size);
    if (conn->wbuf_sent == conn->wbuf_size) {
        // response was fully sent, change state back
        conn->state = STATE_REQ;
        conn->wbuf_sent = 0;
        conn->wbuf_size = 0;
        return false;
    }
    // still got some data in wbuf, could try to write again
    return true;
}
The above code flushes the write buffer until it gets EAGAIN, or transitions back to STATE_REQ when the flushing is done.
6.6 Testing
To test our server, we can run the client from chapter 04 since the protocol is identical. We can also modify the client to demonstrate pipelining:
// the `query` function was simply split into `send_req` and `read_res`.
static int32_t send_req(int fd, const char *text);
static int32_t read_res(int fd);

int main() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        die("socket()");
    }

    // code omitted...

    // multiple pipelined requests
    const char *query_list[3] = {"hello1", "hello2", "hello3"};
    for (size_t i = 0; i < 3; ++i) {
        int32_t err = send_req(fd, query_list[i]);
        if (err) {
            goto L_DONE;
        }
    }
    for (size_t i = 0; i < 3; ++i) {
        int32_t err = read_res(fd);
        if (err) {
            goto L_DONE;
        }
    }

L_DONE:
    close(fd);
    return 0;
}
Exercises:
- Try to use epoll instead of poll in the event loop. This should be easy.
- We are using memmove to reclaim read buffer space. However, memmove on every request is unnecessary; change the code to perform memmove only before read. (A sketch of this idea follows the list.)
- In the state_res function, write was performed for a single response. In pipelined scenarios, we could buffer multiple responses and flush them at the end with a single write call. Note that the write buffer could be full in the middle.
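For the memmove exercise, one possible direction is a sketch like the following (ConnSketch, rbuf_start, and compact_rbuf are hypothetical names for illustration, not the book's solution): parsing advances an offset instead of compacting, and the buffer is compacted at most once, right before the next read().

#include <stddef.h>
#include <stdint.h>
#include <string.h>

// sketch: track the first unconsumed byte instead of compacting per request
struct ConnSketch {
    uint8_t rbuf[4 + 4096];
    size_t rbuf_size = 0;   // number of valid bytes in rbuf
    size_t rbuf_start = 0;  // offset of the first unconsumed byte
};

// call this just before read(), so memmove happens at most once per read
static void compact_rbuf(ConnSketch *conn) {
    size_t remain = conn->rbuf_size - conn->rbuf_start;
    if (conn->rbuf_start > 0 && remain > 0) {
        memmove(conn->rbuf, &conn->rbuf[conn->rbuf_start], remain);
    }
    conn->rbuf_size = remain;
    conn->rbuf_start = 0;
}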
Source code: