How I run @nestjs/cqrs in production. Commands, projections, sagas, optimistic concurrency, and the failure modes that taught me to measure freshness instead of throughput.
Saturday afternoon at the combat-sports tournament platform in London where I was acting CTO. A live broadcast was running, the federation and the commentators were watching the public standings page, and somewhere mid-card the leaderboard just stopped moving. The match-events topic kept growing on the Kafka broker while the standings stayed frozen. Most of that team I had hired myself.
That incident is the reason I write CQRS code the way I do now. The write side is fine on its own. It's the read side (the projections, the consumers that read events and update denormalized views) that punishes you when you stop paying attention.
This is how I run @nestjs/cqrs in production, and the failure modes I’ve learned to design for.
I don’t reach for CQRS because it’s elegant. I reach for it when reads and writes start contending for the same database connections and the same mental model.
At the federation platform, we ran hundreds of microservices, Kafka as the async backbone, and a rankings surface that absolutely had to be fast. Writes went through aggregates in PostgreSQL. Reads went through Elasticsearch. The two stores never shared a connection pool, never shared the same hot path. That’s CQRS, whether you call it that or not.
Event Sourcing is a different decision. You add it when you need an audit trail you can replay, or multiple read models built off the same write history. Start with CQRS. Add ES later, if at all.
A command is an imperative request, not a fact: PlaceOrder, not OrderPlaced. The handler orchestrates the work, but the business rules themselves live in the aggregate.
```typescript
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { Injectable, Logger, ConflictException } from '@nestjs/common';
import { OrderRepository } from './order.repository';
import { Order } from './order.aggregate';

export class PlaceOrderCommand {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: ReadonlyArray<{ productId: string; quantity: number }>,
    public readonly idempotencyKey: string,
  ) {}
}

@Injectable()
@CommandHandler(PlaceOrderCommand)
export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
  private readonly logger = new Logger(PlaceOrderHandler.name);

  constructor(
    private readonly repository: OrderRepository,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(cmd: PlaceOrderCommand): Promise<void> {
    // Redeliveries are expected: a known key means this command already ran.
    const existing = await this.repository.findByIdempotencyKey(cmd.idempotencyKey);
    if (existing) {
      this.logger.warn(`Duplicate PlaceOrder, key=${cmd.idempotencyKey}`);
      return;
    }

    // Wire the aggregate to the event bus; without this, commit() publishes nothing.
    const order = this.publisher.mergeObjectContext(
      Order.place(cmd.orderId, cmd.userId, cmd.items, cmd.idempotencyKey),
    );

    try {
      await this.repository.save(order);
    } catch (err) {
      // Optimistic concurrency: another writer got to this order first.
      if (this.repository.isVersionConflict(err)) {
        throw new ConflictException(`Concurrent write on order ${cmd.orderId}`);
      }
      throw err;
    }

    // Publish only after the write succeeded, so no event describes unsaved state.
    order.commit();
  }
}
```
Two things to notice. First, the idempotency check is mandatory, not a nice-to-have. At a creator-economy platform I worked at, Apple's SubscriptionRenewal retried on us once because our endpoint returned 200 OK a second past the deadline, and a sizeable batch of customers got double-charged. The structural fix was an idempotency key on the renewal handler. Since then I treat every command handler as if its input will arrive at least twice, because eventually it will.
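The handler above relies on a repository lookup, which leaves a small gap between "check" and "save". Here is a dependency-free sketch of the same at-least-once discipline. It assumes a single process and an in-memory map, and `IdempotencyGuard` is a hypothetical name, not a library API. The guard hands every duplicate the first attempt's promise, so the side effect runs once even when the retry arrives while the original is still in flight.

```typescript
// Hypothetical in-process guard. Across processes, the same job belongs to a
// unique index on the idempotency key, so the database decides who wins.
type Handler<T> = () => Promise<T>;

class IdempotencyGuard<T> {
  private readonly seen = new Map<string, Promise<T>>();

  // Any repeated key gets the first execution's promise back, so a retry or a
  // concurrent duplicate never runs the handler a second time.
  run(key: string, handler: Handler<T>): Promise<T> {
    const existing = this.seen.get(key);
    if (existing) return existing;

    const pending = handler().catch((err) => {
      // Forget failed attempts so a genuine retry can still succeed later.
      this.seen.delete(key);
      throw err;
    });
    this.seen.set(key, pending);
    return pending;
  }
}
```

The check and the registration happen in the same synchronous turn. That is what closes the check-then-act gap inside one Node process. Across replicas, only a database constraint gives you that guarantee.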
The other thing is mergeObjectContext. That's how @nestjs/cqrs glues an aggregate to the event bus. Without it, order.commit() silently does nothing, and the events never reach the bus. I once spent two hours on a “why isn’t my saga firing” thread before realizing someone had instantiated the aggregate with new Order(...) instead of passing it through the publisher.
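To make that failure concrete, here is a toy model of the mechanism. It is written without @nestjs/cqrs so it runs standalone, and it simplifies the library: the real AggregateRoot also has publishAll, and EventPublisher also offers mergeClassContext. What it shows is that apply() only buffers events and the default publish is an empty method. As a result, commit() drops everything until a publisher patches publish on that specific instance. All `Toy*` names are hypothetical.

```typescript
// Toy model of the idea, NOT the @nestjs/cqrs source.
type DomainEvent = { readonly type: string };

class ToyAggregateRoot {
  private uncommitted: DomainEvent[] = [];

  // Default publish is a no-op: events "committed" here simply vanish.
  publish(_event: DomainEvent): void {}

  // Buffers an event for the next commit().
  apply(event: DomainEvent): void {
    this.uncommitted.push(event);
  }

  // Drains the buffer through publish(), whatever publish currently is.
  commit(): void {
    const events = this.uncommitted;
    this.uncommitted = [];
    events.forEach((e) => this.publish(e));
  }
}

class ToyEventBus {
  readonly published: DomainEvent[] = [];
  publish(event: DomainEvent): void {
    this.published.push(event);
  }
}

class ToyEventPublisher {
  constructor(private readonly bus: ToyEventBus) {}

  // Same idea as mergeObjectContext: patch publish on this one instance.
  mergeObjectContext<T extends ToyAggregateRoot>(object: T): T {
    object.publish = (event) => this.bus.publish(event);
    return object;
  }
}
```

An aggregate built with `new` and committed leaves the bus empty. The same aggregate passed through `mergeObjectContext` first delivers its events. That difference is the entire "why isn't my saga firing" bug.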
Events are facts. Past tense. Multiple handlers can listen, and each one should be able to run independently and be replayed without corrupting anything.
```typescript
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Injectable, Logger } from '@nestjs/common';
import { OrderReadRepository } from './order-read.repository';
import { OrderPlacedEvent } from './order.events';

@Injectable()
@EventsHandler(OrderPlacedEvent)
export class OrderPlacedProjector implements IEventHandler<OrderPlacedEvent> {
  private readonly logger = new Logger(OrderPlacedProjector.name);

  constructor(private readonly reads: OrderReadRepository) {}

  async handle(event: OrderPlacedEvent): Promise<void> {
    // Keyed on orderId, so replaying this event rewrites the same row.
    // Carrying aggregateVersion into the row is what allows the upsert to
    // refuse an event older than the version already stored.
    await this.reads.upsert({
      orderId: event.orderId,
      userId: event.userId,
      itemCount: event.items.length,
      total: event.items.reduce((sum, i) => sum + i.unitPrice * i.quantity, 0),
      status: 'placed',
      version: event.aggregateVersion,
      occurredAt: event.occurredAt,
    });
    this.logger.log(
      `Projected OrderPlaced order=${event.orderId} v=${event.aggregateVersion}`,
    );
  }
}
```
That version field on the read row is the thing I now refuse to ship without. It lets the projector be safely re-run. It lets you ignore a stale event if a newer one already landed. It gives you a freshness signal you can monitor.
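Storing the version only helps if the write path enforces it. Below is a minimal in-memory sketch of that rule. It assumes one row per order, and `VersionedReadStore` and `upsertIfNewer` are hypothetical names. A row is applied only when its version is newer than the stored one, so replays become no-ops and late events cannot overwrite newer state. In PostgreSQL the same rule fits inside the upsert itself: `INSERT ... ON CONFLICT (order_id) DO UPDATE SET ... WHERE` the stored version is lower than `EXCLUDED.version`.

```typescript
// Hypothetical in-memory read model enforcing "newer version wins".
interface OrderReadRow {
  orderId: string;
  status: string;
  version: number;
}

class VersionedReadStore {
  private readonly rows = new Map<string, OrderReadRow>();

  // Writes the row only if it is newer than what is stored. The boolean lets
  // a projector count skipped (stale or replayed) events.
  upsertIfNewer(row: OrderReadRow): boolean {
    const current = this.rows.get(row.orderId);
    if (current && current.version >= row.version) return false;
    this.rows.set(row.orderId, { ...row });
    return true;
  }

  get(orderId: string): OrderReadRow | undefined {
    return this.rows.get(orderId);
  }
}
```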
Which brings me to the rankings incident.
Back to the federation platform. A federation tournament wrapped on a Saturday night. The new champion should have shown up at the top of the global rankings within minutes. Eight hours later the page still showed the old number one. The athlete in question noticed before we did and posted a screenshot of our broken rankings on social media, tagging the federation.
The rankings-indexer had stopped writing to Elasticsearch overnight. Kafka was fine. The consumer was still consuming. We just weren’t projecting.
My first instinct was operational: SSH into the indexer pod, read the logs, restart. The logs were quiet. The restart cleared the in-memory state, and the indexer resumed projecting new events from a stale checkpoint. The old, wrong rankings stayed in the index.
The real fix was a full reindex from PostgreSQL into a new ES index, then an atomic alias swap. The reindex took 25 minutes. Root cause of the original silent failure: the bulk-write client had entered a circuit-open state after a transient cluster blip the night before, and the breaker had no automated path back to half-open. It would have stayed open until the next deploy.
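A breaker with no path back to half-open is just a slow off switch. Below is a minimal sketch of the missing transition. It assumes a consecutive-failure threshold and a fixed cooldown, and `CircuitBreaker` and its parameters are hypothetical, not the bulk client from the incident. Once the breaker is open, elapsed time alone moves it to half-open. A successful trial call then closes it, and a failed one reopens it. The clock is injectable, so the transitions can be tested without waiting.

```typescript
type BreakerState = "closed" | "open" | "half-open";

class CircuitBreaker {
  private state: BreakerState = "closed";
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold: number,
    private readonly cooldownMs: number,
    private readonly now: () => number = Date.now,
  ) {}

  // The automated way back: after the cooldown, open becomes half-open.
  currentState(): BreakerState {
    if (this.state === "open" && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = "half-open";
    }
    return this.state;
  }

  // Callers check this before each bulk write.
  canRequest(): boolean {
    return this.currentState() !== "open";
  }

  recordSuccess(): void {
    this.state = "closed";
    this.failures = 0;
  }

  recordFailure(): void {
    this.failures += 1;
    // A failed trial in half-open reopens at once; otherwise open at threshold.
    if (this.currentState() === "half-open" || this.failures >= this.failureThreshold) {
      this.state = "open";
      this.openedAt = this.now();
    }
  }
}
```

This sketch lets all traffic through while half-open. Many breakers admit only a limited number of trial calls in that state, which matters when the downstream is still fragile.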
The cost: eight hours of stale rankings during a publicly visible competition window, one pissed-off athlete posting screenshots, and a Monday call with the federation.
The lesson, in one line, is the rule I write into every CQRS service now. Derived indexes need their own freshness metric, not just “is the consumer still consuming”. Throughput says you’re moving. Freshness tells you whether you’re moving toward the truth.
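```typescript
import { Inject, Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule'; // requires ScheduleModule.forRoot() in the app module
import { OrderReadRepository } from './order-read.repository';
import type { Metrics } from './metrics'; // hypothetical path for the app's metrics client interface

@Injectable()
export class ProjectionFreshnessHealth {
  constructor(
    private readonly reads: OrderReadRepository,
    @Inject('METRICS') private readonly metrics: Metrics,
  ) {}

  // Every 15 s, report how old the newest projected event is.
  @Interval(15_000)
  async report(): Promise<void> {
    const lastProjectedAt = await this.reads.maxOccurredAt();
    if (!lastProjectedAt) return; // empty read model: nothing projected yet
    const lagMs = Date.now() - lastProjectedAt.getTime();
    this.metrics.gauge('projection.lag_ms', lagMs, { stream: 'orders' });
    if (lagMs > 60_000) {
      this.metrics.increment('projection.stale', { stream: 'orders' });
    }
  }
}
```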
Datadog alerts on the gauge. That alert has paged me more times than I’d like, and every page was real.
Sagas are how you coordinate across aggregates. The @nestjs/cqrs saga is an RxJS pipeline that maps events to new commands. Powerful, easy to misuse.
```typescript
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';
import { OrderPlacedEvent, InventoryReservedEvent, InventoryRejectedEvent } from './order.events';
import { ReserveInventoryCommand, CancelOrderCommand, ConfirmOrderCommand } from './order.commands';

@Injectable()
export class OrderSaga {
  @Saga()
  reserveOnPlace = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(OrderPlacedEvent),
      filter((e: OrderPlacedEvent) => e.items.length > 0),
      map((e) => new ReserveInventoryCommand(e.orderId, e.items, e.aggregateVersion)),
    );

  @Saga()
  confirmOnReserved = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(InventoryReservedEvent),
      map((e: InventoryReservedEvent) => new ConfirmOrderCommand(e.orderId)),
    );

  @Saga()
  cancelOnRejected = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(InventoryRejectedEvent),
      map((e: InventoryRejectedEvent) => new CancelOrderCommand(e.orderId, e.reason)),
    );
}
```
Two rules I keep. Every saga step has a compensating path. And every command emitted by a saga carries the source event’s aggregate version, so the receiving handler can reject the command if the aggregate has moved on. Otherwise you ship a race condition into the heart of your business logic.
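On the receiving side, the version check takes only a few lines. The sketch below uses hypothetical shapes (`VersionedCommand`, `OrderState`, `assertFresh`). The saga copies the version it observed into the command, and the handler refuses to act if the aggregate's current version differs, instead of applying a decision made against an older state.

```typescript
// Hypothetical error type; a Nest handler might map it to ConflictException.
class StaleCommandError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "StaleCommandError";
  }
}

interface VersionedCommand {
  readonly orderId: string;
  readonly expectedVersion: number; // version the saga saw when it emitted the command
}

interface OrderState {
  readonly orderId: string;
  readonly version: number;
  readonly status: "placed" | "reserved" | "confirmed" | "cancelled";
}

// Throws if the aggregate's version differs from the one the command was based on.
function assertFresh(cmd: VersionedCommand, state: OrderState): void {
  if (state.version !== cmd.expectedVersion) {
    throw new StaleCommandError(
      `Order ${cmd.orderId} is at v${state.version}, command expected v${cmd.expectedVersion}`,
    );
  }
}
```

The handler would call `assertFresh` after loading the aggregate and before mutating it. The repository's own version check on save still guards the window between load and write.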
If you’re event sourcing, the append must check the expected version. This is the line that keeps two writers from corrupting the same stream.
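```typescript
import { ConflictException, Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
// Hypothetical file/class names: the TypeORM entity for the events table
// and the base type for domain events.
import { StoredEvent } from './stored-event.entity';
import { DomainEvent } from './domain-event';

@Injectable()
export class EventStore {
  constructor(private readonly dataSource: DataSource) {}

  async append(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number,
  ): Promise<void> {
    await this.dataSource.transaction(async (trx) => {
      const current = await trx
        .createQueryBuilder(StoredEvent, 'e')
        .select('MAX(e.version)', 'v')
        .where('e.aggregate_id = :id', { id: aggregateId })
        .getRawOne<{ v: number | string | null }>();
      // pg returns bigint aggregates as strings, so normalize before comparing.
      const head = Number(current?.v ?? 0);

      if (head !== expectedVersion) {
        throw new ConflictException(
          `Version mismatch on ${aggregateId}: expected ${expectedVersion}, head ${head}`,
        );
      }

      // Number the new events consecutively after the expected head.
      const rows = events.map((e, i) => ({
        aggregateId,
        type: e.constructor.name,
        payload: JSON.stringify(e),
        version: expectedVersion + i + 1,
        occurredAt: new Date(),
      }));
      await trx.getRepository(StoredEvent).insert(rows);
    });
  }
}
```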
You also need a unique constraint on (aggregate_id, version) at the database level. Application-level checks lose every race they meet at scale.
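Here is why the constraint matters. Under PostgreSQL's default READ COMMITTED isolation, two transactions can both read the same MAX(version) and both try to insert head + 1. The in-memory stand-in below models that interleaving; `InMemoryEventTable` and `UniqueViolation` are hypothetical names. The set of keys plays the part of UNIQUE (aggregate_id, version), so the second insert fails instead of forking the stream.

```typescript
interface StoredEventRow {
  aggregateId: string;
  version: number;
  type: string;
}

class UniqueViolation extends Error {
  constructor(message: string) {
    super(message);
    this.name = "UniqueViolation";
  }
}

// Stand-in for the events table. The key set acts as the unique constraint,
// checked at insert time rather than when the head was read.
class InMemoryEventTable {
  private readonly rows: StoredEventRow[] = [];
  private readonly keys = new Set<string>();

  // Equivalent of SELECT MAX(version) ... WHERE aggregate_id = ?, defaulting to 0.
  headVersion(aggregateId: string): number {
    return this.rows
      .filter((r) => r.aggregateId === aggregateId)
      .reduce((max, r) => Math.max(max, r.version), 0);
  }

  insert(row: StoredEventRow): void {
    const key = `${row.aggregateId}:${row.version}`;
    if (this.keys.has(key)) throw new UniqueViolation(`duplicate key ${key}`);
    this.keys.add(key);
    this.rows.push(row);
  }
}
```

In PostgreSQL the losing transaction gets a unique-violation error (SQLSTATE 23505). That error is a natural candidate for a repository to translate into the same ConflictException the version check throws.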
Thanks for reading. If you’ve got thoughts, send them my way.