Event Sourcing with PostgreSQL: The Pragmatic 80% Solution
Your product team wants an audit trail, replayable history, and the ability to rebuild read models without running migrations on a 500GB table. Here is how to implement event sourcing in PostgreSQL without Kafka, schema registries, or six months of migration pain — just an append-only table, a projection function, and the replay logic that makes it useful.
The support ticket was simple: “Show me exactly how this order got to status shipped.” You looked at the orders table. It has a status column, but that column only stores the current value. The previous value — paid, packed, in_transit — was overwritten each time. You have no record of who changed it, when, or why. You have a snapshot, not a history.
The usual fix is an order_status_history table with order_id, old_status, new_status, changed_at, and changed_by. That works until the next ticket: “Show me how the shipping address changed too.” So you add order_address_history. Then order_item_history. Then order_payment_history. By the fourth table you are maintaining a shadow schema that is half the size of your real one, and every migration needs to touch two places.
Event sourcing replaces all of that with one idea: store the facts, not the state. Instead of updating a row, you append an event. OrderCreated, OrderPaid, OrderShipped. The current state is a projection — a left fold over the event stream. If you want to know how the order got to shipped, you read the events. If you want to add a new read model, you replay the events and build it. No migration scripts. No history tables. One append-only log.
The enterprise version of this uses Kafka, Avro schemas, event stores, and snapshot rebuilds that take a weekend. The pragmatic version uses PostgreSQL, a single table, and a few hundred lines of TypeScript. This post is the pragmatic version.
The schema: one table, no migrations
The events table is intentionally minimal. Every event is a row. The payload is JSONB so you do not need a schema change when a new event type needs a new field.
CREATE TABLE events (
  id            BIGSERIAL PRIMARY KEY,
  stream_id     TEXT NOT NULL,
  stream_type   TEXT NOT NULL,
  event_type    TEXT NOT NULL,
  event_version INT NOT NULL DEFAULT 1,
  payload       JSONB NOT NULL,
  metadata      JSONB NOT NULL DEFAULT '{}',
  occurred_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  sequence      BIGINT NOT NULL
);

CREATE UNIQUE INDEX idx_events_stream_sequence
  ON events (stream_id, sequence);

CREATE INDEX idx_events_stream_type_occurred
  ON events (stream_type, occurred_at);

CREATE INDEX idx_events_event_type
  ON events (event_type, occurred_at);
- stream_id is the aggregate identifier: an order UUID, a user ID, a workspace ID.
- stream_type is the aggregate type: order, user, inventory_item. It keeps your indexes selective.
- sequence provides optimistic concurrency control. The first event for a stream has sequence = 1, the second sequence = 2, and so on. The unique index guarantees that two concurrent writers cannot append the same sequence number. One succeeds, the other gets a unique-violation error and retries.
- payload is the event data. JSONB is queryable, indexable, and schema-flexible.
- metadata holds correlation_id, actor_id, ip_address, or anything else you need for observability and audit.
That is it. No foreign keys. No UPDATE. No DELETE. Append only.
Appending an event with optimistic concurrency control
The critical operation is appendEvent. It inserts the next sequence number for a stream, or fails if another writer got there first.
import { Pool } from 'pg';

interface Event {
  streamId: string;
  streamType: string;
  eventType: string;
  eventVersion: number;
  payload: unknown;
  metadata?: Record<string, unknown>;
  sequence: number;
}

export async function appendEvent(
  pool: Pool,
  event: Event
): Promise<void> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    await client.query(
      `INSERT INTO events (
         stream_id, stream_type, event_type, event_version,
         payload, metadata, occurred_at, sequence
       ) VALUES ($1, $2, $3, $4, $5, $6, NOW(), $7)`,
      [
        event.streamId,
        event.streamType,
        event.eventType,
        event.eventVersion,
        JSON.stringify(event.payload),
        JSON.stringify(event.metadata ?? {}),
        event.sequence,
      ]
    );
    await client.query('COMMIT');
  } catch (err: any) {
    await client.query('ROLLBACK');
    // Unique violation on (stream_id, sequence) means a concurrent append won.
    if (err.code === '23505' && err.constraint === 'idx_events_stream_sequence') {
      throw new Error(
        `Concurrency conflict on stream ${event.streamId} sequence ${event.sequence}`
      );
    }
    throw err;
  } finally {
    client.release();
  }
}
The caller decides the sequence. Typically they read the current state first — which gives them the highest existing sequence — then append currentMax + 1. If two callers do this simultaneously, one wins, the other gets 23505, and the application retries.
This is the same optimistic locking pattern you already use on version columns. The only difference is that here the version is the event sequence, and the row is never updated.
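The retry described above can be sketched as a small helper. This is a minimal sketch rather than part of the code above: ConcurrencyError, appendWithRetry, and the EventStore interface are hypothetical names, and an in-memory store stands in for PostgreSQL so the example runs on its own. In the real append path, the 23505 branch of appendEvent would throw ConcurrencyError instead of a plain Error.

```typescript
// Hypothetical error type; the 23505 branch of appendEvent would throw this.
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, ConcurrencyError.prototype);
  }
}

interface NewEvent {
  streamId: string;
  eventType: string;
  payload: unknown;
}

// Assumed minimal store interface for illustration, not a real library API.
interface EventStore {
  maxSequence(streamId: string): Promise<number>;
  append(event: NewEvent, sequence: number): Promise<void>;
}

async function appendWithRetry(
  store: EventStore,
  event: NewEvent,
  maxAttempts = 3
): Promise<number> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Re-read on every attempt so the next sequence reflects the winner's write.
    const next = (await store.maxSequence(event.streamId)) + 1;
    try {
      await store.append(event, next);
      return next;
    } catch (err) {
      // Anything other than a lost race is a real failure.
      if (!(err instanceof ConcurrencyError)) throw err;
    }
  }
  throw new ConcurrencyError(
    `Gave up on stream ${event.streamId} after ${maxAttempts} attempts`
  );
}

// In-memory stand-in for the events table and its unique index.
class MemoryStore implements EventStore {
  private rows: Array<{ streamId: string; sequence: number }> = [];

  async maxSequence(streamId: string): Promise<number> {
    return this.rows
      .filter((r) => r.streamId === streamId)
      .reduce((max, r) => Math.max(max, r.sequence), 0);
  }

  async append(event: NewEvent, sequence: number): Promise<void> {
    const taken = this.rows.some(
      (r) => r.streamId === event.streamId && r.sequence === sequence
    );
    if (taken) throw new ConcurrencyError(`sequence ${sequence} already exists`);
    this.rows.push({ streamId: event.streamId, sequence });
  }
}
```

In a real command handler the retry usually has to do more than bump the number: the event was decided against the state the caller read, so after a conflict the decision itself should be re-checked against the fresh stream (the order may have been cancelled in the meantime).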
Reading a stream: rebuilding state on demand
The simplest projection is a function that reads every event for a stream and returns the current state. You do not need a framework for this. A reduce over the event array is enough.
// Row shape as returned by node-postgres: column names stay snake_case,
// BIGINT columns arrive as strings, and TIMESTAMPTZ columns arrive as Date objects.
interface StoredEvent {
  id: string;
  stream_id: string;
  stream_type: string;
  event_type: string;
  event_version: number;
  payload: unknown;
  metadata: Record<string, unknown>;
  occurred_at: Date;
  sequence: string;
}

export async function readStream(
  pool: Pool,
  streamId: string
): Promise<StoredEvent[]> {
  const { rows } = await pool.query(
    `SELECT * FROM events
     WHERE stream_id = $1
     ORDER BY sequence ASC`,
    [streamId]
  );
  return rows;
}

interface OrderState {
  orderId: string;
  customerId: string;
  items: Array<{ sku: string; qty: number }>;
  status: 'pending' | 'paid' | 'shipped' | 'cancelled';
  shippingAddress?: string;
  paidAt?: Date;
  shippedAt?: Date;
}

function projectOrder(events: StoredEvent[]): OrderState | null {
  return events.reduce<OrderState | null>((state, ev) => {
    switch (ev.event_type) {
      case 'OrderCreated': {
        const p = ev.payload as any;
        return {
          orderId: p.orderId,
          customerId: p.customerId,
          items: p.items,
          status: 'pending',
        };
      }
      case 'OrderPaid': {
        if (!state) throw new Error('OrderPaid before OrderCreated');
        return { ...state, status: 'paid', paidAt: ev.occurred_at };
      }
      case 'OrderShipped': {
        if (!state) throw new Error('OrderShipped before OrderCreated');
        return {
          ...state,
          status: 'shipped',
          shippedAt: ev.occurred_at,
          shippingAddress: (ev.payload as any).address ?? state.shippingAddress,
        };
      }
      case 'OrderCancelled': {
        if (!state) throw new Error('OrderCancelled before OrderCreated');
        return { ...state, status: 'cancelled' };
      }
      default:
        // Unknown event types are ignored so old projectors survive new events.
        return state;
    }
  }, null);
}
Usage:
const events = await readStream(pool, 'order-123');
const order = projectOrder(events);
console.log(order?.status); // 'shipped'
For an order with fifty events, this query typically returns in around a millisecond or less once the pages are cached: the unique index on (stream_id, sequence) makes it a single index range scan that already yields rows in sequence order. If you have a stream with ten thousand events (an unusually chatty aggregate), you add a snapshot table, not a new event store.
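Snapshots: trade storage for speed
Snapshots are disposable caches of a projection at a specific sequence. They are not the source of truth; the event log is. A snapshot plus the events appended after it yields exactly the same state as a full replay, so you can delete and rebuild snapshots whenever you want.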
CREATE TABLE order_snapshots (
  stream_id    TEXT PRIMARY KEY,
  sequence     BIGINT NOT NULL,
  state        JSONB NOT NULL,
  projected_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

export async function saveSnapshot(
  pool: Pool,
  streamId: string,
  sequence: number,
  state: unknown
): Promise<void> {
  await pool.query(
    `INSERT INTO order_snapshots (stream_id, sequence, state, projected_at)
     VALUES ($1, $2, $3, NOW())
     ON CONFLICT (stream_id) DO UPDATE
       SET sequence = EXCLUDED.sequence,
           state = EXCLUDED.state,
           projected_at = EXCLUDED.projected_at`,
    [streamId, sequence, JSON.stringify(state)]
  );
}

export async function loadSnapshot(
  pool: Pool,
  streamId: string
): Promise<{ sequence: number; state: OrderState } | null> {
  const { rows } = await pool.query(
    `SELECT sequence, state FROM order_snapshots WHERE stream_id = $1`,
    [streamId]
  );
  if (!rows[0]) return null;
  // node-postgres returns BIGINT as a string, so convert before comparing.
  // Dates inside state (paidAt, shippedAt) come back as ISO strings after
  // the JSON round trip; revive them if callers need Date objects.
  return { sequence: Number(rows[0].sequence), state: rows[0].state };
}
To read with a snapshot: load the snapshot, then read only events with sequence > snapshot.sequence, and fold them over the snapshot state. This turns a 10,000-event read into a 3-event read.
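The fold over a snapshot can be sketched as a pure function. This is an illustrative sketch, not the post's implementation: foldFromSnapshot, Snapshot, and the simplified SequencedEvent shape are hypothetical names, and the events are assumed to arrive sorted by sequence. In practice the filtering would happen in SQL (WHERE stream_id = $1 AND sequence > $2); the guard inside the loop only protects callers who pass the whole stream.

```typescript
// Simplified event shape for illustration; sequence is already a number here.
interface SequencedEvent {
  sequence: number;
  eventType: string;
  payload: unknown;
}

interface Snapshot<S> {
  sequence: number;
  state: S;
}

// Folds only the events newer than the snapshot over the snapshot's state.
// Returns the new state together with the last sequence applied, which is
// exactly what a fresh snapshot would need to store.
function foldFromSnapshot<S>(
  snapshot: Snapshot<S> | null,
  initial: S,
  events: SequencedEvent[],
  apply: (state: S, ev: SequencedEvent) => S
): Snapshot<S> {
  const startSeq = snapshot ? snapshot.sequence : 0;
  let state = snapshot ? snapshot.state : initial;
  let lastSeq = startSeq;
  for (const ev of events) {
    if (ev.sequence <= startSeq) continue; // already covered by the snapshot
    state = apply(state, ev);
    lastSeq = ev.sequence;
  }
  return { sequence: lastSeq, state };
}

// Example projection: total quantity added to a cart-like stream.
const sumQty = (total: number, ev: SequencedEvent): number =>
  total + (ev.payload as { qty: number }).qty;

const exampleEvents: SequencedEvent[] = [1, 2, 3, 4, 5].map((n) => ({
  sequence: n,
  eventType: 'ItemAdded',
  payload: { qty: n },
}));

// Full replay and snapshot-plus-tail replay should agree.
const fullReplay = foldFromSnapshot<number>(null, 0, exampleEvents, sumQty);
const resumed = foldFromSnapshot<number>({ sequence: 3, state: 6 }, 0, exampleEvents, sumQty);
```

Because the function returns the last applied sequence alongside the state, its result can be handed straight to saveSnapshot when you decide a stream has grown long enough to deserve one.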
Snapshots are not required on day one. Start without them. When a query slows down — measure first, guess never — add snapshots for that aggregate type only.
Projections: building read models without migrations
The real power of event sourcing is not the audit log. It is the ability to create a new read model by replaying history. Suppose the marketing team wants a table of daily_revenue_by_category. In a CRUD system, you write a backfill script that scans orders and order_items, handles every edge case in the current schema, and runs for six hours.
In an event-sourced system, you write a projector function and run it over the event log.
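// Assumes each OrderPaid payload carries the order's items with
// category, price, and qty fields.
async function buildDailyRevenue(pool: Pool, start: Date, end: Date) {
  // Half-open range [start, end) so adjacent runs never count an event twice.
  // Pass whole-day boundaries: the upsert below overwrites each day's total.
  const { rows: events } = await pool.query(
    `SELECT * FROM events
     WHERE event_type = 'OrderPaid'
       AND occurred_at >= $1
       AND occurred_at < $2
     ORDER BY occurred_at ASC`,
    [start, end]
  );
  const revenue = new Map<string, number>();
  for (const ev of events) {
    const p = ev.payload as any;
    // occurred_at is a Date; toISOString() buckets by UTC day.
    const day = ev.occurred_at.toISOString().slice(0, 10);
    for (const item of p.items) {
      const key = `${day}:${item.category}`;
      revenue.set(key, (revenue.get(key) ?? 0) + item.price * item.qty);
    }
  }
  for (const [key, total] of revenue) {
    // Split on the first colon only, so a category containing ':' survives.
    const sep = key.indexOf(':');
    const day = key.slice(0, sep);
    const category = key.slice(sep + 1);
    await pool.query(
      `INSERT INTO daily_revenue (day, category, total)
       VALUES ($1, $2, $3)
       ON CONFLICT (day, category) DO UPDATE SET total = EXCLUDED.total`,
      [day, category, total]
    );
  }
}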
If the definition of “revenue” changes — exclude refunds, add tax, whatever — you drop the daily_revenue table, adjust the projector, and replay. The events are immutable facts; the projections are disposable views.
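To make the "adjust the projector and replay" step concrete, here is a minimal sketch of a pure projector whose definition of revenue is a parameter. Everything in it is an assumption for illustration: the OrderRefunded event type, the priceCents field (integer cents instead of a floating-point price), and the projectDailyRevenue name do not appear in the code above. Timestamps are assumed to be UTC ISO 8601 strings.

```typescript
interface RevenueItem {
  category: string;
  priceCents: number;
  qty: number;
}

// Hypothetical event shapes; OrderRefunded is assumed for this example.
interface RevenueEvent {
  eventType: 'OrderPaid' | 'OrderRefunded';
  occurredAt: string; // ISO 8601, UTC
  payload: { items: RevenueItem[] };
}

// day -> category -> total in cents
type RevenueTable = Record<string, Record<string, number>>;

// Pure projector: the same events always produce the same table, so
// "replaying" is just calling this again with a different definition.
function projectDailyRevenue(
  events: RevenueEvent[],
  subtractRefunds: boolean
): RevenueTable {
  const table: RevenueTable = {};
  for (const ev of events) {
    if (ev.eventType === 'OrderRefunded' && !subtractRefunds) continue;
    const sign = ev.eventType === 'OrderRefunded' ? -1 : 1;
    const day = ev.occurredAt.slice(0, 10);
    const byCategory = table[day] ?? (table[day] = {});
    for (const item of ev.payload.items) {
      byCategory[item.category] =
        (byCategory[item.category] ?? 0) + sign * item.priceCents * item.qty;
    }
  }
  return table;
}

const sampleEvents: RevenueEvent[] = [
  {
    eventType: 'OrderPaid',
    occurredAt: '2024-03-01T10:00:00Z',
    payload: {
      items: [
        { category: 'books', priceCents: 1500, qty: 2 },
        { category: 'toys', priceCents: 500, qty: 1 },
      ],
    },
  },
  {
    eventType: 'OrderRefunded',
    occurredAt: '2024-03-01T18:00:00Z',
    payload: { items: [{ category: 'books', priceCents: 1500, qty: 1 }] },
  },
  {
    eventType: 'OrderPaid',
    occurredAt: '2024-03-02T09:30:00Z',
    payload: { items: [{ category: 'books', priceCents: 1000, qty: 1 }] },
  },
];

// Old definition (gross) and new definition (net of refunds), same events.
const gross = projectDailyRevenue(sampleEvents, false);
const net = projectDailyRevenue(sampleEvents, true);
```

Keeping the projector pure and doing the database writes in a thin wrapper around it makes the new definition easy to test against a handful of events before replaying the whole log.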
The traps that make event sourcing painful
Trap 1: Putting the current state in the events table. The events table stores facts. If you add a current_status column and update it on every append, you have re-created the CRUD table with extra steps. Keep projections separate.
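Trap 2: Global ordering addiction. Some systems insist on a single gapless sequence across all streams so that every event has a total order. PostgreSQL can do this, but only by funneling every append through the same counter or lock, and insert throughput drops accordingly. The BIGSERIAL id is not a substitute: sequence values can have gaps and are assigned at insert time, not commit time, so id order is not a reliable causal order. Per-stream sequences are enough. Cross-stream causality should be handled by correlation_id in metadata, not by a global clock.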
Trap 3: Events that contain state from other aggregates. An OrderShipped event should contain the order ID and the shipping address, not the full Customer record. If you embed customer data, the event becomes a lie when the customer updates their address later. Use IDs and look up the current projection when you need it.
Trap 4: Forgetting that JSONB is not free. Events are forever. A 5MB JSONB payload per event will bloat your table, slow your replays, and make pg_dump painful. Keep payloads small. Store file references, not file contents.
Trap 5: No deletion policy. Events are append-only, but they are not immortal. Add a retention_class column — hot, warm, cold — and archive old events to S3 after two years. The audit requirement rarely says “keep every click event in PostgreSQL forever.”
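For Trap 4, one cheap safeguard is to reject oversized payloads before they reach the append path. The sketch below is an assumption-laden illustration: the 64 KB limit and the function names are hypothetical, and the number it measures is the size of the JSON text in UTF-8, which only approximates what PostgreSQL stores for a JSONB value.

```typescript
// Hypothetical limit; pick one that fits your own payloads.
const MAX_PAYLOAD_BYTES = 64 * 1024;

// Size of the serialized JSON text in UTF-8 bytes (an approximation of
// the stored JSONB size, not the exact on-disk figure).
function payloadSizeBytes(payload: unknown): number {
  const json = JSON.stringify(payload);
  // JSON.stringify returns undefined for values such as functions.
  if (json === undefined) {
    throw new TypeError('payload is not JSON-serializable');
  }
  return new TextEncoder().encode(json).length;
}

// Throws if the payload is larger than the limit; call it before appending.
function assertPayloadFits(
  payload: unknown,
  maxBytes: number = MAX_PAYLOAD_BYTES
): void {
  const size = payloadSizeBytes(payload);
  if (size > maxBytes) {
    throw new RangeError(`payload is ${size} bytes, limit is ${maxBytes}`);
  }
}

// A file reference passes; the file's contents inlined as a string would not.
assertPayloadFits({ orderId: 'order-123', invoiceRef: 'invoices/order-123.pdf' });
```

A guard like this turns "keep payloads small" from a convention into something a test or a code review cannot miss.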
When this pattern is the wrong choice
Event sourcing adds complexity. Do not use it for:
- Simple CRUD with no audit requirement. A status column that changes twice a month does not need an event log.
- High-frequency sensor data. A thousand events per second per stream will choke PostgreSQL. Use a time-series database or a dedicated event store.
- Tight latency requirements on reads. If every read must be under 5ms and your aggregates have hundreds of events, the replay cost matters. Use snapshots aggressively, or stick to mutable state with a separate audit log.
Use it when the history is the product feature — order tracking, audit trails, workflow replay — or when you need multiple read models that evolve faster than your core schema.
The practical checklist
Before you declare an event-sourced table done, verify:
- Unique index on (stream_id, sequence) is in place. Without it, you have no concurrency safety.
- Append path uses 23505 error detection. The retry loop should re-read the stream, recompute the next sequence, and try again.
- Projections are idempotent. Replaying the same event twice produces the same state. No += without a deduplication key.
- Snapshots are optional but measurable. If a stream read exceeds your latency budget, snapshot it. Not before.
- Payloads are small and self-contained. No embedded foreign aggregates, no binary blobs.
- Retention is planned. Archive or drop events older than your compliance window.
- A replay script exists. You can rebuild any projection from the event log in one command.
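The idempotency item in the checklist can be illustrated with a small sketch. It assumes events for a given stream are delivered in sequence order, and it uses the highest applied sequence per stream as the deduplication key; the PaymentEvent shape, RunningTotal, and applyOnce are hypothetical names for this example.

```typescript
// Simplified event shape for illustration.
interface PaymentEvent {
  streamId: string;
  sequence: number;
  amountCents: number;
}

interface RunningTotal {
  totalCents: number;
  // Highest sequence already applied per stream: the deduplication key.
  applied: Record<string, number>;
}

// Applies an event at most once. A replayed event (sequence not greater
// than the last one applied for its stream) leaves the state untouched.
function applyOnce(state: RunningTotal, ev: PaymentEvent): RunningTotal {
  const last = state.applied[ev.streamId] ?? 0;
  if (ev.sequence <= last) return state;
  return {
    totalCents: state.totalCents + ev.amountCents,
    applied: { ...state.applied, [ev.streamId]: ev.sequence },
  };
}

const emptyTotal: RunningTotal = { totalCents: 0, applied: {} };
const firstPayment: PaymentEvent = { streamId: 'order-1', sequence: 1, amountCents: 500 };
const secondPayment: PaymentEvent = { streamId: 'order-2', sequence: 1, amountCents: 700 };

// The first payment is delivered three times but counted once.
const deduplicated = [firstPayment, firstPayment, secondPayment, firstPayment].reduce(
  applyOnce,
  emptyTotal
);
```

When the projection lives in a table rather than in memory, the same idea usually becomes a last_sequence column that is checked and updated in the same transaction as the total.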
Event sourcing does not have to mean Kafka, schema registries, and distributed snapshot stores. It can mean one PostgreSQL table, a reduce function, and the confidence that you can answer “how did we get here?” without guessing.
A note from Yojji
Building systems where the history itself is queryable — and where new read models can be reconstructed without touching production tables — is the kind of architectural foresight that pays off when business requirements change every quarter. Yojji engineers use these patterns in the Node.js and PostgreSQL systems they build for clients, favoring simple, inspectable designs over infrastructure that outruns the team’s operational bandwidth.
Yojji is an international custom software development company founded in 2016, with teams across Europe, the US, and the UK. They specialize in the JavaScript ecosystem, cloud platforms, and the kind of resilient backend architecture that makes production systems easier to reason about, not harder.