Durable Work in Postgres, Part 8

Schema and operations

Full Postgres schema, inbox SQL, observability, testing, and schema evolution for order:9182.

A durable work queue is not done when the worker loop runs. It needs schema, indexes, support queries, housekeeping, and tests that prove the guarantees.

This chapter turns the pattern into an operating surface your team can inspect during an incident without reconstructing the design from memory.

The operational contract

The reference table is named inbox, and the same columns work for outbox and projection queues. No Redis, ZooKeeper, workflow engine, or external coordinator is required for this version. Download the migration or read section by section.

The schema carries three kinds of state: worker liveness, row ownership, and retry history. Keep those concepts explicit. If a row is stuck, you should be able to answer who claimed it, when the lease expires, how many attempts ran, and whether the next action is retry, dead-letter, or manual inspection.

The canonical SQL migration lives at schema.sql. The diagram below mirrors the two core tables; per-post source.sql files extend this base.

[Diagram: core durable work schema; handler dedupe and archive tables]
Ship this | Purpose | Minimum check
Worker registry | Know which workers are alive, draining, or dead | Heartbeat upsert updates last_seen_at.
Inbox table | Persist work, ownership, attempts, and idempotency | Duplicate enqueue with the same key creates one logical row.
Claim query | Give each row to one worker at a time | Concurrent claims never return the same row generation.
Lease cleanup | Recover rows from crashed workers | Expired processing rows return to pending or dead_letter.
Support queries | Let on-call inspect lag and stuck work | Runbook covers oldest pending, expired processing, and dead letters.
Guarantee tests | Protect the contract during refactors | Tests cover dedupe, exclusivity, expiry, stale completion, and dead lettering.

Workers registry

The worker registry is the source of live membership for hash-ring ownership and housekeeping. It is intentionally small: identity, status, heartbeat time, and optional metadata for deploy version or host diagnostics.

CREATE TABLE workers (
  id           TEXT PRIMARY KEY,          -- e.g. hostname + pid
  status       TEXT NOT NULL DEFAULT 'alive'
               CHECK (status IN ('alive', 'draining', 'dead')),
  last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  started_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  metadata     JSONB DEFAULT '{}'         -- version, host, load
);

CREATE INDEX idx_workers_alive
  ON workers (last_seen_at)
  WHERE status = 'alive';

Queue table: inbox

CREATE TYPE inbox_status AS ENUM (
  'pending', 'processing', 'completed', 'failed', 'dead_letter'
);

CREATE TABLE inbox (
  id                UUID PRIMARY KEY DEFAULT uuidv7(),  -- uuidv7() is built in from PostgreSQL 18
  partition_key     TEXT NOT NULL,            -- stream id (producer sets this)
  partition_bucket  INT NOT NULL
                    CHECK (partition_bucket >= 0 AND partition_bucket < 1024),
  payload           JSONB NOT NULL,
  status            inbox_status NOT NULL DEFAULT 'pending',
  attempts          INT NOT NULL DEFAULT 0,
  max_attempts      INT NOT NULL DEFAULT 5,
  claimed_by        TEXT REFERENCES workers(id),
  claimed_at        TIMESTAMPTZ,
  lease_expires_at  TIMESTAMPTZ,
  lease_generation  BIGINT NOT NULL DEFAULT 0,  -- fencing token
  available_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  completed_at      TIMESTAMPTZ,
  last_error        TEXT,
  idempotency_key   TEXT,
  created_at        TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_inbox_claim
  ON inbox (available_at, created_at, id)
  WHERE status = 'pending';

CREATE INDEX idx_inbox_bucket_claim
  ON inbox (partition_bucket, available_at, created_at, id)
  WHERE status = 'pending';

CREATE INDEX idx_inbox_partition_processing
  ON inbox (partition_key)
  WHERE status = 'processing';

CREATE INDEX idx_inbox_stale_processing
  ON inbox (lease_expires_at, id)
  WHERE status = 'processing';

CREATE UNIQUE INDEX idx_inbox_idempotency_key
  ON inbox (idempotency_key)
  WHERE idempotency_key IS NOT NULL;
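The schema leaves two producer-side details implicit: how partition_bucket is derived from partition_key, and how a retried enqueue collapses onto the partial unique index. The sketch below is one way to do both, not the series' reference producer. The names partition_bucket_for and enqueue, the choice of SHA-256, and the psycopg-style %s placeholders are all assumptions. The hash must be stable across processes, which rules out Python's built-in hash().

```python
import hashlib
import json

BUCKET_COUNT = 1024  # matches the CHECK constraint on inbox.partition_bucket

# The ON CONFLICT target repeats the partial index predicate so Postgres can
# infer idx_inbox_idempotency_key as the arbiter index.
ENQUEUE_SQL = """
INSERT INTO inbox (partition_key, partition_bucket, payload, idempotency_key)
VALUES (%s, %s, %s::jsonb, %s)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
DO NOTHING
RETURNING id
"""


def partition_bucket_for(partition_key: str) -> int:
    """Map a stream id to a stable bucket in [0, BUCKET_COUNT)."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % BUCKET_COUNT


def enqueue(cur, partition_key, payload, idempotency_key=None):
    """Insert one row; return its id, or None if the key was already enqueued.

    `cur` is assumed to be a DB-API cursor (for example from psycopg).
    """
    params = (
        partition_key,
        partition_bucket_for(partition_key),
        json.dumps(payload),
        idempotency_key,
    )
    cur.execute(ENQUEUE_SQL, params)
    row = cur.fetchone()  # no row comes back when DO NOTHING swallowed a duplicate
    return row[0] if row else None
```

A producer that retries after a timeout calls enqueue again with the same idempotency_key. The second call returns None and the table still holds one logical row.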

Heartbeat: upsert on every tick

Heartbeat writes should be small and cheap. They do not claim work and they do not run recovery. They only keep the live worker list fresh enough for ring ownership and stale-worker detection.

-- Called every 10 seconds by each worker
INSERT INTO workers (id, last_seen_at, metadata)
VALUES ('worker-host1-4821', now(), '{"version":"1.2.0"}')
ON CONFLICT (id) DO UPDATE SET
  last_seen_at = EXCLUDED.last_seen_at,
  status       = 'alive',
  metadata     = EXCLUDED.metadata;

-- Heartbeat only; the stale-worker sweep runs in maybeRunHousekeeping() (see patterns)

Claim work: filter by bucket in SQL

The claim path is where most performance mistakes happen. Build the ring in application code, turn it into an owned bucket list, and push that filter into SQL. Do not claim rows you plan to discard in memory.

-- Step 1: live workers → build ring → owned_buckets[] in app
SELECT id FROM workers
WHERE status = 'alive'
  AND last_seen_at > now() - interval '30 seconds'
ORDER BY id;

-- Step 2: claim only rows this worker owns (partition_bucket in SQL)
WITH candidates AS (
  SELECT i.id
  FROM inbox i
  WHERE i.status = 'pending'
    AND i.available_at <= now()
    AND i.partition_bucket = ANY($1::int[])  -- owned buckets
  ORDER BY i.created_at, i.id
  LIMIT 25
  FOR UPDATE SKIP LOCKED
)
UPDATE inbox i
SET
  status           = 'processing',
  claimed_by       = $2,
  claimed_at       = now(),
  lease_expires_at = now() + interval '90 seconds',
  lease_generation = i.lease_generation + 1,
  attempts         = i.attempts + 1
FROM candidates c
WHERE i.id = c.id
RETURNING i.*;
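Step 1 leaves the ring construction to application code. As a rough illustration, the sketch below builds a consistent-hash ring with virtual nodes from the live worker ids and returns the buckets one worker owns, ready to pass as $1. The function name owned_buckets, the 64 virtual nodes per worker, and the SHA-256 hash are assumptions, not the ring used elsewhere in this series.

```python
import bisect
import hashlib

BUCKET_COUNT = 1024  # matches the CHECK constraint on inbox.partition_bucket
VNODES = 64          # assumed number of virtual nodes per worker


def _point(label: str) -> int:
    """Stable 64-bit position on the ring for a label."""
    digest = hashlib.sha256(label.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def owned_buckets(worker_id, live_workers):
    """Return the sorted bucket numbers that worker_id owns on the ring."""
    workers = sorted(set(live_workers))
    if worker_id not in workers:
        return []  # a worker missing from the live list owns nothing
    # Each worker contributes VNODES points; sorting makes the ring
    # independent of the order the live list arrived in.
    ring = sorted((_point(f"{w}#{v}"), w) for w in workers for v in range(VNODES))
    points = [p for p, _ in ring]
    owned = []
    for bucket in range(BUCKET_COUNT):
        # The owner is the first ring point clockwise from the bucket's position.
        i = bisect.bisect_right(points, _point(f"bucket:{bucket}")) % len(ring)
        if ring[i][1] == worker_id:
            owned.append(bucket)
    return owned
```

Every worker should derive the ring from the same Step 1 result. If two workers briefly see different live lists, their bucket sets can overlap, and FOR UPDATE SKIP LOCKED still keeps a row from being claimed twice at the same time.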

Lease cleanup: recover from crashes

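Run these statements inside maybeRunHousekeeping() while the advisory lock is held, not in a separate cron job. The first returns expired leases to pending with a capped exponential backoff; the second dead-letters rows that have used every attempt.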

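-- Return expired leases to pending; back off 2^attempts seconds, capped at one hour.
-- LEAST runs on the double precision result of POWER, so a large attempts
-- value cannot overflow an integer cast.
UPDATE inbox
SET
  status           = 'pending',
  claimed_by       = NULL,
  claimed_at       = NULL,
  lease_expires_at = NULL,
  available_at     = now() + (LEAST(POWER(2, attempts), 3600) * interval '1 second')
WHERE status = 'processing'
  AND lease_expires_at < now()
  AND attempts < max_attempts;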

-- Move exhausted retries to dead letter
UPDATE inbox
SET status = 'dead_letter'
WHERE status = 'processing'
  AND lease_expires_at < now()
  AND attempts >= max_attempts;
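One possible shape for the housekeeping wrapper is sketched below. It assumes a DB-API cursor inside an open transaction, uses pg_try_advisory_xact_lock so the lock is released at commit or rollback, and picks an arbitrary lock key. The Python name maybe_run_housekeeping stands in for maybeRunHousekeeping(), and backoff_seconds only mirrors the SQL backoff expression so the schedule can be checked in a unit test.

```python
HOUSEKEEPING_LOCK_KEY = 918_200  # hypothetical; any app-wide bigint constant works

REQUEUE_EXPIRED_SQL = """
UPDATE inbox
SET status = 'pending',
    claimed_by = NULL,
    claimed_at = NULL,
    lease_expires_at = NULL,
    available_at = now() + (LEAST(POWER(2, attempts), 3600) * interval '1 second')
WHERE status = 'processing'
  AND lease_expires_at < now()
  AND attempts < max_attempts
"""

DEAD_LETTER_SQL = """
UPDATE inbox
SET status = 'dead_letter'
WHERE status = 'processing'
  AND lease_expires_at < now()
  AND attempts >= max_attempts
"""


def maybe_run_housekeeping(cur) -> bool:
    """Run lease cleanup if this worker wins the advisory lock.

    Returns False without touching inbox when another worker holds the lock.
    """
    cur.execute("SELECT pg_try_advisory_xact_lock(%s)", (HOUSEKEEPING_LOCK_KEY,))
    if not cur.fetchone()[0]:
        return False
    cur.execute(REQUEUE_EXPIRED_SQL)
    cur.execute(DEAD_LETTER_SQL)
    return True


def backoff_seconds(attempts: int) -> int:
    """Python mirror of LEAST(POWER(2, attempts), 3600)."""
    return min(2 ** attempts, 3600)
```

With the default max_attempts of 5, a requeued row waits at most 2^4 = 16 seconds, because a row that has reached attempt 5 goes to dead_letter instead. The one-hour cap only matters when max_attempts is raised to 12 or more.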

Support queries

Operationally, the queue is only as useful as the questions it can answer quickly. Keep these queries in a runbook so support and on-call engineers can inspect backlog health without writing SQL from scratch during an incident.

-- Oldest pending work: user-visible lag signal
SELECT now() - min(created_at) AS oldest_pending_age
FROM inbox
WHERE status = 'pending';

-- Rows stuck in processing beyond their lease
SELECT id, partition_key, claimed_by, lease_expires_at, attempts, last_error
FROM inbox
WHERE status = 'processing'
  AND lease_expires_at < now()
ORDER BY lease_expires_at
LIMIT 50;

-- Dead letters grouped by stream (partition_key)
SELECT partition_key, count(*) AS dead_letters, max(last_error) AS sample_error
FROM inbox
WHERE status = 'dead_letter'
GROUP BY partition_key
ORDER BY dead_letters DESC;

Guarantee tests

Test the guarantees, not the implementation trivia. The database contract should prove that duplicate enqueue is blocked, claimed rows are exclusive, expired leases return to pending, stale owners cannot complete, and exhausted rows move to dead_letter.

  • Producer retry with the same idempotency_key creates one logical row.
  • Two workers claiming concurrently never receive the same row in the same generation.
  • An expired lease is returned to pending and can be claimed by a new worker.
  • A stale worker completing with an old claimed_by or lease_generation (the fencing token) affects zero rows.
  • A row that has used all of its max_attempts lands in dead_letter and no longer blocks the stream.
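The stale-completion guarantee depends on a completion statement that checks both the owner and the fencing token. The chapter does not show that statement, so the sketch below is an assumption about its shape, with psycopg-style placeholders and a hypothetical complete_row helper. A test can claim a row, let the lease expire, have a second worker reclaim it, and then assert that the first worker's call returns False.

```python
# Completion only succeeds for the worker and lease generation that
# currently hold the row; any other caller matches zero rows.
COMPLETE_SQL = """
UPDATE inbox
SET status = 'completed',
    completed_at = now(),
    lease_expires_at = NULL
WHERE id = %s
  AND status = 'processing'
  AND claimed_by = %s
  AND lease_generation = %s
"""


def complete_row(cur, row_id, worker_id, lease_generation) -> bool:
    """Mark a claimed row completed; return False if the claim is stale.

    `cur` is assumed to be a DB-API cursor. `lease_generation` is the value
    returned by the claim query for this row.
    """
    cur.execute(COMPLETE_SQL, (row_id, worker_id, lease_generation))
    return cur.rowcount == 1
```

Because every claim increments lease_generation, a worker that was paused past its lease holds an old generation number. Its completion updates nothing even if the same worker id later reclaims the row.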