04. Promises and Events

In this chapter, we will implement the echo server again, but using the promise-based style which is the basis for the rest of the book. This decision will be explained later.

4.1 Introduction to `async` and `await`

Let’s take a quick tour of async/await and the Promise type in JS, in case you are not already familiar with them.

Using Callbacks

An example of a callback-based API. The application logic is continued in a callback function.

function my_app() {
    do_something_cb((err, result) => {
        if (err) {
            // fail.
        } else {
            // success, use the result.
        }
    });
}

Using Promises and `await`

An example of using await on a promise. The application logic continues in the same async function.

function do_something_promise(): Promise<T>;

async function my_app() {
    try {
        const result: T = await do_something_promise();
    } catch (err) {
        // fail.
    }
}

Advantage: The application logic is not broken into multiple functions.

Creating Promises

An example of creating promises: converting a callback-based API to promise-based.

function do_something_promise() {
    return new Promise<T>((resolve, reject) => {
        do_something_cb((err, result) => {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });
    });
}

Callbacks are unavoidable in JS. When creating a promise object, an executor callback is passed as an argument to receive 2 more callbacks:

  1. resolve() fulfills the promise with a result.
  2. reject() rejects the promise with an error.

You must call one of them when the result is available or the operation has failed. This may happen outside the executor function, so you may need to store these callbacks somewhere.
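As a minimal sketch of storing the callbacks outside the executor, the helper below returns a promise together with its resolve and reject functions so that other code can settle it later. The names Deferred and makeDeferred are hypothetical and chosen for illustration; they are not part of any library.

```typescript
// Hypothetical helper for illustration; not part of any library.
type Deferred<T> = {
    promise: Promise<T>;
    resolve: (value: T) => void;
    reject: (reason: Error) => void;
};

function makeDeferred<T>(): Deferred<T> {
    // Placeholders; the executor below runs synchronously and replaces
    // them before `new Promise` returns.
    let resolve: (value: T) => void = () => {};
    let reject: (reason: Error) => void = () => {};
    const promise = new Promise<T>((res, rej) => {
        resolve = res;
        reject = rej;
    });
    return { promise, resolve, reject };
}

// Usage: a later event settles the promise from outside the executor.
const deferred = makeDeferred<string>();
setTimeout(() => deferred.resolve('done'), 10);
```

This is the same idea that the TCPConn.reader field uses later in this chapter: the callbacks are kept in an object so that an event handler can call them.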

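Terminology for promises:

  1. pending: the promise has no result yet.
  2. fulfilled: resolve() has been called with a result.
  3. rejected: reject() has been called with an error.
  4. settled: either fulfilled or rejected.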

4.2 Understanding `async` and `await`

Normal Functions Yield to the Runtime by `return`

There are 2 types of JS functions: normal functions and async functions. Normal functions execute from start to return (either explicitly or implicitly). Since the JS runtime is single-threaded and event-based, you cannot do blocking IO in JS. Instead, you register callbacks for the completion of IO, and then the JS code returns. Once control is back in the runtime, the runtime can poll for events and invoke callbacks. That’s the event loop we talked about earlier!

`async` Functions Yield to the Runtime by `await`

Initially, the Promise type was just a way to manage callbacks. It allows chaining multiple callbacks without too many nested functions. However, we will not bother with this use of promises because of the addition of async functions.

Unlike normal functions, async functions can return to the runtime in the middle of execution; this happens when you use await on a promise. When the promise is settled, execution of the async function resumes with the result of the promise (or with a thrown exception if it was rejected). This is a superior coding experience because you can write sequential IO code in the same function without being interrupted by callbacks.

Calling an `async` Function Starts a New Task

Invoking an async function results in a promise that settles itself when the async function returns or throws. You can await on it like a normal promise, but if you don’t, the async function will still be scheduled by the runtime. This is similar to starting a thread in multi-threaded programming. But all JS code shares a single OS thread, so a better word to use is task.
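The behavior described above can be sketched with two tasks that are started without being awaited. The names trace, sleep, task, and runTasks are made up for this illustration, and timers stand in for real IO.

```typescript
// Records the order in which things happen.
const trace: string[] = [];

function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

async function task(label: string, ms: number): Promise<string> {
    trace.push(`${label} start`);   // runs synchronously when the function is called
    await sleep(ms);                // yields to the runtime here
    trace.push(`${label} end`);
    return label;
}

async function runTasks(): Promise<string[]> {
    const a = task('a', 30);        // starts task a; does not wait for it
    const b = task('b', 5);         // starts task b while a is still pending
    trace.push('both started');
    await Promise.all([a, b]);      // now wait for both tasks to settle
    return trace;
}
```

Calling runTasks() yields the order: a start, b start, both started, b end, a end. Both tasks begin as soon as they are invoked, and the shorter one finishes first even though it was started second.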

A list of ways to start tasks in different environments:

4.3 From Events To Promises

The net module doesn’t provide a promise-based API, so we have to implement the hypothetical API from the last chapter.

function soRead(conn: TCPConn): Promise<Buffer>;
function soWrite(conn: TCPConn, data: Buffer): Promise<void>;

Step 1: Analyze The Solution

The soRead function returns a promise which is resolved with socket data. It depends on 3 events.

  1. The 'data' event fulfills the promise.
  2. While reading a socket, we also need to know if the EOF has occurred. So the 'end' event also fulfills the promise. A common way to indicate the EOF is to return zero-length data.
  3. There is also the 'error' event. We want to reject the promise when this happens; otherwise, the promise hangs forever.

To resolve or reject the promise from these events, the promise has to be stored somewhere. We will create the TCPConn wrapper object for this purpose.

// A promise-based API for TCP sockets.
type TCPConn = {
    // the JS socket object
    socket: net.Socket;
    // the callbacks of the promise of the current read
    reader: null|{
        resolve: (value: Buffer) => void,
        reject: (reason: Error) => void,
    };
};

The promise’s resolve and reject callbacks are stored in the TCPConn.reader field.

Step 2: Handle the 'data' Event

Let’s try to implement the 'data' event now. Here we have a problem: the 'data' event is emitted whenever data arrives, but the promise only exists when the program is reading from the socket. So there must be a way to control when the 'data' event is ready to fire.

socket.pause();      // pause the 'data' event
socket.resume();     // resume the 'data' event
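To see the effect of these two calls without a network connection, the sketch below uses a PassThrough stream as a stand-in for a socket (net.Socket is also a stream, so the same pause() and resume() methods apply). The function name pauseDemo and the 10 ms waits are assumptions made for this illustration.

```typescript
import { PassThrough } from "stream";

function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

async function pauseDemo(): Promise<{ whilePaused: number; afterResume: number }> {
    const stream = new PassThrough();   // stand-in for a socket
    const seen: string[] = [];

    stream.pause();                     // paused before the handler is attached
    stream.on('data', (chunk: Buffer) => seen.push(chunk.toString()));

    stream.write('hello');
    await sleep(10);
    const whilePaused = seen.length;    // the chunk is buffered, not delivered

    stream.resume();                    // let the 'data' event fire
    await sleep(10);
    const afterResume = seen.length;    // the buffered chunk has been delivered

    stream.destroy();
    return { whilePaused, afterResume };
}
```

While the stream is paused, the written chunk stays in the stream's buffer and the 'data' handler is not invoked; it is delivered only after resume().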

With this knowledge, we can now implement the soRead function.

// create a wrapper from net.Socket
function soInit(socket: net.Socket): TCPConn {
    const conn: TCPConn = {
        socket: socket, reader: null,
    };
    socket.on('data', (data: Buffer) => {
        console.assert(conn.reader);
        // pause the 'data' event until the next read.
        conn.socket.pause();
        // fulfill the promise of the current read.
        conn.reader!.resolve(data);
        conn.reader = null;
    });
    return conn;
}

function soRead(conn: TCPConn): Promise<Buffer> {
    console.assert(!conn.reader);   // no concurrent calls
    return new Promise((resolve, reject) => {
        // save the promise callbacks
        conn.reader = {resolve: resolve, reject: reject};
        // and resume the 'data' event to fulfill the promise later.
        conn.socket.resume();
    });
}

Since the 'data' event is paused until we read the socket, the socket should be paused by default after it is created. There is a flag to do this.

const server = net.createServer({
    pauseOnConnect: true,   // required by `TCPConn`
});

Step 3: Handle the 'end' and 'error' Events

Unlike the 'data' event, the 'end' and 'error' events cannot be paused and are emitted as they happen. We can handle this by storing them in the wrapper object and checking them in soRead.

// A promise-based API for TCP sockets.
type TCPConn = {
    // the JS socket object
    socket: net.Socket;
    // from the 'error' event
    err: null|Error;
    // EOF, from the 'end' event
    ended: boolean;
    // the callbacks of the promise of the current read
    reader: null|{
        resolve: (value: Buffer) => void,
        reject: (reason: Error) => void,
    };
};

If there is a current reader promise, resolve or reject it.

// create a wrapper from net.Socket
function soInit(socket: net.Socket): TCPConn {
    const conn: TCPConn = {
        socket: socket, err: null, ended: false, reader: null,
    };
    socket.on('data', (data: Buffer) => {
        // omitted ...
    });
    socket.on('end', () => {
        // this also fulfills the current read.
        conn.ended = true;
        if (conn.reader) {
            conn.reader.resolve(Buffer.from(''));   // EOF
            conn.reader = null;
        }
    });
    socket.on('error', (err: Error) => {
        // errors are also delivered to the current read.
        conn.err = err;
        if (conn.reader) {
            conn.reader.reject(err);
            conn.reader = null;
        }
    });
    return conn;
}

Events that happened before soRead are stored and checked.

// returns an empty `Buffer` after EOF.
function soRead(conn: TCPConn): Promise<Buffer> {
    console.assert(!conn.reader);   // no concurrent calls
    return new Promise((resolve, reject) => {
        // if the connection is not readable, complete the promise now.
        if (conn.err) {
            reject(conn.err);
            return;
        }
        if (conn.ended) {
            resolve(Buffer.from(''));   // EOF
            return;
        }

        // save the promise callbacks
        conn.reader = {resolve: resolve, reject: reject};
        // and resume the 'data' event to fulfill the promise later.
        conn.socket.resume();
    });
}

Step 4: Write to Socket

The socket.write method accepts a callback to notify the completion of the write, so the conversion to promise is trivial.

function soWrite(conn: TCPConn, data: Buffer): Promise<void> {
    console.assert(data.length > 0);
    return new Promise((resolve, reject) => {
        if (conn.err) {
            reject(conn.err);
            return;
        }

        conn.socket.write(data, (err?: Error) => {
            if (err) {
                reject(err);
            } else {
                resolve();
            }
        });
    });
}

The Node.js documentation also describes the 'drain' event, which can be used for the same task. Node.js libraries often give you multiple ways to do the same thing; you can just choose one and ignore the others.
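For comparison, here is a rough sketch of the 'drain' alternative. It relies on the documented behavior that write() returns false once the stream's internal buffer has reached its highWaterMark, and that 'drain' is emitted when it is appropriate to write again. The function name writeWithDrain is hypothetical, and the sketch takes any Writable (which includes net.Socket) rather than the TCPConn wrapper.

```typescript
import { once } from "events";
import { Writable } from "stream";

// Hypothetical alternative to soWrite, for illustration only.
async function writeWithDrain(out: Writable, data: Buffer): Promise<void> {
    // write() returns false when the internal buffer has reached highWaterMark.
    if (!out.write(data)) {
        // Wait until the buffer has been flushed before producing more.
        // once() also rejects if the stream emits 'error' while waiting.
        await once(out, 'drain');
    }
}
```

Unlike soWrite, this version returns immediately while the buffer is below the threshold, so it does not wait for every write. Errors that occur when no one is waiting for 'drain' still need a separate 'error' handler.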

4.4 Using `async` and `await`

Let’s return to the echo server implementation. In order to use await on the promise-based API, the handler for new connections (newConn) becomes an async function.

async function newConn(socket: net.Socket): Promise<void> {
    console.log('new connection', socket.remoteAddress, socket.remotePort);
    try {
        await serveClient(socket);
    } catch (exc) {
        console.error('exception:', exc);
    } finally {
        socket.destroy();
    }
}

We also wrapped our code in a try-catch block because await throws an exception when the promise is rejected. In production code, you may want to actually handle errors instead of using a catch-all exception handler.

// echo server
async function serveClient(socket: net.Socket): Promise<void> {
    const conn: TCPConn = soInit(socket);
    while (true) {
        const data = await soRead(conn);
        if (data.length === 0) {
            console.log('end connection');
            break;
        }

        console.log('data', data);
        await soWrite(conn, data);
    }
}

The code to use the socket now becomes straightforward. There are no callbacks to interrupt the application logic.

Note that the newConn async function is not awaited anywhere. It is simply invoked as a callback of the listening socket. This means that multiple connections are handled concurrently.

Exercise for the reader: convert the “accept” primitive to promise-based.

type TCPListener = {
    socket: net.Server;     // the listening socket; net.createServer() returns a net.Server
    // ...
};

function soListen(...): TCPListener;
function soAccept(listener: TCPListener): Promise<TCPConn>;

4.5 Discussion: Backpressure

Waiting for Socket Writes to Complete?

Our new echo server has a major difference — we now wait for socket.write() to complete. But what does the “completion of the write” mean? And why do we have to wait for it?

To answer the first question: socket.write() is completed when the data is submitted to the OS. But a new question arises: why can’t the data be submitted to the OS immediately? This question actually goes deeper than network programming itself.

Producers are Bottlenecked by Consumers

Wherever there is asynchronous communication, there are queues or buffers that connect producers to consumers. Queues and buffers in our physical world are bounded in size and cannot hold an infinite amount of data. One problem with asynchronous communication is what happens when the producer produces faster than the consumer consumes. There must be a mechanism to prevent the queue or buffer from overflowing. This mechanism is often called backpressure in network applications.
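A minimal sketch of the idea, independent of sockets: a bounded queue whose push() makes the producer wait while the queue is full. The class BoundedQueue and the function boundedQueueDemo are hypothetical names for this illustration, and a timer simulates a slow consumer.

```typescript
function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical bounded queue for illustration only.
class BoundedQueue<T> {
    private items: T[] = [];
    private blockedProducers: Array<() => void> = [];
    private capacity: number;

    constructor(capacity: number) {
        this.capacity = capacity;
    }

    // Completes once there is room for the item; a full queue makes the producer wait.
    async push(item: T): Promise<void> {
        while (this.items.length >= this.capacity) {
            await new Promise<void>((resolve) => this.blockedProducers.push(resolve));
        }
        this.items.push(item);
    }

    // Removes the oldest item (if any) and wakes one waiting producer.
    pop(): T | undefined {
        const item = this.items.shift();
        const wake = this.blockedProducers.shift();
        if (wake) wake();
        return item;
    }

    size(): number {
        return this.items.length;
    }
}

async function boundedQueueDemo(): Promise<{ maxLength: number; received: number[] }> {
    const queue = new BoundedQueue<number>(3);
    const received: number[] = [];
    let maxLength = 0;

    // A fast producer: it is throttled by awaiting push().
    const producer = (async () => {
        for (let i = 0; i < 10; i++) {
            await queue.push(i);
            maxLength = Math.max(maxLength, queue.size());
        }
    })();

    // A slow consumer: takes one item every 2 ms.
    while (received.length < 10) {
        await sleep(2);
        const item = queue.pop();
        if (item !== undefined) received.push(item);
    }
    await producer;
    return { maxLength, received };
}
```

In this demo the queue never holds more than 3 items even though the producer is much faster than the consumer, and the items arrive in order. The producer is slowed down to the consumer's pace, which is the essence of backpressure.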

Backpressure in TCP: Flow Control

Backpressure in TCP is known as flow control.

The effect of flow control: TCP can pause and resume transmission so that the consumer’s receive buffer is bounded.

                           flow ctrl    bounded!
|producer| ==> |send buf| ===========> |recv buf| ==> |consumer|
    app            OS         TCP          OS            app

TCP flow control should not be confused with TCP congestion control, which also limits the sender’s window, but does so to protect the network rather than the receiver.

Backpressure Between Applications & OS

This nice mechanism needs to be implemented not only in TCP, but also in applications. Let’s focus on the producer side. The application produces data and submits it to the OS, the data goes to the send buffer, and the TCP stack consumes from the send buffer and transmits the data.

            write()  may block!
|producer| ========> |send buf| =====> ...
    app                OS        TCP

How does the OS prevent the send buffer from overflowing? Simple: the application cannot write more data when the buffer is full. The application is now responsible for throttling itself, because the data has to go somewhere and memory is finite.

If the application is doing blocking IO, the call will block when the send buffer is full, so backpressure is effortless. However, this is not the case when coding in JS with an event loop.

Unbounded Queues are Footguns

We can now answer the question: why wait for writes to complete? Because while the application is waiting, it cannot produce! The socket.write() call will always succeed, even if the runtime cannot submit more data to the OS due to a full send buffer. But the data has to go somewhere: it goes to an unbounded internal queue in the runtime, which is a footgun that can cause unbounded memory usage.

           write()    unbounded!    event loop
|producer| ======> |internal queue| =========> |send buf| =====> ...
    app                Node.js                     OS      TCP
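The internal queue can be observed with a small experiment. The sketch below uses a plain Writable stream as a stand-in for a socket whose send buffer is full: its write callback is never invoked, so nothing leaves the stream. The names makeStalledSink and floodWithoutWaiting are hypothetical; writableLength is the stream property that reports the number of queued bytes.

```typescript
import { Writable } from "stream";

// A stand-in for a socket that cannot submit data to the OS:
// the write callback is never called, so every write stays queued.
function makeStalledSink(): Writable {
    return new Writable({
        write(_chunk, _encoding, _callback) {
            // Intentionally never calls _callback.
        },
    });
}

// Writes without waiting for completion and reports what piled up.
function floodWithoutWaiting(
    sink: Writable, chunks: number, chunkSize: number,
): { sawFalse: boolean; queuedBytes: number } {
    let sawFalse = false;
    for (let i = 0; i < chunks; i++) {
        // write() accepts the data regardless; `false` is only a hint to slow down.
        if (!sink.write(Buffer.alloc(chunkSize))) {
            sawFalse = true;
        }
    }
    return { sawFalse, queuedBytes: sink.writableLength };
}
```

Calling floodWithoutWaiting(makeStalledSink(), 100, 1024) queues far more than the stream's highWaterMark. The threshold only changes the return value of write(); it does not stop the queue from growing.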

Taking our old echo server as an example, the server is both a producer and a consumer, as is the client. If the client sends data faster than it consumes the echoed data (or does not consume any data at all), the server’s memory will grow indefinitely if the server does not wait for writes to complete.

Backpressure should exist in any system that connects producers to consumers. A rule of thumb is to look for unbounded queues in software systems, as they are a sign of the lack of backpressure.

4.6 Discussion: Events and Ordered Execution

Another difference from the old echo server is the use of socket.pause(). You can now understand why it is essential: it is used to implement backpressure.

There is another reason to pause the 'data' event. In callback-based code, when the event handler returns, the runtime can fire the next 'data' event if it is not paused. The problem is that the completion of the event callback doesn’t mean the completion of the event handling — the handling can continue with further callbacks. And the interleaved handling can cause problems, considering that the data is an ordered sequence of bytes!

This situation is called a race condition, and is a class of problems related to concurrency. In this situation, unwanted concurrency is introduced.
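The interleaving can be reproduced with a small sketch. An EventEmitter stands in for an unpaused socket, and a timer stands in for the further asynchronous work done by the handler. The name interleavingDemo, the second event argument (a delay in milliseconds), and the delay values are all assumptions made for this illustration.

```typescript
import { EventEmitter } from "events";

function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

async function interleavingDemo(): Promise<string[]> {
    const source = new EventEmitter();  // stand-in for an unpaused socket
    const output: string[] = [];
    const pending: Promise<void>[] = [];

    source.on('data', (chunk: string, delayMs: number) => {
        // The event callback returns at once, but the handling continues later.
        pending.push((async () => {
            await sleep(delayMs);       // e.g. waiting for some other IO
            output.push(chunk);         // the handling completes here
        })());
    });

    // Nothing stops the second event from firing before the first is handled.
    source.emit('data', 'first', 30);
    source.emit('data', 'second', 5);

    await Promise.all(pending);
    return output;
}
```

The result is ['second', 'first']: the data arrived in order, but its handling completed out of order. With the promise-based soRead, the next read does not start until the current handling has finished, so this cannot happen.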

4.7 Conclusion: Promise vs. Callback

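Following the discussions above, we can now explain why we switched to the promise-based API. It has these advantages:

  1. The application logic stays in one sequential function instead of being split across callbacks.
  2. Backpressure comes naturally from awaiting each write before producing more data.
  3. Awaiting each read keeps event handling ordered and avoids unwanted concurrency.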

We have learned the basics of the socket API. Let’s move on to the next topic: protocol.