Database Sharding with PostgreSQL and Node.js: A Practical Guide to Horizontal Scaling
Your single PostgreSQL database handles 1M users just fine. At 10M users, writes start queuing, replication lag creeps up, and every query feels like it is running through molasses. Here is how to shard PostgreSQL at the application layer with Node.js, including shard key selection, query routing, cross-shard operations, and the rebalancing strategy that will save your weekend.
At 2:47 AM the PagerDuty alert fires. Replication lag between the primary and its read replicas has climbed to 45 seconds, so the replicas are serving stale data: WAL is piling up faster than it can be shipped and replayed. By 3:00 AM the CPU is pegged at 98%, connection pool exhaustion is tripping health checks, and your p99 query latency has gone from 12ms to 8 seconds. The launch that added 500,000 new users last week found the ceiling. Your single PostgreSQL instance cannot write fast enough.
You have already done the easy things: vertical scaling (bigger instance), connection pooling (PgBouncer), read replicas, materialized views. They bought you a year. Now you need horizontal scaling. You need to shard.
Sharding is the dirty secret of every database that claims to “scale infinitely.” PostgreSQL does not shard itself. Extensions help (Citus distributes tables across nodes; pg_partman automates partitioning within a single instance), but at a certain scale you are writing the sharding logic in your application layer anyway. This post covers a pragmatic, application-level sharding strategy for Node.js and PostgreSQL that has handled tens of millions of users in production.
What sharding actually means
Sharding splits your data across multiple independent PostgreSQL instances (shards) based on a shard key. Every shard is a full, independent database. There is no shared storage. There is no cross-shard query magic. If you need data from shard 1 and shard 2, your application makes two queries and joins the results in memory.
The three things you must design before writing a single line of sharding code:
- Shard key — the column or combination of columns that determines which shard a row lives on.
- Routing layer — the logic that maps a shard key to a specific database connection.
- Resharding plan — what happens when your cluster grows and shards need to split.
Most teams skip the third item, the resharding plan. That is the one that destroys them.
Step 1: Choose the right shard key
The shard key is the single most important decision in your sharding architecture. Get it wrong and every query either scans all shards or hits the wrong one. Get it right and 95% of your queries hit exactly one shard.
Bad shard keys
- Auto-incrementing integer IDs assigned in ranges. Users 1 through 1M go to shard 1, users 1M+1 through 2M go to shard 2. This produces hot shards: every new signup, and most of the traffic that comes with it, lands on the last shard.
- Timestamps without a prefix. Sharding by created_at month sounds clever until your “January 2026” shard handles 40% of all traffic because you onboarded a big customer that month.
- UUID v4 row IDs. Perfectly distributed but unrelated to how you query. Unless every query already carries the row's UUID, you need an extra lookup step just to find which shard owns it.
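To see the first failure mode concretely, here is a small simulation (all numbers made up) that routes the 10,000 most recent signups with range assignment and with a plain hash. The hash is a simple modulo for brevity; it has the resize problem discussed in the consistent hashing section and is only here to show distribution. RANGE_SIZE and the ID values are hypothetical.

```typescript
import { createHash } from 'node:crypto';

const SHARDS = 4;
const RANGE_SIZE = 1_000_000; // hypothetical: 1M sequential IDs per shard

// Range routing: contiguous ID blocks per shard. IDs past the last range
// pile onto the last shard.
function rangeShard(id: number): number {
  return Math.min(Math.floor((id - 1) / RANGE_SIZE), SHARDS - 1);
}

// Hash routing: first 4 bytes of MD5 of the ID, modulo the shard count.
function hashShard(id: number): number {
  return createHash('md5').update(String(id)).digest().readUInt32BE(0) % SHARDS;
}

// The 10,000 newest signups, i.e. the users generating the most writes.
const newIds = Array.from({ length: 10000 }, (_, i) => 3_000_001 + i);

function countPerShard(route: (id: number) => number): number[] {
  const counts = new Array<number>(SHARDS).fill(0);
  for (const id of newIds) counts[route(id)]++;
  return counts;
}

const rangeCounts = countPerShard(rangeShard); // rangeCounts is [0, 0, 0, 10000]
const hashCounts = countPerShard(hashShard); // roughly 2,500 per shard
console.log({ rangeCounts, hashCounts });
```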
Good shard keys
- User ID or Tenant ID for multi-tenant or user-partitioned workloads.
- Geographic region if your data naturally partitions by location.
- Customer ID hashed into a consistent bucket (the most common production pattern).
Here is the rule: your shard key should appear in every WHERE clause that your application generates. If you cannot put WHERE user_id = ? on 90% of your queries, you have chosen the wrong shard key.
Consistent hashing for your shard key
Instead of shard_id = user_id % N (which breaks when N changes), use consistent hashing. The ring-based approach minimizes the amount of data that must move when you add or remove shards.
import { createHash } from 'node:crypto';
class ConsistentHashRing {
private ring: Array<{ hash: number; shard: string }> = [];
private virtualNodes: number;
constructor(shards: string[], virtualNodes = 100) {
this.virtualNodes = virtualNodes;
for (const shard of shards) {
for (let i = 0; i < virtualNodes; i++) {
const hash = this.hash(`${shard}:${i}`);
this.ring.push({ hash, shard });
}
}
this.ring.sort((a, b) => a.hash - b.hash);
}
private hash(key: string): number {
return createHash('md5').update(key).digest().readUInt32BE(0);
}
getShard(key: string): string {
if (this.ring.length === 0) throw new Error('No shards configured');
const hash = this.hash(key);
// Binary search for the first node with hash >= key hash
let low = 0, high = this.ring.length;
while (low < high) {
const mid = (low + high) >>> 1;
if (this.ring[mid].hash >= hash) high = mid;
else low = mid + 1;
}
// Wrap around if nothing greater found
return this.ring[low % this.ring.length].shard;
}
getShards(): string[] {
return [...new Set(this.ring.map((n) => n.shard))];
}
}
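The virtualNodes parameter controls distribution. Too few virtual nodes and the ring is unbalanced. Too many and the ring takes more memory and longer to build, although each lookup stays cheap because it is a binary search (O(log n) in the number of ring entries). 100 to 200 virtual nodes per physical shard is a good starting point.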
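Both claims, that modulo routing reshuffles most keys when N changes and that a ring with enough virtual nodes stays balanced, are easy to check. This self-contained sketch re-implements a minimal ring (the same MD5-based hash as ConsistentHashRing) and grows a hypothetical cluster from 4 to 5 shards over 20,000 synthetic keys.

```typescript
import { createHash } from 'node:crypto';

// 32-bit hash: first four bytes of MD5, the same choice as the ring above.
function hash32(key: string): number {
  return createHash('md5').update(key).digest().readUInt32BE(0);
}

// Minimal ring builder: returns a lookup function from key to shard name.
function buildRing(shards: string[], vnodes: number): (key: string) => string {
  const points: Array<{ hash: number; shard: string }> = [];
  for (const shard of shards) {
    for (let i = 0; i < vnodes; i++) points.push({ hash: hash32(`${shard}:${i}`), shard });
  }
  points.sort((a, b) => a.hash - b.hash);
  return (key) => {
    const h = hash32(key);
    let lo = 0;
    let hi = points.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (points[mid].hash >= h) hi = mid;
      else lo = mid + 1;
    }
    return points[lo % points.length].shard; // wrap around the ring
  };
}

const keys = Array.from({ length: 20000 }, (_, i) => `user-${i}`);
const before = ['shard-0', 'shard-1', 'shard-2', 'shard-3'];
const after = [...before, 'shard-4'];

// Modulo routing: a key moves whenever hash % 4 differs from hash % 5.
let movedModulo = 0;
for (const k of keys) if (hash32(k) % 4 !== hash32(k) % 5) movedModulo++;

// Ring routing with 150 virtual nodes per shard.
const ring4 = buildRing(before, 150);
const ring5 = buildRing(after, 150);
let movedRing = 0;
let movedToOldShard = 0; // keys that moved to a shard other than the new one
const counts = new Map<string, number>();
for (const k of keys) {
  const from = ring4(k);
  const to = ring5(k);
  counts.set(to, (counts.get(to) ?? 0) + 1);
  if (from !== to) {
    movedRing++;
    if (to !== 'shard-4') movedToOldShard++;
  }
}

const movedModuloShare = movedModulo / keys.length;
const movedRingShare = movedRing / keys.length;
const shares = Array.from(counts.values(), (c) => c / keys.length);
console.log({ movedModuloShare, movedRingShare, movedToOldShard, shares });
```

Expect roughly 80% of keys to move under modulo (a key stays only when hash % 4 equals hash % 5) versus roughly 20% under the ring, which is the new shard's share, and every key the ring moves lands on the new shard. Rerun with a handful of virtual nodes instead of 150 to watch the shares drift apart.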
Step 2: Build the routing layer
The routing layer maps a shard key to a PostgreSQL connection pool. Every shard gets its own pool. Never share pools across shards.
import pg from 'pg';
interface ShardConfig {
name: string;
host: string;
port: number;
database: string;
user: string;
password: string;
poolSize: number;
}
class ShardRouter {
// Not private: the scatter-gather and metrics helpers below iterate over it
readonly pools: Map<string, pg.Pool> = new Map();
private ring: ConsistentHashRing;
constructor(configs: ShardConfig[]) {
const names = configs.map((c) => c.name);
this.ring = new ConsistentHashRing(names, 150);
for (const config of configs) {
this.pools.set(
config.name,
new pg.Pool({
host: config.host,
port: config.port,
database: config.database,
user: config.user,
password: config.password,
max: config.poolSize,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
})
);
}
}
getPool(shardKey: string): pg.Pool {
const shardName = this.ring.getShard(shardKey);
const pool = this.pools.get(shardName);
if (!pool) throw new Error(`No pool for shard: ${shardName}`);
return pool;
}
async query(shardKey: string, text: string, params?: any[]) {
const pool = this.getPool(shardKey);
return pool.query(text, params);
}
async queryAll(text: string, params?: any[]): Promise<any[][]> {
// Scatter-gather: run the query on every shard in parallel and collect
// each shard's rows. Total latency is set by the slowest shard.
const results = await Promise.all(
Array.from(this.pools.values(), (pool) => pool.query(text, params))
);
return results.map((r) => r.rows);
}
async healthCheck(): Promise<Map<string, boolean>> {
const results = new Map<string, boolean>();
for (const [name, pool] of this.pools) {
try {
await pool.query('SELECT 1');
results.set(name, true);
} catch {
results.set(name, false);
}
}
return results;
}
}
The query method is for queries that target a single shard (the common case). The queryAll method is the scatter-gather pattern for queries that need to touch every shard (reporting, admin dashboards, global searches). Use queryAll sparingly. Every scatter-gather query puts load on all n shards, and its latency is set by the slowest of them.
Connection management per shard
Each shard needs its own connection pool tuned to that shard’s capacity. A common mistake is using the same pool size for all shards. If shard 1 has 8 vCPUs and shard 2 has 16 vCPUs, their pool sizes should reflect that difference.
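One way to turn capacity into per-shard pool sizes is the often-quoted PostgreSQL wiki starting point of about (cores * 2) + effective spindle count active connections per server, divided across the application instances that each hold their own pool to that shard. This is a sketch under those assumptions; ShardCapacity, effectiveSpindles, and the instance count are hypothetical inputs, and load testing should have the final say.

```typescript
interface ShardCapacity {
  name: string;
  vcpus: number;
  // Hypothetical input: "effective spindles"; 1 is a common simplification for SSDs.
  effectiveSpindles: number;
}

// Starting-point heuristic: (cores * 2) + spindles connections per server.
// Every app instance opens its own pool to each shard, so split that
// server-side budget across instances and never go below one connection.
function poolSizeFor(shard: ShardCapacity, appInstances: number): number {
  const serverBudget = shard.vcpus * 2 + shard.effectiveSpindles;
  return Math.max(1, Math.floor(serverBudget / appInstances));
}

const capacities: ShardCapacity[] = [
  { name: 'shard-1', vcpus: 8, effectiveSpindles: 1 },
  { name: 'shard-2', vcpus: 16, effectiveSpindles: 1 },
];
const sizes = capacities.map((c) => ({ name: c.name, poolSize: poolSizeFor(c, 4) }));
console.log(sizes);
```

With four app instances, the 8-vCPU shard gets a pool of 4 and the 16-vCPU shard a pool of 8; those values would go into each ShardConfig's poolSize.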
async function createShardPools(configs: ShardConfig[]): Promise<ShardRouter> {
// Verify that every shard is reachable before serving traffic
// (healthCheck runs SELECT 1 against each pool)
const router = new ShardRouter(configs);
const health = await router.healthCheck();
for (const [name, ok] of health) {
if (!ok) {
console.error(`Shard ${name} failed health check at startup`);
}
}
return router;
}
Warm up every pool during application startup, not lazily on the first request. A cold pool triggers a cascade of connection creations under load, which causes its own latency spike.
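A health check opens only one connection per pool, so it does not warm much. A minimal warm-up sketch, assuming a pg.Pool-like object whose connect() resolves to a client with release(): check out several clients at once so the pool has to open that many connections, then release them back as idle. ConnectablePool and warmPool are hypothetical names.

```typescript
// Structural type that pg.Pool should satisfy: connect() resolves to a
// client with release(). Kept minimal so the sketch runs without a database.
interface ConnectablePool {
  connect(): Promise<{ release(): void }>;
}

// Open `connections` physical connections by checking out that many clients
// at once, then hand them all back to the pool as idle connections.
// Keep `connections` at or below the pool's `max`, or the extra checkouts
// will wait for a free client.
async function warmPool(pool: ConnectablePool, connections: number): Promise<void> {
  const settled = await Promise.allSettled(
    Array.from({ length: connections }, () => pool.connect())
  );
  // Release every client that did connect, even if some attempts failed.
  for (const result of settled) {
    if (result.status === 'fulfilled') result.value.release();
  }
  const failures = settled.filter((r) => r.status === 'rejected').length;
  if (failures > 0) {
    throw new Error(`${failures} of ${connections} warm-up connections failed`);
  }
}
```

Call it for each shard pool before the server starts accepting requests. With node-postgres, idleTimeoutMillis (30 seconds in the router above) closes idle connections again, so warm-up pays off most when traffic arrives soon after startup.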
Step 3: Design your data model for sharding
Every table that grows with your user base needs the shard key as part of its primary key. This is non-negotiable.
The tenant-aware schema
-- On every shard, the same schema but different data
CREATE TABLE users (
user_id UUID PRIMARY KEY,
email TEXT NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
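CREATE TABLE orders (
order_id UUID NOT NULL,
user_id UUID NOT NULL, -- This is your shard key
total NUMERIC(10,2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT NOW(),
-- The shard key leads the primary key, so the key index also serves per-user lookups
PRIMARY KEY (user_id, order_id)
);
-- The shard key MUST lead the indexes behind per-user queries
CREATE INDEX idx_orders_user_created ON orders (user_id, created_at DESC);
-- Used only by cross-shard reporting, which filters on created_at alone
CREATE INDEX idx_orders_created_at ON orders (created_at);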
The user_id column doubles as the shard key. Every query that fetches orders includes WHERE user_id = ?. The router reads the user_id parameter, hashes it through the consistent hash ring, and sends the query to the correct shard.
Cross-shard references
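Foreign keys cannot span shards. If your application relies on referential integrity from PostgreSQL, you must drop it for cross-shard relationships and enforce those constraints in application code instead. Tables that share a shard key are the exception: users and orders here are both keyed by user_id, so a user and all of their orders live on the same shard, and a local foreign key between them still works. The check below uses that pair only because it is short.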
// Never rely on DB foreign keys across shards.
// Instead, verify existence in application code:
async function verifyUserExists(userId: string): Promise<boolean> {
const result = await shardRouter.query(userId,
'SELECT EXISTS(SELECT 1 FROM users WHERE user_id = $1)',
[userId]
);
return result.rows[0].exists;
}
async function createOrder(userId: string, total: number) {
// Application-level foreign key check. Not atomic: a concurrent delete
// can land between this check and the insert below.
const userExists = await verifyUserExists(userId);
if (!userExists) throw new Error(`User ${userId} not found`);
await shardRouter.query(userId,
`INSERT INTO orders (order_id, user_id, total, status)
VALUES (gen_random_uuid(), $1, $2, 'pending')`,
[userId, total]
);
}
This feels wrong if you come from a “database enforces everything” background. Get over it. At sharding scale, application-level integrity is the trade-off you make for horizontal write throughput.
Step 4: Handle cross-shard queries
Not every query targets a single shard. Admin dashboards, global search, and reporting tools need to see data from all shards. These are the queries that will break your system if you do not design for them.
The scatter-gather pattern
interface GlobalOrderSummary {
date: string;
total_orders: number;
total_revenue: number;
}
async function getGlobalOrderSummary(
startDate: Date,
endDate: Date
): Promise<GlobalOrderSummary[]> {
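// Return each day as 'YYYY-MM-DD' text in UTC, so every shard buckets days
// the same way. A DATE column would come back from node-postgres as a JS Date
// object, and distinct Date objects never match as Map keys, so the merge
// below would not combine rows from different shards.
const results = await shardRouter.queryAll(
`SELECT
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS date,
COUNT(*) AS total_orders,
SUM(total) AS total_revenue
FROM orders
WHERE created_at >= $1 AND created_at < $2
GROUP BY 1
ORDER BY 1`,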
[startDate, endDate]
);
// Merge results from all shards
const merged = new Map<string, { orders: number; revenue: number }>();
for (const rows of results) {
for (const row of rows) {
const existing = merged.get(row.date) || { orders: 0, revenue: 0 };
existing.orders += Number(row.total_orders);
existing.revenue += Number(row.total_revenue);
merged.set(row.date, existing);
}
}
return Array.from(merged.entries())
.map(([date, data]) => ({
date,
total_orders: data.orders,
total_revenue: data.revenue,
}))
.sort((a, b) => a.date.localeCompare(b.date));
}
Every scatter-gather query touches all shards and waits for the slowest one. If shard 3 is having a bad day, every dashboard page is slow. This is where you need timeouts per shard:
async function queryAllWithTimeout(
text: string,
params: any[],
timeoutMs: number = 5000
): Promise<any[][]> {
const promises: Promise<{ shard: string; rows: any[] }>[] = [];
for (const [name, pool] of shardRouter.pools) {
const promise = pool.query(text, params).then((r) => ({
shard: name,
rows: r.rows,
}));
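// Wrap each shard query with an individual timeout, and clear the timer
// once the race settles so completed queries do not leave timers pending.
// Losing the race does not cancel the query on the server; PostgreSQL's
// statement_timeout setting is one way to stop it there as well.
let timer: ReturnType<typeof setTimeout> | undefined;
const timeout = new Promise<never>((_, reject) => {
timer = setTimeout(
() => reject(new Error(`Shard ${name} timed out after ${timeoutMs}ms`)),
timeoutMs
);
});
promises.push(Promise.race([promise, timeout]).finally(() => clearTimeout(timer)));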
}
const results = await Promise.allSettled(promises);
const rows: any[][] = [];
for (const result of results) {
if (result.status === 'fulfilled') {
rows.push(result.value.rows);
} else {
// Log the failure, maybe increment a metric
console.error('Scatter-gather shard failed:', result.reason);
}
}
return rows;
}
A shard timeout prevents one slow shard from taking down your entire dashboard. Return partial data with a warning banner. Partial data is infinitely better than a timeout error.
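queryAllWithTimeout logs failed shards and drops them, which leaves the caller no way to show that warning banner. A variant sketch that reports which shards failed alongside the rows they did return; ShardTask, PartialResult, and gatherPartial are hypothetical names, and in practice each task would wrap a pool.query call.

```typescript
interface ShardTask<T> {
  shard: string;
  run: () => Promise<T>;
}

interface PartialResult<T> {
  results: Array<{ shard: string; value: T }>;
  failedShards: string[]; // drives the "partial data" warning in the UI
}

// Reject if `promise` has not settled within `ms`; always clear the timer.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

async function gatherPartial<T>(tasks: ShardTask<T>[], timeoutMs: number): Promise<PartialResult<T>> {
  const settled = await Promise.allSettled(
    // Promise.resolve().then(...) also turns a synchronous throw into a rejection
    tasks.map((t) => withTimeout(Promise.resolve().then(() => t.run()), timeoutMs, `Shard ${t.shard}`))
  );
  const out: PartialResult<T> = { results: [], failedShards: [] };
  settled.forEach((s, i) => {
    if (s.status === 'fulfilled') out.results.push({ shard: tasks[i].shard, value: s.value });
    else out.failedShards.push(tasks[i].shard);
  });
  return out;
}
```

A route can then respond with something like { summary, partial: failedShards.length > 0, failedShards } and let the UI decide how loudly to warn.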
Step 5: Plan for resharding
You will need to add shards. Your user base will grow, or one shard will become a hot spot, or you will need to move to bigger instances. Resharding without a plan is a multi-day outage waiting to happen.
The double-write strategy
The safest resharding approach is double-writes: write to both the old shard mapping and the new shard mapping during the migration window.
class MigrationRouter {
private oldRouter: ShardRouter;
private newRouter: ShardRouter;
// Reads stay on the old mapping until Phase 4 flips this flag.
private readFromNew = false;
constructor(oldRouter: ShardRouter, newRouter: ShardRouter) {
this.oldRouter = oldRouter;
this.newRouter = newRouter;
}
async write(shardKey: string, text: string, params: any[]) {
// Write to both old and new routing. The old router may route to a
// different shard than the new router for this shardKey. Keep these
// writes idempotent (upserts) so they cannot collide with the backfill,
// and rely on the Phase 3 check to catch a write that failed on one side.
await Promise.all([
this.oldRouter.query(shardKey, text, params),
this.newRouter.query(shardKey, text, params),
]);
}
async read(shardKey: string, text: string, params: any[]) {
// The new shards are incomplete until the backfill finishes, and a
// missing row comes back as an empty result rather than an error, so
// serve reads from the old shards until cutover.
const router = this.readFromNew ? this.newRouter : this.oldRouter;
return router.query(shardKey, text, params);
}
// Phase 4: call once verification passes
cutOverReads() {
this.readFromNew = true;
}
}
The migration process has four phases:
Phase 1: Deploy the new router. The application starts writing every insert/update to both the old shard and the new shard. Reads still come from the old shard.
Phase 2: Backfill historical data. A background job reads every row from the old shards and inserts it into the new shards. Run this at a throttled rate to avoid saturating the database.
async function backfillUserData(
oldRouter: ShardRouter,
newRouter: ShardRouter,
batchSize = 1000
) {
// Walk each old shard through its pool directly: oldRouter.query() takes a
// shard key and hashes it, so it cannot target a shard by name.
for (const [, pool] of oldRouter.pools) {
// Keyset pagination on user_id. The nil UUID sorts before every other
// UUID, and unlike OFFSET, each batch does not rescan earlier rows.
let lastUserId = '00000000-0000-0000-0000-000000000000';
while (true) {
const { rows } = await pool.query(
`SELECT user_id, email, name, created_at FROM users
WHERE user_id > $1 ORDER BY user_id LIMIT $2`,
[lastUserId, batchSize]
);
if (rows.length === 0) break;
// Insert each row wherever the new ring places it. ON CONFLICT skips
// rows that the double-write path has already written.
await Promise.all(
rows.map((row: any) =>
newRouter.query(row.user_id,
`INSERT INTO users (user_id, email, name, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id) DO NOTHING`,
[row.user_id, row.email, row.name, row.created_at]
)
)
);
lastUserId = rows[rows.length - 1].user_id;
// Throttle: wait 100ms between batches
await new Promise((r) => setTimeout(r, 100));
}
}
}
Phase 3: Compare and verify. Run checksum queries on both old and new shards to verify data consistency before cutting over.
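Rows change shards during resharding, so old shard 2 cannot be compared with new shard 2. One approach is an order-independent fingerprint over each whole cluster: a row count plus the sum of per-row hashes modulo 2^64, which comes out the same regardless of which shard holds a row. This sketch assumes the users columns from the schema above (created_at left out for brevity); UserRow and clusterFingerprint are hypothetical names.

```typescript
import { createHash } from 'node:crypto';

interface UserRow {
  user_id: string;
  email: string;
  name: string;
}

// 64-bit fingerprint of one row: the first 8 bytes of SHA-256 over its columns.
function rowHash(row: UserRow): bigint {
  const digest = createHash('sha256')
    .update(JSON.stringify([row.user_id, row.email, row.name]))
    .digest();
  return digest.readBigUInt64BE(0);
}

// Row count plus the sum of row hashes mod 2^64. Addition commutes, so the
// result does not depend on which shard holds a row or on row order.
function clusterFingerprint(shards: UserRow[][]): { count: number; sum: bigint } {
  let count = 0;
  let sum = 0n;
  for (const rows of shards) {
    for (const row of rows) {
      count++;
      sum = BigInt.asUintN(64, sum + rowHash(row));
    }
  }
  return { count, sum };
}
```

In practice you would fold each keyset batch into a running count and sum rather than load every row, since the sums simply add across batches and shards. A sum can in principle miss errors that cancel out, so pair it with spot checks of individual rows.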
Phase 4: Cut over. Flip reads to the new router. Remove the double-write code. Decommission the old shards.
What about hot spots?
Sometimes one shard gets more traffic than the others regardless of your hash distribution. A single large tenant, a viral feature, or a data skew in your shard key can create a hot shard.
The fix is sub-sharding: split the hot shard’s key space into smaller ranges and redistribute them across the cluster.
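import { createHash } from 'node:crypto';
function hotShardShardKey(userId: string): string {
const tenantPrefix = userId.substring(0, 8); // assumes an 8-character tenant prefix, e.g. 'tenant01'
// ConsistentHashRing.hash() is a private instance method, so hash directly here
const subShard = createHash('md5').update(userId).digest().readUInt32BE(0) % 4; // 4 sub-shards
return `${tenantPrefix}:${subShard}`;
}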
Apply sub-sharding to the hot tenant only. Do not change the global routing for everyone else.
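A sketch of keeping that change local: check tenants against a set of known hot tenants, use the plain tenant key for everyone else, and fan out over the sub-shard keys when you need a whole hot tenant's data. hotTenants, SUB_SHARDS, routingKey, and tenantFanOutKeys are hypothetical names, and this assumes tenant-keyed routing. Moving an existing hot tenant onto sub-shards is itself a small resharding, so its rows still need the backfill and verification phases.

```typescript
import { createHash } from 'node:crypto';

const SUB_SHARDS = 4; // hypothetical bucket count for a hot tenant

// Stable bucket for a user: MD5-prefix hash, as in the ring, modulo SUB_SHARDS.
function bucketOf(userId: string): number {
  return createHash('md5').update(userId).digest().readUInt32BE(0) % SUB_SHARDS;
}

// Key handed to the consistent hash ring. Only tenants in hotTenants are split.
function routingKey(tenantId: string, userId: string, hotTenants: ReadonlySet<string>): string {
  return hotTenants.has(tenantId) ? `${tenantId}:${bucketOf(userId)}` : tenantId;
}

// Every key a tenant-wide query must visit: one for normal tenants,
// SUB_SHARDS for hot ones.
function tenantFanOutKeys(tenantId: string, hotTenants: ReadonlySet<string>): string[] {
  if (!hotTenants.has(tenantId)) return [tenantId];
  return Array.from({ length: SUB_SHARDS }, (_, i) => `${tenantId}:${i}`);
}

const hotTenants = new Set(['acme']);
console.log(routingKey('globex', 'user-42', hotTenants)); // globex
console.log(routingKey('acme', 'user-42', hotTenants)); // acme:<0-3>, stable per user
console.log(tenantFanOutKeys('acme', hotTenants)); // acme:0 through acme:3
```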
Step 6: Monitor shard health
Every shard needs independent monitoring. A slow query on shard 2 should not hide behind the average latency of all shards.
interface ShardMetrics {
name: string;
latencyP50: number;
latencyP99: number;
activeConnections: number;
idleConnections: number;
waitingQueries: number;
diskUsageBytes: number;
replicationLag: number;
}
async function collectShardMetrics(
shardRouter: ShardRouter
): Promise<ShardMetrics[]> {
const metrics: ShardMetrics[] = [];
for (const [name, pool] of shardRouter.pools) {
// Read pool stats before checking out a client, so this function's own
// connection is not counted as active
const activeCount = pool.totalCount - pool.idleCount;
const idleCount = pool.idleCount;
const waitingCount = pool.waitingCount;
const client = await pool.connect();
try {
// Get disk usage
const { rows: diskRows } = await client.query(`
SELECT pg_database_size(current_database()) AS bytes
`);
// Get replication lag in seconds. pg_last_xact_replay_timestamp() returns
// NULL on a primary, so this reports 0 there; on a quiet replica the value
// keeps growing even when the replica is fully caught up.
const { rows: lagRows } = await client.query(`
SELECT COALESCE(
EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp()),
0
) AS lag_seconds
`);
metrics.push({
name,
latencyP50: 0, // populated from your metrics system
latencyP99: 0,
activeConnections: activeCount,
idleConnections: idleCount,
waitingQueries: waitingCount,
diskUsageBytes: Number(diskRows[0].bytes),
replicationLag: Number(lagRows[0].lag_seconds),
});
} finally {
client.release();
}
}
return metrics;
}
Expose these metrics per shard to your monitoring system. Alert on individual shard health, not cluster averages. Seven shards at 50ms and one at 450ms average out to a harmless-looking 100ms.
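A sketch of alerting per shard instead of on the average, using the nearest-rank percentile over raw latency samples. In a real setup the samples would come from your metrics system; percentile, shardsOverBudget, and the 200ms budget are hypothetical.

```typescript
// Nearest-rank percentile (p in 0..100) over raw samples.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) return NaN;
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.min(Math.max(rank, 1), sorted.length) - 1];
}

// Shards whose p-th percentile latency exceeds the budget.
function shardsOverBudget(latencies: Map<string, number[]>, p: number, budgetMs: number): string[] {
  const slow: string[] = [];
  for (const [shard, samples] of latencies) {
    if (percentile(samples, p) > budgetMs) slow.push(shard);
  }
  return slow;
}

// Made-up samples: seven shards at 50ms, one at 450ms.
const latencies = new Map<string, number[]>();
for (let i = 0; i < 7; i++) latencies.set(`shard-${i}`, Array.from({ length: 100 }, () => 50));
latencies.set('shard-7', Array.from({ length: 100 }, () => 450));

const allSamples = Array.from(latencies.values()).flat();
const clusterAverage = allSamples.reduce((sum, x) => sum + x, 0) / allSamples.length;
const slowShards = shardsOverBudget(latencies, 99, 200);
console.log(clusterAverage, slowShards); // clusterAverage is 100; slowShards is ['shard-7']
```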
Putting it all together
Here is a complete Express route that uses the sharding layer:
import express from 'express';
const app = express();
app.use(express.json());
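// shardConfigs: your ShardConfig[] list, loaded from configuration
const shardRouter = new ShardRouter(shardConfigs);
// Single-shard query: fast path
app.get('/users/:userId/orders', async (req, res) => {
const { userId } = req.params;
const status = typeof req.query.status === 'string' ? req.query.status : undefined;
// Query-string values arrive as strings: parse them and cap the page size (100 is an example limit)
const limit = Math.min(Math.max(Number(req.query.limit) || 20, 1), 100);
const offset = Math.max(Number(req.query.offset) || 0, 0);
let query = 'SELECT * FROM orders WHERE user_id = $1';
const params: any[] = [userId];
if (status) {
params.push(status);
query += ` AND status = $${params.length}`;
}
// Number LIMIT/OFFSET after however many filters were added, so the
// placeholders always match the params array
params.push(limit, offset);
query += ` ORDER BY created_at DESC LIMIT $${params.length - 1} OFFSET $${params.length}`;
try {
const result = await shardRouter.query(userId, query, params);
// rowCount is the number of rows in this page, not the user's total
res.json({ orders: result.rows, count: result.rowCount });
} catch (err) {
console.error('Query failed:', err);
res.status(500).json({ error: 'Failed to fetch orders' });
}
});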
// Cross-shard query: scatter-gather over every shard (no per-shard timeout here)
app.get('/admin/orders/summary', async (req, res) => {
try {
const results = await getGlobalOrderSummary(
new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
new Date()
);
res.json({ summary: results });
} catch (err) {
console.error('Summary failed:', err);
res.status(500).json({ error: 'Failed to generate summary' });
}
});
What sharding does not solve
Sharding fixes write throughput. It does not fix:
- Bad indexes. Sharding a table with no index on user_id still produces sequential scans, now on every shard.
- Bad queries. An N+1 query pattern is still N+1 queries after sharding, and if those queries scatter-gather, each one is multiplied by the number of shards.
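- Connection bloat. Each shard needs its own pool. If you have 8 shards with 20 connections each, that is 160 connections from every application process, and each shard sees 20 connections times your number of app instances. Your application servers need enough file descriptors and memory to handle that, and so do the shards.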
- Joins across shards. PostgreSQL cannot join tables that live on different database servers (short of extensions such as postgres_fdw). Your application must do it.
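Joining in the application usually means a hash join in memory: build a Map from one side, then probe it with the other. A generic sketch with inner-join semantics, assuming the join keys are primitives (Map compares object keys by identity, so they would never match) and that both sides fit in memory; the Order and Product shapes and the data are made up.

```typescript
// In-memory hash join: index the right-hand rows by key, then probe once per
// left-hand row. Inner-join semantics: left rows without a match are dropped.
function hashJoin<L, R, K>(
  left: L[],
  right: R[],
  leftKey: (row: L) => K,
  rightKey: (row: R) => K
): Array<[L, R]> {
  const index = new Map<K, R[]>();
  for (const row of right) {
    const key = rightKey(row);
    const bucket = index.get(key);
    if (bucket) bucket.push(row);
    else index.set(key, [row]);
  }
  const joined: Array<[L, R]> = [];
  for (const row of left) {
    for (const match of index.get(leftKey(row)) ?? []) joined.push([row, match]);
  }
  return joined;
}

interface Order { order_id: string; product_id: string; }
interface Product { product_id: string; title: string; }

// Orders gathered from two user shards, products fetched from wherever the catalog lives.
const shardAOrders: Order[] = [{ order_id: 'o1', product_id: 'p1' }];
const shardBOrders: Order[] = [
  { order_id: 'o2', product_id: 'p2' },
  { order_id: 'o3', product_id: 'p9' }, // no matching product
];
const products: Product[] = [
  { product_id: 'p1', title: 'Widget' },
  { product_id: 'p2', title: 'Gadget' },
];

const joined = hashJoin([...shardAOrders, ...shardBOrders], products, (o) => o.product_id, (p) => p.product_id);
// Logs o1: Widget and o2: Gadget; o3 has no product and is dropped
console.log(joined.map(([o, p]) => `${o.order_id}: ${p.title}`));
```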
Do not shard until you have exhausted vertical scaling, query optimization, read replicas, and connection pooling. Sharding adds operational complexity that is worth the cost only when the single-instance ceiling is real and measured.
A note from Yojji
The careful, operational engineering described in this post is not something most teams learn on the job without paying a few painful incident bills first. Choosing the right shard key, building a routing layer that survives a hot shard without toppling the whole cluster, and executing a no-downtime resharding migration all require production experience that is hard to buy in books or blog posts.
That is exactly the kind of work Yojji delivers. Yojji is an international custom software development company, founded in 2016, with offices in Europe, the US, and the UK. Their teams specialize in the JavaScript stack (Node.js, TypeScript, React), cloud platforms (AWS, Azure, Google Cloud), and building scalable distributed systems that handle tens of millions of users. If your team needs senior engineering muscle to design and implement a database architecture that grows with your business, Yojji is worth a conversation.