6b. Coding an Event Loop (Part 2)
6.5 Event loop basics
What we learned in the previous chapter:
- Callback-based programming: Reacting to events instead of blocking the thread.
- Handling input with non-blocking IO: Keep data in a buffer until there is enough to proceed.
- The boundary between the event loop and application code. A minimal event loop is still non-trivial, so separate generic event handling from application logic.
- Persist application state between loop iterations. For request-response protocols, alternate between 2 states (read and write).
This is the bare minimum you need to know. The rest of this chapter covers more advanced topics that bridge the gap between toy code and production software.
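Throughout this chapter, the handler code manipulates per-connection state. As a reminder, it looks roughly like the following sketch (field names match the code below; the exact definition is in the previous chapter's code):
struct Conn {
    int fd = -1;
    // what the event loop should poll for
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // buffered input and output
    std::vector<uint8_t> incoming;  // data to be parsed into requests
    std::vector<uint8_t> outgoing;  // responses to be written out
};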
6.6 Pipelined requests
Batching requests without changing the protocol
A typical Redis server can handle 20K~200K requests/second for simple operations like get, set, del. Many other request-response applications are in the same range. This is limited by the number of IO events a single thread can handle if the application logic (processing requests) is insignificant.
The server is bottlenecked by single-threaded IO, so it’s desirable to increase the amount of work per IO by batching multiple requests per read.
This can be done by pipelining requests. Normally, a client sends 1 request and then waits for 1 response; a pipelined client sends n requests and then waits for n responses. The server still handles each request in order; the only difference is that it can get multiple requests in 1 read, effectively reducing the number of IOs.
Request pipelining also reduces latency on the client side because it can get multiple responses in 1 RTT (round trip time). This is a documented use case for Redis.
Problems caused by pipelined requests
Pipelining does not change the protocol or server-side handling; requests are sent in order, the server handles them in order, then sends responses in order. So it should just work? Not so simple.
Remember that TCP is just a stream of ordered bytes; a server that simply consumes messages from the stream one at a time cannot tell whether the client is pipelining. But implementations often make extra assumptions. For example:
static void handle_read(Conn *conn) {
    // ...
    try_one_request(conn);                  // ASSUMPTION: at most 1 request
    if (conn->outgoing.size() > 0) {        // 1. Processed 1 request.
        conn->want_read = false;
        conn->want_write = true;
    }
}

static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {       // 2. Written 1 response.
        conn->want_read = true;             // 3. Wait for read readiness
        conn->want_write = false;
    }
}
handle_read() assumes that there is at most 1 request in the input buffer. After it has processed 1 request, the logic continues with handle_write(), and after it's done, handle_write() asks the event loop to wait for read readiness, even though there is still unhandled data in the input buffer. So the pipeline is stuck!
Treat the input as a byte stream
To fix this, drop the extra assumption about the input buffer: keep consuming from it until there is nothing left to do.
static void handle_read(Conn *conn) {
    // ...
    // try_one_request(conn);           // WRONG
    while (try_one_request(conn)) {}    // CORRECT
    // ...
}
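For context, a sketch of the whole handle_read() under this rule, assuming the Conn fields and the buf_append()/try_one_request() helpers used elsewhere in this chapter:
static void handle_read(Conn *conn) {
    // 1. Do a non-blocking read.
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv < 0 && errno == EAGAIN) {
        return;                             // socket not actually ready
    }
    if (rv <= 0) {                          // error, or the client closed (EOF)
        conn->want_close = true;
        return;
    }
    // 2. Append the new data to the input buffer.
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. Parse and process as many complete requests as are buffered.
    while (try_one_request(conn)) {}
    // 4. If responses were generated, switch to writing.
    if (conn->outgoing.size() > 0) {
        conn->want_read = false;
        conn->want_write = true;
    }
}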
And after we have processed 1 request, we cannot just empty the input buffer because there may be more messages.
static bool try_one_request(Conn *conn) {
    // ...
    // conn->incoming.clear()               // WRONG
    buf_consume(conn->incoming, 4 + len);   // CORRECT
    return true;
}
In summary, when reading a byte stream, you can wait for n bytes to come, but do not assume that at most n bytes have come, because the byte stream is only about order, not time.
Test for correctness with pipelined messages
Browsers have tried and failed to use pipelined requests because many servers cannot handle them.
The TCP byte stream is a major obstacle to network programming. A server cannot handle pipelined requests without interpreting the byte stream correctly. Thus, testing an implementation with pipelined messages can reveal bugs. For example, if a server makes decisions based on the number of bytes returned by read(), it will fail with pipelined messages.
6.7 Debug & test
Byte stream handling
The protocol is the same as in chapter 04. But there are more things to test. One is the pipeline test:
std::vector<std::string> query_list = {
    "hello1", "hello2", "hello3",
};
for (const std::string &s : query_list) {
    int32_t err = send_req(fd, s.data(), s.size());
    if (err) { /* ... */ }
}
for (size_t i = 0; i < query_list.size(); ++i) {
    int32_t err = read_res(fd);
    if (err) { /* ... */ }
}
The server is likely to receive multiple messages in 1 read, which tests the parser.
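For completeness, a sketch of the send_req() helper used above, assuming the same 4-byte length-prefixed protocol and a blocking client socket:
static int32_t send_req(int fd, const char *text, size_t len) {
    if (len > k_max_msg) {
        return -1;
    }
    // 4-byte length header followed by the payload
    std::vector<uint8_t> wbuf;
    uint32_t n = (uint32_t)len;
    buf_append(wbuf, (const uint8_t *)&n, 4);
    buf_append(wbuf, (const uint8_t *)text, len);
    // write everything, retrying on partial writes
    size_t sent = 0;
    while (sent < wbuf.size()) {
        ssize_t rv = write(fd, wbuf.data() + sent, wbuf.size() - sent);
        if (rv <= 0) {
            return -1;                      // error
        }
        sent += (size_t)rv;
    }
    return 0;
}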
Task spanning multiple loop iterations
Test the case where a large message takes multiple iterations to process. So let’s bump the message size limit to a larger number:
const size_t k_max_msg = 32 << 20; // likely larger than the kernel buffer
Then include a large request in the pipeline.
std::vector<std::string> query_list = {
    "hello1", "hello2", "hello3",
    std::string(k_max_msg, 'z'),    // requires multiple event loop iterations
    "hello5",
};
Debug with `strace`
Use the strace command to verify that we are testing what we intended.
$ strace ./server >/dev/null
strace will show all syscalls:
Create the listening socket:
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(1234), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
fcntl(3, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(3, F_SETFL, O_RDWR|O_NONBLOCK) = 0
listen(3, 4096) = 0
Enter the event loop, then wake up to accept a client connection:
poll([{fd=3, events=POLLIN}], 1, -1) = 1 ([{fd=3, revents=POLLIN}])
accept(3, {sa_family=AF_INET, sin_port=htons(52184), sin_addr=inet_addr("127.0.0.1")}, [16]) = 4
fcntl(4, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
Wake up to read from the client (note the pipelined requests):
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3", 65536) = 30
Wake up to write 3 responses at once (pipelined processing works):
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3", 30) = 30
Wake up multiple times to read a large request:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
brk(0x559c87e41000) = 0x559c87e41000
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
... omitted ...
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "zzzz\6\0\0\0hello5", 65536) = 14
Wake up multiple times to write large responses:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 33554446) = 2621440
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 30933006) = 3175899
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 27757107) = 27757107
The client closes the connection, then the server closes as well:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "", 65536) = 0
close(4) = 0
6.8 Optimistic non-blocking writes
In a request-response protocol, the client reads the response before sending the next request. The server can assume that the socket is ready for writing when it receives a request because the client has consumed the previous writes. So the server can blindly write without waiting for the next loop iteration, saving 1 syscall.
static void handle_read(Conn *conn) {
    // ...
    if (conn->outgoing.size() > 0) {    // has a response
        conn->want_read = false;
        conn->want_write = true;
        // The socket is likely ready to write in a request-response protocol,
        // try to write it without waiting for the next iteration.
        return handle_write(conn);      // optimization
    }
}
This assumption is false with a pipelined client. The client may not have started reading while the server is writing, so the write buffer can be full on the server side. For this to work, handle_write() must check for EAGAIN in case the socket is not ready.
static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, &conn->outgoing[0], conn->outgoing.size());
    if (rv < 0 && errno == EAGAIN) {
        return;                     // actually not ready
    }
    if (rv < 0) {
        conn->want_close = true;
        return;                     // error
    }
    // ...
}
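The elided part then removes the written bytes from the output buffer and switches back to reading once everything is flushed. A sketch of that remainder, assuming buf_consume() from this chapter (after_write() is a hypothetical name for this fragment):
// Continuation of handle_write() after a successful write() of rv bytes.
static void after_write(Conn *conn, size_t rv) {
    buf_consume(conn->outgoing, rv);        // remove the written prefix
    if (conn->outgoing.size() == 0) {       // all responses flushed
        conn->want_read = true;
        conn->want_write = false;
    }
    // else: the socket buffer is full; keep want_write and retry on the
    // next readiness notification.
}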
Verify this optimization with strace. Before the optimization, there is a poll() before each write():
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1", 65536) = 10
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\6\0\0\0hello1", 10) = 10
After the optimization, read() and write() happen in the same loop iteration:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1", 65536) = 10
write(4, "\6\0\0\0hello1", 10) = 10
6.9 Better buffer handling
We used std::vector as a FIFO (first-in-first-out), since we append to the back and remove from the front.
// append to the back
static void buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}

// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}
Appending to the back is efficient for dynamic arrays because it doesn't reallocate each time; the capacity grows exponentially to amortize the reallocation cost. But removing data from the front is not efficient because it has to move the rest of the data each time. If the buffer contains many pipelined requests, removing them from the front one by one is O(N²).
So we need a buffer type that can operate at both ends:
|  unused  |    data    |  unused  |
^          ^            ^          ^
buffer_begin  data_begin  data_end  buffer_end
struct Buffer {
    uint8_t *buffer_begin;
    uint8_t *buffer_end;
    uint8_t *data_begin;
    uint8_t *data_end;
};
Removing from the front is just advancing a pointer; no need to move the data.
static void buf_consume(struct Buffer *buf, size_t n) {
    buf->data_begin += n;
}
Appending to the back now has 2 options:
- reallocate like a dynamic array,
- or move the data to the front to make room.
This is left as an exercise for the reader; a rough sketch of one approach follows.
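A sketch combining both options (a hypothetical buf_append for this Buffer: move the data to the front when there is enough slack, otherwise reallocate with exponential growth; not necessarily how the final code does it):
static void buf_append(struct Buffer *buf, const uint8_t *data, size_t len) {
    size_t size = (size_t)(buf->data_end - buf->data_begin);
    size_t front = (size_t)(buf->data_begin - buf->buffer_begin);
    size_t back = (size_t)(buf->buffer_end - buf->data_end);
    if (back < len && front + back >= len) {
        // enough total slack: move the data to the front to make room
        memmove(buf->buffer_begin, buf->data_begin, size);
        buf->data_begin = buf->buffer_begin;
        buf->data_end = buf->data_begin + size;
    } else if (back < len) {
        // not enough room: reallocate like a dynamic array
        size_t cap = (size_t)(buf->buffer_end - buf->buffer_begin);
        size_t new_cap = cap < 64 ? 64 : cap;
        while (new_cap < size + len) {
            new_cap *= 2;
        }
        uint8_t *p = (uint8_t *)malloc(new_cap);
        memcpy(p, buf->data_begin, size);   // keep only the live data
        free(buf->buffer_begin);
        buf->buffer_begin = p;
        buf->buffer_end = p + new_cap;
        buf->data_begin = p;
        buf->data_end = p + size;
    }
    memcpy(buf->data_end, data, len);       // now there is room at the back
    buf->data_end += len;
}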
Source code: