04. Promises and Events
In this chapter, we will implement the echo server again, but using the promise-based style which is the basis for the rest of the book. This decision will be explained later.
4.1 Introduction to `async` and `await`
Let’s take a quick tour of `async`/`await` and the `Promise` type in JS, in case you are not already familiar with them.
Using Callbacks
An example of a callback-based API. The application logic is continued in a callback function.
function my_app() {
    do_something_cb((err, result) => {
        if (err) {
            // fail.
        } else {
            // success, use the result.
        }
    });
}
Using Promises and `await`
An example of using `await` on a promise. The application logic continues in the same `async` function.
function do_something_promise(): Promise<T>;

async function my_app() {
    try {
        const result: T = await do_something_promise();
        // success, use the result.
    } catch (err) {
        // fail.
    }
}
Advantage: The application logic is not broken into multiple functions.
Creating Promises
An example of creating promises: converting a callback-based API to promise-based.
function do_something_promise() {
    return new Promise<T>((resolve, reject) => {
        do_something_cb((err, result) => {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });
    });
}
Callbacks are unavoidable in JS. When creating a promise object, an executor callback is passed as an argument to receive 2 more callbacks:
- `resolve()` causes the `await` statement to return a value.
- `reject()` causes the `await` statement to throw an exception.
You must call one of them when the result is available or the operation has failed. This may happen outside the executor function, so you may need to store these callbacks somewhere.
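As a minimal illustration (not part of the book’s code; the names `waitForSomething` and `somethingHappened` are made up), here is the pattern of storing the `resolve` callback outside the executor so it can be called later; this is the same pattern the socket wrapper later in this chapter uses for its `reader` field.

// A sketch: the executor only captures resolve(); other code calls it later.
let pendingResolve: null|((value: string) => void) = null;

function waitForSomething(): Promise<string> {
    return new Promise<string>((resolve, _reject) => {
        pendingResolve = resolve;       // store the callback for later
    });
}

// called from elsewhere when the result becomes available
function somethingHappened(result: string): void {
    if (pendingResolve) {
        pendingResolve(result);         // settles the promise; `await` resumes
        pendingResolve = null;
    }
}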
Terminology for promises:
- Fulfilled: `resolve()` called.
- Rejected: `reject()` called.
- Settled: Either fulfilled or rejected.
- Pending: Not settled.
4.2 Understanding `async` and `await`
Normal Functions Yield to the Runtime by `return`
There are 2 types of JS functions: normal functions and `async` functions. Normal functions execute from start to `return` (either explicitly or implicitly). Since the JS runtime is single-threaded and event-based, you cannot do blocking IO in JS; instead, you register callbacks for the completion of IO, and then the JS code ends. Once back in the runtime, the runtime can poll for events and invoke callbacks. That’s the event loop we talked about earlier!
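As a tiny illustration (a sketch, not from the book’s code), a normal function can only register a callback and then return to the runtime:

// A sketch: a normal function registers a callback and returns immediately.
function normalFunc(): void {
    setTimeout(() => {
        console.log('2. callback fired later, from the event loop');
    }, 0);
    console.log('1. normalFunc returns to the runtime now');
}
normalFunc();   // prints 1, returns to the runtime, then the runtime prints 2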
`async` Functions Yield to the Runtime by `await`
Initially, the `Promise` type is just a way to manage callbacks. It allows chaining multiple callbacks without too many nested functions. However, we will not bother with this use of promises because of the addition of async functions.
Unlike normal functions, async functions can return to the runtime in the middle of execution; this happens when you use the `await` statement on a promise. And when the promise is settled, execution of the async function resumes with the result of the promise. This is a superior coding experience because you can write sequential IO code in the same function without being interrupted by callbacks.
Calling `async` Functions Starts New Tasks
Invoking an async function results in a promise that settles itself when the async function returns or throws. You can `await` on it like a normal promise, but if you don’t, the async function will still be scheduled by the runtime. This is similar to starting a thread in multi-threaded programming. But all JS code shares a single OS thread, so a better word to use is task.
A list of ways to start tasks in different environments:
- In JS, you start background tasks by not waiting for promises (see the sketch after this list).
- In Go, you use the `go` statement.
- In Python, `async`/`await` is similar to JS, except that you have to run an event loop yourself, as it’s not a language built-in.
- In environments without an event loop, you can start OS threads instead of user-level tasks.
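For example, here is a sketch of starting background tasks in JS by not awaiting the promises (the function names are made up for illustration):

// A sketch with made-up names: tasks started by not awaiting promises.
async function backgroundTask(name: string): Promise<void> {
    // some async work, e.g. IO via `await` ...
    console.log('task done:', name);
}

async function main(): Promise<void> {
    backgroundTask('task1');    // not awaited: scheduled as a concurrent task
    backgroundTask('task2');    // another concurrent task
    // main() continues without waiting for them
}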
4.3 From Events To Promises
The `net` module doesn’t provide a promise-based API, so we have to implement the hypothetical API from the last chapter.
function soRead(conn: TCPConn): Promise<Buffer>;
function soWrite(conn: TCPConn, data: Buffer): Promise<void>;
Step 1: Analyze The Solution
The `soRead` function returns a promise which is resolved with socket data. It depends on 3 events.
- The `'data'` event fulfills the promise.
- While reading a socket, we also need to know if the EOF has occurred. So the `'end'` event also fulfills the promise. A common way to indicate the EOF is to return zero-length data.
- There is also the `'error'` event; we want to reject the promise when this happens, otherwise the promise hangs forever.
To resolve or reject the promise from these events, the promise has to be stored somewhere. We will create the `TCPConn` wrapper object for this purpose.
// A promise-based API for TCP sockets.
type TCPConn = {
    // the JS socket object
    socket: net.Socket;
    // the callbacks of the promise of the current read
    reader: null|{
        resolve: (value: Buffer) => void,
        reject: (reason: Error) => void,
    };
};
The promise’s `resolve` and `reject` callbacks are stored in the `TCPConn.reader` field.
Step 2: Handle the 'data' Event
Let’s try to implement the `'data'` event now. Here we have a problem: the `'data'` event is emitted whenever data arrives, but the promise only exists when the program is reading from the socket. So there must be a way to control when the `'data'` event is ready to fire.
socket.pause();     // pause the 'data' event
socket.resume();    // resume the 'data' event
With this knowledge, we can now implement the `soRead` function.
// create a wrapper from net.Socket
function soInit(socket: net.Socket): TCPConn {
    const conn: TCPConn = {
        socket: socket, reader: null,
    };
    socket.on('data', (data: Buffer) => {
        console.assert(conn.reader);
        // pause the 'data' event until the next read.
        conn.socket.pause();
        // fulfill the promise of the current read.
        conn.reader!.resolve(data);
        conn.reader = null;
    });
    return conn;
}
function soRead(conn: TCPConn): Promise<Buffer> {
    console.assert(!conn.reader);   // no concurrent calls
    return new Promise((resolve, reject) => {
        // save the promise callbacks
        conn.reader = {resolve: resolve, reject: reject};
        // and resume the 'data' event to fulfill the promise later.
        conn.socket.resume();
    });
}
Since the `'data'` event is paused until we read the socket, the socket should be paused by default after it is created. There is a flag to do this.
const server = net.createServer({
    pauseOnConnect: true,   // required by `TCPConn`
});
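For context, here is a hedged sketch of how the listening side might wire this together with `soInit()`; the host and port are placeholders, and the actual `newConn` handler is shown later in this chapter.

import * as net from "net";

// A sketch: each connection arrives paused and is wrapped by soInit() (defined above).
const server = net.createServer({pauseOnConnect: true});
server.on('connection', (socket: net.Socket) => {
    const conn: TCPConn = soInit(socket);
    // ... use soRead()/soWrite() on `conn`
});
server.listen({host: '127.0.0.1', port: 1234});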
Step 3: Handle the 'end' and 'error' Events
Unlike the `'data'` event, the `'end'` and `'error'` events cannot be paused and are emitted as they happen. We can handle this by storing them in the wrapper object and checking them in `soRead`.
// A promise-based API for TCP sockets.
type TCPConn = {
    // the JS socket object
    socket: net.Socket;
    // from the 'error' event
    err: null|Error;
    // EOF, from the 'end' event
    ended: boolean;
    // the callbacks of the promise of the current read
    reader: null|{
        resolve: (value: Buffer) => void,
        reject: (reason: Error) => void,
    };
};
If there is a current reader promise, resolve or reject it.
// create a wrapper from net.Socket
function soInit(socket: net.Socket): TCPConn {
    const conn: TCPConn = {
        socket: socket, err: null, ended: false, reader: null,
    };
    socket.on('data', (data: Buffer) => {
        // omitted ...
    });
    socket.on('end', () => {
        // this also fulfills the current read.
        conn.ended = true;
        if (conn.reader) {
            conn.reader.resolve(Buffer.from(''));   // EOF
            conn.reader = null;
        }
    });
    socket.on('error', (err: Error) => {
        // errors are also delivered to the current read.
        conn.err = err;
        if (conn.reader) {
            conn.reader.reject(err);
            conn.reader = null;
        }
    });
    return conn;
}
Events that happened before `soRead` are stored and checked.
// returns an empty `Buffer` after EOF.
function soRead(conn: TCPConn): Promise<Buffer> {
    console.assert(!conn.reader);   // no concurrent calls
    return new Promise((resolve, reject) => {
        // if the connection is not readable, complete the promise now.
        if (conn.err) {
            reject(conn.err);
            return;
        }
        if (conn.ended) {
            resolve(Buffer.from(''));   // EOF
            return;
        }
        // save the promise callbacks
        conn.reader = {resolve: resolve, reject: reject};
        // and resume the 'data' event to fulfill the promise later.
        conn.socket.resume();
    });
}
Step 4: Write to Socket
The `socket.write` method accepts a callback to notify the completion of the write, so the conversion to promise is trivial.
function soWrite(conn: TCPConn, data: Buffer): Promise<void> {
    console.assert(data.length > 0);
    return new Promise((resolve, reject) => {
        if (conn.err) {
            reject(conn.err);
            return;
        }
        conn.socket.write(data, (err?: Error) => {
            if (err) {
                reject(err);
            } else {
                resolve();
            }
        });
    });
}
There is also the `'drain'` event in the Node.js documentation which can be used for this task. Node.js libraries often give you multiple ways to do the same thing; you can just choose one and ignore the others.
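For reference, here is a hedged sketch of that alternative: `socket.write()` returns false when its internal buffer is above the high-water mark, and the `'drain'` event fires once the buffer is flushed. The hypothetical `soWriteDrain` below is not used elsewhere and omits error handling while waiting for `'drain'`.

// A hypothetical alternative using the 'drain' event instead of the write callback.
function soWriteDrain(conn: TCPConn, data: Buffer): Promise<void> {
    console.assert(data.length > 0);
    return new Promise((resolve, reject) => {
        if (conn.err) {
            reject(conn.err);
            return;
        }
        if (conn.socket.write(data)) {
            resolve();      // no backpressure signaled by the runtime
        } else {
            // the internal buffer is above the high-water mark; wait for it to drain.
            conn.socket.once('drain', () => resolve());
        }
    });
}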
4.4 Using `async` and `await`
Let’s return to the echo server implementation. In order to use `await` on the promise-based API, the handler for new connections (`newConn`) becomes an `async` function.
async function newConn(socket: net.Socket): Promise<void> {
    console.log('new connection', socket.remoteAddress, socket.remotePort);
    try {
        await serveClient(socket);
    } catch (exc) {
        console.error('exception:', exc);
    } finally {
        socket.destroy();
    }
}
We also wrapped our code in a try-catch block because the `await` statement can throw exceptions when rejected. However, in production code you may want to actually handle errors instead of using a catch-all exception handler.
// echo server
async function serveClient(socket: net.Socket): Promise<void> {
    const conn: TCPConn = soInit(socket);
    while (true) {
        const data = await soRead(conn);
        if (data.length === 0) {
            console.log('end connection');
            break;
        }
        console.log('data', data);
        await soWrite(conn, data);
    }
}
The code to use the socket now becomes straightforward. There are no callbacks to interrupt the application logic.
Note that the `newConn` async function is not `await`ed anywhere. It is simply invoked as a callback of the listening socket. This means that multiple connections are handled concurrently.
Exercise for the reader: convert the “accept” primitive to promise-based.
type TCPListener = {
    socket: net.Socket;
    // ...
};

function soListen(...): TCPListener;
function soAccept(listener: TCPListener): Promise<TCPConn>;
4.5 Discussion: Backpressure
Waiting for Socket Writes to Complete?
Our new echo server has a major difference — we now wait for `socket.write()` to complete. But what does the “completion of the write” mean? And why do we have to wait for it?
To answer the question: `socket.write()` is completed when the data is submitted to the OS. But a new question arises: why can’t the data be submitted to the OS immediately? This question actually goes deeper than network programming itself.
Producers are Bottlenecked by Consumers
Wherever there is asynchronous communication, there are queues or buffers that connect producers to consumers. Queues and buffers in our physical world are bounded in size and cannot hold an infinite amount of data. One problem with asynchronous communication is: what happens when the producer produces faster than the consumer consumes? There must be a mechanism to prevent the queue or buffer from overflowing. This mechanism is often called backpressure in network applications.
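As a conceptual sketch (not part of the server’s code), a bounded queue applies backpressure by making the producer wait whenever the queue is full:

// A conceptual sketch: a bounded queue where push() waits (backpressure)
// when the consumer falls behind.
class BoundedQueue<T> {
    private items: T[] = [];
    private wakeProducer: null|(() => void) = null;
    constructor(private capacity: number) {}

    async push(item: T): Promise<void> {
        while (this.items.length >= this.capacity) {
            // wait until pop() makes room
            await new Promise<void>((resolve) => { this.wakeProducer = resolve; });
        }
        this.items.push(item);
    }

    pop(): T|undefined {
        const item = this.items.shift();
        if (this.wakeProducer) {
            this.wakeProducer();        // let the waiting producer continue
            this.wakeProducer = null;
        }
        return item;
    }
}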
Backpressure in TCP: Flow Control
Backpressure in TCP is known as flow control.
- The consumer’s TCP stack stores incoming data in a receive buffer for the application to consume.
- The amount of data the producer’s TCP stack can send is bounded by a window known to the producer’s TCP stack, and it will pause sending data when the window is full.
- The consumer’s TCP stack manages the window; when the app drains from the receive buffer, it moves the window forward and notifies the producer’s TCP stack to resume sending.
The effect of flow control: TCP can pause and resume transmission so that the consumer’s receive buffer is bounded.
                           flow ctrl    bounded!
|producer| ==> |send buf| ===========> |recv buf| ==> |consumer|
    app            OS          TCP         OS            app
TCP flow control should not be confused with TCP congestion control, which also controls the window.
Backpressure Between Applications & OS
This nice mechanism needs to be implemented not only in TCP, but also in applications. Let’s focus on the producer side. The application produces data and submits it to the OS, the data goes to the send buffer, and the TCP stack consumes from the send buffer and transmits the data.
       write() may block!
|producer| ========> |send buf| =====> ...
    app                  OS       TCP
How does the OS prevent the send buffer from overflowing? Simple, the application cannot write more data when the buffer is full. Now the application is responsible for throttling itself from overproducing, because the data has to go somewhere, but memory is finite.
If the application is doing blocking IO, the call will block when the send buffer is full, so backpressure is effortless. However, this is not the case when coding in JS with an event loop.
Unbounded Queues are Footguns
We can now answer the question: why wait for writes to complete? Because while the application is waiting, it cannot produce! The `socket.write()` will always succeed even if the runtime cannot submit more data to the OS due to a full send buffer. But the data has to go somewhere: it goes to an unbounded internal queue in the runtime, which is a footgun that can cause unbounded memory usage.
           write()    unbounded!     event loop
|producer| ======> |internal queue| =========> |send buf| =====> ...
    app                Node.js                     OS       TCP
Taking our old echo server as an example, the server is both a producer and a consumer, as is the client. If the client produces data faster than the client consumes the echoed data (or the client does not consume any data at all), the server’s memory will grow indefinitely if the server does not wait for writes to complete.
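Here is a sketch of that footgun, assuming a callback-based echo server that never waits for writes (the host and port are placeholders):

import * as net from "net";

// A sketch of the footgun: echoing without waiting for writes to complete.
const server = net.createServer((socket: net.Socket) => {
    socket.on('data', (data: Buffer) => {
        socket.write(data);     // always "succeeds"; unsent data is queued inside Node.js
        // If the peer consumes slowly (or not at all), socket.writableLength
        // grows without bound, which means unbounded memory usage.
    });
});
server.listen({host: '127.0.0.1', port: 1234});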
Backpressure should exist in any system that connects producers to consumers. A rule of thumb is to look for unbounded queues in software systems, as they are a sign of the lack of backpressure.
4.6 Discussion: Events and Ordered Execution
Another difference from the old one is the use of `socket.pause()`. You can now understand why this is essential, because it is used to implement backpressure.
There is another reason to pause the `'data'` event. In callback-based code, when the event handler returns, the runtime can fire the next `'data'` event if it is not paused. The problem is that the completion of the event callback doesn’t mean the completion of the event handling — the handling can continue with further callbacks. And the interleaved handling can cause problems, considering that the data is an ordered sequence of bytes!
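To make this concrete, here is a sketch of such a race; the `header()` and `body()` helpers are hypothetical, standing for any handling that is split across callbacks:

import * as net from "net";

// hypothetical helpers: a response split into two parts
declare function header(data: Buffer): Buffer;
declare function body(data: Buffer): Buffer;

// A sketch of the race: handling is split across callbacks, so output from
// two 'data' events can interleave.
function handleConn(socket: net.Socket): void {
    socket.on('data', (data: Buffer) => {
        socket.write(header(data), () => {
            // By the time this callback runs, another 'data' event may already
            // have fired and written *its* header, so the peer can see
            // header(A), header(B), body(A), body(B): out of order.
            socket.write(body(data));
        });
    });
}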
This situation is called a race condition, and is a class of problems related to concurrency. In this situation, unwanted concurrency is introduced.
4.7 Conclusion: Promise vs. Callback
Following the discussions above, we can now explain why we switched to the promise-based API: it has clear advantages.
- If you stick to promises and `async`/`await`, it’s harder to create the kind of race conditions described above because things happen in order.
- With callback-based code, it’s not only harder to figure out the order of code execution, it’s also harder to control the order. In short, callbacks are harder to read and more error-prone to write.
- Backpressure is naturally present when using the promise-based style. This is similar to coding with blocking IO (which you can’t do in Node.js).
We have learned the basics of the socket API. Let’s move on to the next topic: protocol.