Async Generator Data Pipelines in Node.js: Stop Loading Everything Into Memory
Async generators let you build composable, memory-efficient data pipelines that handle datasets larger than RAM, with better testability and simpler error semantics than Node.js streams. Here is the pattern for pagination, transformation, fan-out, and backpressure in production.
Your data pipeline reads every row from a Postgres table into an array, transforms each record with a map(), then writes the result somewhere else. It works fine with 10,000 rows. Then someone throws 10 million rows at it and the process is OOM-killed before it finishes the first SELECT.
The standard fix is Node.js streams. But streams have a learning curve that keeps most teams cargo-culting them from Stack Overflow snippets, and their error handling is notoriously fiddly. With plain .pipe(), an error in one stage is not forwarded to the other stages, so a failure in the middle of a transformation can silently lose data unless you handle every error event or use the pipeline() helper correctly.
There is a simpler abstraction that handles the same use case with less ceremony: async generators. They compose with for await...of, they play well with the standard library, they support AbortSignal for cancellation, and they are just functions. If you can write a function, you can write an async generator.
Here is the production pattern for building composable, memory-efficient data pipelines with async generators, from pagination through backpressure to testability.
The core pattern: pagination without the OOM
The most common data pipeline starts with a paginated API or database query. The naive approach loads everything first:
// Bad: loads all rows into memory
async function getAllUsers() {
const allUsers = [];
let offset = 0;
const pageSize = 1000;
while (true) {
const rows = await db.query(
'SELECT * FROM users ORDER BY id LIMIT $1 OFFSET $2',
[pageSize, offset]
);
if (rows.length === 0) break;
allUsers.push(...rows);
offset += pageSize;
}
return allUsers;
}
This blows up with large datasets. The async generator version never holds more than one page in memory:
// Good: yields rows as they arrive
async function* paginateUsers(pageSize = 1000) {
let offset = 0;
while (true) {
const rows = await db.query(
'SELECT * FROM users ORDER BY id LIMIT $1 OFFSET $2',
[pageSize, offset]
);
if (rows.length === 0) break;
for (const row of rows) {
yield row;
}
offset += pageSize;
}
}
// Usage
for await (const user of paginateUsers()) {
await sendToAnalytics(user);
}
Memory profile: bounded by the page size, so O(1) relative to dataset size. The function suspends at each yield, so the next page is fetched only when the consumer asks for more rows.
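To make the laziness visible, here is a minimal sketch that swaps the database for an in-memory array. The names fetchPage, pagesFetched, paginate, and takeFirst are hypothetical stand-ins introduced for illustration, not part of the code above.

```typescript
// Hypothetical in-memory stand-in for db.query: ten rows, fetched page by page.
const table = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));
let pagesFetched = 0;

async function fetchPage(limit: number, offset: number): Promise<{ id: number }[]> {
  pagesFetched++;
  return table.slice(offset, offset + limit);
}

// Same shape as paginateUsers, but reading from fetchPage.
async function* paginate(pageSize: number): AsyncGenerator<{ id: number }> {
  let offset = 0;
  while (true) {
    const rows = await fetchPage(pageSize, offset);
    if (rows.length === 0) break;
    for (const row of rows) {
      yield row;
    }
    offset += pageSize;
  }
}

// Collects at most `limit` ids. Breaking out of for await ends the
// generator, so no further pages are requested.
async function takeFirst(limit: number): Promise<number[]> {
  const ids: number[] = [];
  if (limit <= 0) return ids;
  for await (const row of paginate(3)) {
    ids.push(row.id);
    if (ids.length >= limit) break;
  }
  return ids;
}
```

Calling takeFirst(4) with a page size of three fetches two pages and stops. The remaining rows are never read, because the generator only runs when the loop asks for the next value.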
Keyset pagination for production
The OFFSET approach degrades on large tables because Postgres still scans skipped rows. A keyset cursor avoids that:
async function* paginateUsersKeyset(batchSize = 1000) {
let cursor = null;
while (true) {
const query = cursor
? 'SELECT * FROM users WHERE id > $1 ORDER BY id LIMIT $2'
: 'SELECT * FROM users ORDER BY id LIMIT $1';
const params = cursor ? [cursor, batchSize] : [batchSize];
const rows = await db.query(query, params);
if (rows.length === 0) break;
for (const row of rows) {
yield row;
}
cursor = rows[rows.length - 1].id;
}
}
This keeps pagination fast even after millions of rows because it uses the primary key index directly.
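The same loop can be written once and reused for any table with a numeric key. The sketch below is one way to do that, assuming a hypothetical fetchAfter(afterId, limit) callback that runs the keyset query. Row, FetchAfter, paginateByKey, and arrayFetcher are illustrative names, and the in-memory fetcher exists only so the generator can be exercised without a database.

```typescript
// Hypothetical row shape: anything with a numeric, monotonically ordered id.
interface Row {
  id: number;
}

// A real fetcher would run something like
// "SELECT ... WHERE id > $1 ORDER BY id LIMIT $2".
type FetchAfter<T extends Row> = (afterId: number | null, limit: number) => Promise<T[]>;

async function* paginateByKey<T extends Row>(
  fetchAfter: FetchAfter<T>,
  batchSize = 1000
): AsyncGenerator<T> {
  let cursor: number | null = null;
  while (true) {
    const rows = await fetchAfter(cursor, batchSize);
    if (rows.length === 0) return;
    for (const row of rows) {
      yield row;
    }
    cursor = rows[rows.length - 1].id;
    // A short page means the table is exhausted, so skip the final empty query.
    if (rows.length < batchSize) return;
  }
}

// In-memory fetcher used only to exercise the generator.
function arrayFetcher<T extends Row>(rows: T[]): FetchAfter<T> {
  const sorted = [...rows].sort((a, b) => a.id - b.id);
  return async (afterId, limit) =>
    sorted.filter((r) => afterId === null || r.id > afterId).slice(0, limit);
}
```

Comparing the cursor against null, rather than relying on truthiness, keeps an id of 0 from being mistaken for "no cursor yet".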
Composing generators: transform, filter, fan-out
The real power of async generators is composition. Each step in the pipeline is its own generator that wraps the previous one. This gives you Unix-pipe-style chaining with plain functions and no stream boilerplate.
Transform
async function* anonymizeUsers(userGen: AsyncGenerator<User>) {
for await (const user of userGen) {
yield {
...user,
email: hashEmail(user.email),
phone: null,
};
}
}
for await (const anonUser of anonymizeUsers(paginateUsersKeyset())) {
await writeToExportBucket(anonUser);
}
Filter
async function* onlyActiveUsers(userGen: AsyncGenerator<User>) {
for await (const user of userGen) {
if (user.status === 'active') {
yield user;
}
}
}
for await (const user of onlyActiveUsers(paginateUsersKeyset())) {
await sendNewsletter(user);
}
Batch (window into chunks)
Sometimes you need to batch rows for efficient writes. A batch generator wraps the stream:
async function* batchUsers(
userGen: AsyncGenerator<User>,
batchSize = 100
) {
let batch: User[] = [];
for await (const user of userGen) {
batch.push(user);
if (batch.length >= batchSize) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch; // partial final batch
}
}
for await (const batch of batchUsers(paginateUsersKeyset(), 500)) {
await bulkInsertAnalytics(batch);
}
Fan-out (one-to-many)
A single database row can yield multiple output records:
async function* expandOrders(userGen: AsyncGenerator<User>) {
for await (const user of userGen) {
const orders = await db.query(
'SELECT * FROM orders WHERE user_id = $1',
[user.id]
);
for (const order of orders) {
yield { user: user.id, order: order.id, total: order.total };
}
}
}
These compose naturally:
const pipeline = batchUsers(
anonymizeUsers(
onlyActiveUsers(
paginateUsersKeyset()
)
),
500
);
for await (const batch of pipeline) {
await bulkInsertAnalytics(batch);
}
Each layer is a generator, so the inner ones only produce values as fast as the outer ones consume them.
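The user-specific stages above generalize to a few reusable operators. This is a sketch of that idea with hypothetical map, filter, batch, and range helpers. It accepts any AsyncIterable, so the stages are not tied to a particular source.

```typescript
// Generic counterparts of anonymizeUsers, onlyActiveUsers and batchUsers.
async function* map<T, U>(source: AsyncIterable<T>, fn: (item: T) => U): AsyncGenerator<U> {
  for await (const item of source) {
    yield fn(item);
  }
}

async function* filter<T>(
  source: AsyncIterable<T>,
  keep: (item: T) => boolean
): AsyncGenerator<T> {
  for await (const item of source) {
    if (keep(item)) yield item;
  }
}

async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let current: T[] = [];
  for await (const item of source) {
    current.push(item);
    if (current.length >= size) {
      yield current;
      current = [];
    }
  }
  if (current.length > 0) yield current; // partial final batch
}

// Tiny source used only for the example.
async function* range(start: number, end: number): AsyncGenerator<number> {
  for (let i = start; i < end; i++) yield i;
}

// filter -> map -> batch, composed exactly like the user pipeline.
async function evenSquaresInPairs(): Promise<number[][]> {
  const evens = filter(range(0, 10), (n) => n % 2 === 0);
  const squares = map(evens, (n) => n * n);
  const batches: number[][] = [];
  for await (const pair of batch(squares, 2)) {
    batches.push(pair);
  }
  return batches;
}
```

Naming the intermediate stages, as evenSquaresInPairs does, reads top to bottom in data-flow order, which can be easier to follow than deeply nested calls.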
Backpressure: how generators handle it for free
Backpressure is where streams shine and where async generators match them beat-for-beat without the plumbing.
When you iterate with for await...of, the loop does not request the next value until its body has finished, so the generator stays suspended at each yield while the consumer works. The generator cannot produce a new value until the consumer finishes processing the current one. This is backpressure by design:
async function* fastProducer() {
for (let i = 0; i < 100_000; i++) {
yield i;
}
}
async function slowConsumer() {
for await (const val of fastProducer()) {
await slowExternalCall(val); // blocks the generator
}
}
The generator pauses at each yield until slowExternalCall resolves. The memory footprint stays small because the producer cannot get ahead of the consumer.
Compare with a naive buffered approach where the producer loads 100k items into an array and the consumer processes them one at a time. The memory graph is flat from start to finish with the generator, while the array version spikes to hold everything at once.
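One way to check this claim is to count how far the producer ever gets ahead of the consumer. The sketch below does that with hypothetical counters (produced, consumed, maxLead) and a 1 ms sleep standing in for slowExternalCall.

```typescript
let produced = 0;
let consumed = 0;
let maxLead = 0; // largest gap ever observed between produced and consumed

async function* countingProducer(total: number): AsyncGenerator<number> {
  for (let i = 0; i < total; i++) {
    produced++;
    maxLead = Math.max(maxLead, produced - consumed);
    yield i;
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function slowConsume(total: number): Promise<void> {
  for await (const _value of countingProducer(total)) {
    await sleep(1); // stand-in for slowExternalCall
    consumed++;
  }
}
```

After slowConsume finishes, maxLead is 1: the producer is only ever one item ahead, namely the item the consumer is currently handling.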
Concurrency within the consumer
Sometimes you want to process items concurrently while still keeping backpressure. Use a bounded concurrency pattern inside the consumer loop:
async function concurrentProcess<T>(
gen: AsyncGenerator<T>,
concurrency: number,
handler: (item: T) => Promise<void>
) {
const running = new Set<Promise<void>>();
for await (const item of gen) {
const task = handler(item).finally(() => running.delete(task));
running.add(task);
if (running.size >= concurrency) {
await Promise.race(running); // backpressure: wait for a slot
}
}
await Promise.all(running); // drain remaining
}
await concurrentProcess(
paginateUsersKeyset(),
10,
async (user) => sendToAnalytics(user)
);
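This keeps up to N tasks in flight without buffering the entire dataset. The generator only produces a new item when a slot opens up. Catch errors inside handler: a rejection that settles while the loop is waiting on the generator has already been removed from the set, so neither Promise.race nor Promise.all observes it and it becomes an unhandled rejection.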
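Here is a variant of the same pattern that records failures instead of letting them escape. It is a sketch, not a drop-in replacement: processWithLimit is a hypothetical name, and returning the collected errors is one design choice among several.

```typescript
async function processWithLimit<T>(
  source: AsyncIterable<T>,
  concurrency: number,
  handler: (item: T) => Promise<void>
): Promise<unknown[]> {
  const running = new Set<Promise<void>>();
  const errors: unknown[] = [];
  for await (const item of source) {
    // Each task records its own failure, so it can never reject unobserved.
    const task: Promise<void> = handler(item)
      .catch((err) => {
        errors.push(err);
      })
      .finally(() => {
        running.delete(task);
      });
    running.add(task);
    if (running.size >= concurrency) {
      await Promise.race(running); // backpressure: wait for a free slot
    }
  }
  await Promise.all(running); // drain the remaining tasks
  return errors;
}
```

The caller decides what to do with the returned errors: log them, retry the failed items, or throw once the run is complete.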
Error handling: simpler than streams
Stream error handling is treacherous. A single unhandled error event on a Readable crashes the process. The pipeline() helper from stream/promises helps, but it still wraps a fundamentally event-driven API into promise-land.
Async generators use regular try/catch:
async function* safePaginate(pageSize = 1000) {
let offset = 0;
let consecutiveErrors = 0;
while (true) {
try {
const rows = await db.query(
'SELECT * FROM users ORDER BY id LIMIT $1 OFFSET $2',
[pageSize, offset]
);
consecutiveErrors = 0;
if (rows.length === 0) break;
for (const row of rows) yield row;
offset += pageSize;
} catch (err) {
consecutiveErrors++;
if (consecutiveErrors >= 3) {
throw new Error(`Pagination failed after 3 consecutive errors: ${err.message}`);
}
await sleep(1000 * consecutiveErrors); // backoff
}
}
}
The consumer sees a single rejected promise if the pipeline fails after retries. No data event, no error event, no end event, no close event, no worry about whether the stream is in flowing or paused mode. Just a thrown exception in a try/catch.
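The retry loop becomes easier to test if the page fetch and the backoff delay are injected rather than hard-coded. The sketch below is a variant of safePaginate under that assumption. paginateWithRetry, FetchPage, and Sleep are hypothetical names, and only the fetch sits inside the try block so that nothing else is retried by accident.

```typescript
type FetchPage<T> = (limit: number, offset: number) => Promise<T[]>;
type Sleep = (ms: number) => Promise<void>;

const realSleep: Sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function* paginateWithRetry<T>(
  fetchPage: FetchPage<T>,
  pageSize = 1000,
  maxConsecutiveErrors = 3,
  sleep: Sleep = realSleep
): AsyncGenerator<T> {
  let offset = 0;
  let consecutiveErrors = 0;
  while (true) {
    let rows: T[];
    try {
      rows = await fetchPage(pageSize, offset);
      consecutiveErrors = 0;
    } catch (err) {
      consecutiveErrors++;
      if (consecutiveErrors >= maxConsecutiveErrors) {
        const message = err instanceof Error ? err.message : String(err);
        throw new Error(
          `Pagination failed after ${consecutiveErrors} consecutive errors: ${message}`
        );
      }
      await sleep(1000 * consecutiveErrors); // linear backoff, as in safePaginate
      continue; // retry the same offset
    }
    if (rows.length === 0) return;
    for (const row of rows) yield row;
    offset += pageSize;
  }
}
```

Passing a no-op sleep in tests removes the real delay, and passing a flaky fetchPage exercises the retry path without a database.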
Transactional processing with rollback
Because async generators are plain functions, you can wrap the pipeline in a transaction:
async function processInTransaction() {
const client = await pool.connect();
try {
await client.query('BEGIN');
for await (const user of paginateUsersKeyset()) {
await client.query(
'UPDATE users SET processed_at = NOW() WHERE id = $1',
[user.id]
);
}
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
If the consumer throws partway through the pipeline, the transaction rolls back. There is no stream cleanup, no destroy(), no unpipe(). Just a finally block.
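The same guarantee holds inside a generator. When a for await...of loop exits early, whether through break, return, or an exception in the loop body, it calls the generator's return() method, which runs any pending finally block. The sketch below records the order of events; rowsWithCleanup, consumeUntilFailure, and the events array are hypothetical and stand in for a real resource such as a database cursor.

```typescript
const events: string[] = [];

// Hypothetical resource: in a real pipeline this could be a cursor or file handle.
async function* rowsWithCleanup(total: number): AsyncGenerator<number> {
  events.push("open");
  try {
    for (let i = 0; i < total; i++) yield i;
  } finally {
    // Runs on normal completion, on break/return in the consumer,
    // and when the consumer's loop body throws.
    events.push("close");
  }
}

async function consumeUntilFailure(): Promise<void> {
  try {
    for await (const n of rowsWithCleanup(100)) {
      if (n === 2) throw new Error("consumer failed");
    }
  } catch {
    events.push("caught");
  }
}
```

After consumeUntilFailure runs, events holds "open", "close", "caught" in that order: the generator's cleanup finishes before the consumer's catch block sees the error.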
Cancellation with AbortSignal
Generators accept signals naturally. Pass an AbortSignal into the generator function and check it between yields:
async function* paginateWithCancellation(
signal: AbortSignal,
pageSize = 1000
) {
let offset = 0;
while (true) {
signal.throwIfAborted(); // throws signal.reason (an AbortError by default)
const rows = await db.query(
'SELECT * FROM users ORDER BY id LIMIT $1 OFFSET $2',
[pageSize, offset]
);
if (rows.length === 0) break;
for (const row of rows) {
// Throw instead of returning quietly, so the consumer's catch block
// can tell a cancelled run from a completed one.
signal.throwIfAborted();
yield row;
}
offset += pageSize;
}
}
// Usage with a timeout
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 30_000);
try {
for await (const user of paginateWithCancellation(controller.signal)) {
await processUser(user);
}
} catch (err) {
if (controller.signal.aborted) {
console.log('Pipeline cancelled after timeout');
} else {
throw err;
}
} finally {
clearTimeout(timer); // do not keep the process alive once the pipeline ends
}
The generator stops at its next signal check after the abort fires, which happens when the consumer asks for the next row. A database query already in flight still runs to completion, but no further queries are sent.
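If waiting for the in-flight query is too slow, the generator can stop waiting as soon as the signal fires. The sketch below shows one way to do that with a hypothetical abortable helper that races a promise against the signal. Note the assumption: this only abandons the wait. The underlying query keeps running unless the database driver offers its own cancellation.

```typescript
// Rejects with signal.reason as soon as the signal aborts, otherwise
// settles with the wrapped promise. The wrapped work itself is not cancelled.
function abortable<T>(promise: Promise<T>, signal: AbortSignal): Promise<T> {
  if (signal.aborted) return Promise.reject(signal.reason);
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(signal.reason);
    signal.addEventListener("abort", onAbort, { once: true });
    promise
      .then(resolve, reject)
      .finally(() => signal.removeEventListener("abort", onAbort));
  });
}

// fetchPage is a hypothetical stand-in for the db.query call.
async function* pagesUntilAborted<T>(
  fetchPage: (offset: number) => Promise<T[]>,
  signal: AbortSignal,
  pageSize: number
): AsyncGenerator<T> {
  let offset = 0;
  while (true) {
    const rows = await abortable(fetchPage(offset), signal);
    if (rows.length === 0) return;
    for (const row of rows) {
      signal.throwIfAborted();
      yield row;
    }
    offset += pageSize;
  }
}
```

The consumer sees the abort as a thrown error, the same way it would see any other pipeline failure.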
Testability: generators are just functions
This is the killer feature. Because each stage is a standalone async generator, you can test it in isolation with a fake input generator:
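// node:test does not export expect. The Jest-style assertions below assume
// Vitest; importing from '@jest/globals' works the same way.
import { describe, it, expect } from 'vitest';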
async function* fakeUsers(users: User[]) {
for (const user of users) yield user;
}
it('anonymizes email addresses', async () => {
const input = fakeUsers([
{ id: 1, email: 'alice@example.com', phone: '555-0100', status: 'active' },
{ id: 2, email: 'bob@example.com', phone: '555-0101', status: 'inactive' },
]);
const results: User[] = [];
for await (const user of anonymizeUsers(input)) {
results.push(user);
}
expect(results[0].email).not.toContain('alice');
expect(results[0].phone).toBeNull();
expect(results[0].status).toBe('active');
});
No mocking a database. No spinning up Docker. No sinon stubs. The test creates an array, wraps it in a tiny generator, pipes it through the transform, and asserts on the output. This makes pipeline tests fast, deterministic, and easy to write.
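Two small helpers remove most of the remaining boilerplate from tests like this. The sketch below uses hypothetical names (fromArray, collect, onlyEven); onlyEven is just a stand-in for whatever stage is under test. Recent Node.js releases also ship Array.fromAsync, which can replace collect where it is available.

```typescript
// Wraps an array as an async source, like fakeUsers but generic.
async function* fromArray<T>(items: readonly T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Drains any async iterable into an array so a test can assert on it.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

// Example stage under test.
async function* onlyEven(source: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of source) {
    if (n % 2 === 0) yield n;
  }
}
```

A test then shrinks to a single expression such as collect(onlyEven(fromArray([1, 2, 3, 4]))), followed by assertions on the resulting array.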
Testing error paths
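it('retries on transient errors', async () => {
let attempts = 0;
// A generator that throws is finished for good, so the flakiness lives in
// the injected page fetcher rather than in a source generator.
const flakyFetchPage = async (limit: number, offset: number) => {
attempts++;
if (attempts <= 2) throw new Error('connection reset');
return [0, 1, 2, 3, 4].slice(offset, offset + limit).map((id) => ({ id }));
};
const noSleep = async () => {}; // skip the real backoff delay
const results = [];
// Assumption: safePaginateFrom(fetchPage, pageSize, sleep) is safePaginate
// with db.query and sleep replaced by injected functions.
for await (const item of safePaginateFrom(flakyFetchPage, 2, noSleep)) {
results.push(item);
}
expect(results).toHaveLength(5);
expect(attempts).toBe(6); // 2 failures + 3 pages with rows + 1 empty page
});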
You control exactly when errors happen and observe exactly how the generator responds. No timing-dependent flaky tests.
When not to use async generators
Async generators are not a universal replacement for streams. Here is where streams still win:
- Binary data: JPEG, video, large binary payloads. Streams operate on Buffers without encoding overhead.
- Very high throughput: If you are processing hundreds of thousands of operations per second, the per-iteration cost of the generator overhead (the hidden state machine that async function* compiles to) becomes measurable. Streams push data in chunks with less per-event overhead.
- Piping through existing stream transformers: If your ecosystem already speaks streams (Sharp for images, zlib for compression, multiparty for form parsing), wrapping generators around them adds ceremony without benefit.
For everything else (paginated APIs, ETL, CSV/JSON transformation, database migration scripts, report generation), async generators are simpler to write, simpler to test, and simpler to reason about than streams. They compose with for await...of, they handle errors with try/catch, and they give you backpressure for free.
Real pipeline: export 5 million records to CSV
Here is the pattern assembled into a production export job:
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
async function* exportOrders(since: Date) {
let cursor = null;
const batchSize = 5000;
while (true) {
const query = cursor
? 'SELECT * FROM orders WHERE created_at >= $1 AND id > $2 ORDER BY id LIMIT $3'
: 'SELECT * FROM orders WHERE created_at >= $1 ORDER BY id LIMIT $2';
const params = cursor
? [since, cursor, batchSize]
: [since, batchSize];
const rows = await db.query(query, params);
if (rows.length === 0) break;
for (const row of rows) yield row;
cursor = rows[rows.length - 1].id;
}
}
async function* toCsv(orderGen: AsyncGenerator<Order>): AsyncGenerator<string> {
let isFirst = true;
for await (const order of orderGen) {
if (isFirst) {
yield 'id,created_at,total,status\n';
isFirst = false;
}
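// Assumes no field contains a comma, quote, or newline. Free-form
// text needs real CSV quoting before it is joined.
const line = [
order.id,
order.created_at.toISOString(),
order.total.toFixed(2),
order.status,
].join(',');
yield line + '\n';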
}
}
// Stream the generator output into a file. pipeline() accepts an async
// iterable as its source, pauses the generator while the file stream's
// buffer is full, and rejects if either side fails.
await pipeline(
toCsv(exportOrders(new Date('2026-01-01'))),
createWriteStream('/tmp/orders-export.csv')
);
Memory usage: one page of orders (5,000 rows), one CSV line, and the write stream's internal buffer. The 5 million rows flow through without accumulating.
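The toCsv stage joins values with commas as they are, which is only safe while no value contains a comma, quote, or line break. A minimal quoting helper in the style of RFC 4180 could look like the sketch below. csvField and csvLine are hypothetical names, and the null handling (empty field) is an assumption about what the export should contain.

```typescript
type CsvValue = string | number | null | undefined;

// Quotes a field only when it contains a comma, quote, or line break,
// doubling any embedded quotes.
function csvField(value: CsvValue): string {
  if (value === null || value === undefined) return "";
  const text = String(value);
  if (/[",\r\n]/.test(text)) {
    return `"${text.replace(/"/g, '""')}"`;
  }
  return text;
}

// Builds one newline-terminated CSV line from a list of values.
function csvLine(values: CsvValue[]): string {
  return values.map(csvField).join(",") + "\n";
}
```

Inside toCsv, the array of order fields would be passed to csvLine instead of being joined directly.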
The takeaway
Async generators give you composable, memory-efficient pipelines without the ceremony of streams. They compose with for await...of, handle errors with try/catch, support AbortSignal for cancellation, and are trivially testable by injecting a fake generator.
The next time you write a script that loads data into an array, paginates through an API, or transforms every row in a table, reach for async function*. Start with pagination, add transforms as generators, and compose them in the consumer. The memory graph stays flat, the code stays readable, and the next person who adds a new step to the pipeline just writes another generator.