Durable Work in Postgres, Part 10

Build a small durable workflow engine in Postgres

How to extend a Postgres durable queue into a simple durable workflow engine with workflow definitions, step scheduling, timers, signals, and versioning.

The queue from this series already gives you durable work, claims, leases, retries, and idempotency. A simple workflow engine is the next layer: it decides which durable step to enqueue next and records enough state to resume after crashes.

This is the Postgres version of a Temporal-light, DBOS-style, or pg_durable-style runtime for service-local workflows. You keep the engine small: workflow definitions live in code, execution state lives in Postgres, and workers run one durable step at a time.

The engine is a scheduler around the same claim loop

Do not start by inventing a new runtime. Keep the claim loop. Add a workflow scheduler that persists an instance, enqueues the first step, records step completion, and enqueues the next step in the same transaction.

The engine has five jobs:

Engine job | What it does | Postgres primitive
Start | Create one workflow instance and its first runnable step | Transactional INSERT
Schedule | Choose the next step after a step completes | Definition lookup in code + step row INSERT
Run | Claim runnable steps and call handlers | FOR UPDATE SKIP LOCKED, leases
Wait | Pause for timers, callbacks, approvals, or retries | available_at, waiting rows, signal rows
Resume | Wake a waiting step without losing the workflow state | Signal transaction updates the waiting row

Use a workflow when one row is not the unit of progress

The queue pattern says: one row, one handler, one completion. A workflow says: one business process, many durable steps. The unit you inspect is no longer just send_receipt. It is checkout:9182 moving through reserve inventory, charge card, send receipt, wait for shipment, and notify search.

If you need… | Use… | Reason
One async action with retry | Inbox row | The row is the work.
Publish after a domain write | Transactional outbox | The row is durable publish intent.
Many ordered steps with state between them | Workflow instance + step rows | The instance is the business process; steps are resumable checkpoints.
Wait for a callback or approval | Workflow signal | The process pauses without holding a thread or worker.
Sleep until a future time | Durable timer | The database stores the wake time; workers only pick the step up once it is available.

Store definitions in code and execution state in Postgres

The engine needs two durable tables: one row for the workflow instance and one row per scheduled step. The workflow definition can stay in application code. That keeps the schema generic and makes workflow versioning explicit.

Workflow definition and engine tables
// reserveInventory, chargeCard, and the other handlers are async step functions defined elsewhere in the service.
const checkoutV1 = {
  type: "checkout",
  version: 1,
  start: "reserve_inventory",
  steps: {
    reserve_inventory: { run: reserveInventory, next: "charge_card" },
    charge_card: { run: chargeCard, next: "send_receipt" },
    send_receipt: { run: sendReceipt, next: "wait_for_shipment" },
    wait_for_shipment: { run: waitForShipment, next: "notify_search" },
    notify_search: { run: notifySearch, next: null }, // next: null ends the workflow
  },
};
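A minimal sketch of how that definition could be typed and walked, assuming hypothetical StepHandler, StepDefinition, and WorkflowDefinition types and a stub handler in place of the real side effects. The engine only ever needs two operations on a definition: look up a step by name and read its next pointer.

```typescript
// Hypothetical types for code-resident workflow definitions (not a real library API).
type StepHandler = (input: unknown) => Promise<unknown>;

interface StepDefinition {
  run: StepHandler;
  next: string | null; // null marks the final step
}

interface WorkflowDefinition {
  type: string;
  version: number;
  start: string;
  steps: Record<string, StepDefinition>;
}

// Stub handler standing in for real side effects.
const noop: StepHandler = async (input) => input;

const checkoutV1: WorkflowDefinition = {
  type: "checkout",
  version: 1,
  start: "reserve_inventory",
  steps: {
    reserve_inventory: { run: noop, next: "charge_card" },
    charge_card: { run: noop, next: "send_receipt" },
    send_receipt: { run: noop, next: "wait_for_shipment" },
    wait_for_shipment: { run: noop, next: "notify_search" },
    notify_search: { run: noop, next: null },
  },
};

// Returns the step to schedule after `stepName`, or null when the workflow is done.
function nextStep(def: WorkflowDefinition, stepName: string): string | null {
  const step = def.steps[stepName];
  if (!step) {
    throw new Error(`${def.type} v${def.version} has no step "${stepName}"`);
  }
  return step.next;
}

// Walks the definition from start to end. Useful as a startup check that every
// next pointer names a real step and that the path has no cycle.
function stepPath(def: WorkflowDefinition): string[] {
  const visited: string[] = [];
  let current: string | null = def.start;
  while (current !== null) {
    if (visited.includes(current)) throw new Error(`cycle at ${current}`);
    visited.push(current);
    current = nextStep(def, current);
  }
  return visited;
}
```

Running stepPath once at boot turns a typo in a next pointer into a deploy failure instead of a stuck workflow.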

Keep the workflow instance small and inspectable. It tells you what business process exists, which definition version owns it, what state it is in, and what durable state has been recorded so far.

CREATE TYPE workflow_status AS ENUM (
  'running', 'waiting', 'completed', 'failed', 'cancelled'
);

CREATE TABLE workflow_instances (
  id               UUID PRIMARY KEY DEFAULT uuidv7(),  -- uuidv7() is built in from PostgreSQL 18
  workflow_key     TEXT NOT NULL UNIQUE,   -- checkout:9182, supplied by caller
  workflow_type    TEXT NOT NULL,          -- checkout
  workflow_version INT NOT NULL,           -- 1
  status           workflow_status NOT NULL DEFAULT 'running',
  current_step     TEXT,
  input            JSONB NOT NULL,
  state            JSONB NOT NULL DEFAULT '{}',
  result           JSONB,
  last_error       TEXT,
  created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TYPE workflow_step_status AS ENUM (
  'pending', 'processing', 'waiting', 'completed', 'failed', 'dead_letter'
);

CREATE TABLE workflow_steps (
  id                UUID PRIMARY KEY DEFAULT uuidv7(),
  workflow_id       UUID NOT NULL REFERENCES workflow_instances(id),
  step_key          TEXT NOT NULL,        -- unique durable key for this scheduled step
  step_name         TEXT NOT NULL,
  step_order        INT NOT NULL,
  status            workflow_step_status NOT NULL DEFAULT 'pending',
  input             JSONB NOT NULL DEFAULT '{}',
  result            JSONB,
  attempts          INT NOT NULL DEFAULT 0,
  max_attempts      INT NOT NULL DEFAULT 5,
  claimed_by        TEXT REFERENCES workers(id),
  lease_expires_at  TIMESTAMPTZ,
  available_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  idempotency_key   TEXT NOT NULL,
  last_error        TEXT,
  created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
  completed_at      TIMESTAMPTZ,
  UNIQUE (workflow_id, step_key),
  UNIQUE (idempotency_key)
);

CREATE INDEX idx_workflow_steps_claim
  ON workflow_steps (available_at, created_at, id)
  WHERE status = 'pending';

CREATE INDEX idx_workflow_steps_blockers
  ON workflow_steps (workflow_id, step_order)
  WHERE status IN ('pending', 'processing', 'waiting');
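The enum lists the step statuses but not which moves between them are legal. A sketch of the step state machine as a lookup table, assuming the transitions implied by the operations in this post; the expired-lease path (processing back to pending) and the cancellation path (pending or waiting to failed) are assumptions about housekeeping and cancel, not something the schema enforces.

```typescript
type StepStatus =
  | "pending" | "processing" | "waiting" | "completed" | "failed" | "dead_letter";

// Allowed transitions, assumed from the engine operations described in this post.
const transitions: Record<StepStatus, readonly StepStatus[]> = {
  pending: ["processing", "failed"],  // claim; cancel (assumed)
  processing: [
    "completed",                      // complete
    "pending",                        // retry later, sleep, or expired lease
    "waiting",                        // waitForSignal
    "failed",                         // non-retryable error
    "dead_letter",                    // attempts exhausted
  ],
  waiting: ["pending", "failed"],     // signal wakes the step; cancel (assumed)
  completed: [],
  failed: [],
  dead_letter: [],
};

function canTransition(from: StepStatus, to: StepStatus): boolean {
  return transitions[from].includes(to);
}

// Statuses that block later steps in the same workflow (mirrors idx_workflow_steps_blockers).
const blocking: readonly StepStatus[] = ["pending", "processing", "waiting"];

// A status with no outgoing transitions is final.
function isTerminal(status: StepStatus): boolean {
  return transitions[status].length === 0;
}
```

Checking canTransition in the engine before each UPDATE catches bugs such as a late signal reviving a completed step.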

Start a workflow by inserting the instance and first step together

Starting a workflow should be idempotent. The caller supplies a stable workflow key such as checkout:9182. If the API retries after a timeout, the engine returns the existing instance instead of creating a second checkout flow.

BEGIN;

WITH inserted AS (
  INSERT INTO workflow_instances (workflow_key, workflow_type, workflow_version, input, current_step)
  VALUES ('checkout:9182', 'checkout', 1, '{"order_id":9182}'::jsonb, 'reserve_inventory')
  ON CONFLICT (workflow_key) DO NOTHING
  RETURNING id
), instance AS (
  -- Either the row inserted above or the one that already existed. The second
  -- branch cannot see the first branch's insert (same statement snapshot).
  SELECT id FROM inserted
  UNION ALL
  SELECT id FROM workflow_instances WHERE workflow_key = 'checkout:9182'
  LIMIT 1
)
INSERT INTO workflow_steps (workflow_id, step_key, step_name, step_order, input, idempotency_key)
SELECT
  id,
  'reserve_inventory:1',
  'reserve_inventory',
  10,
  '{"order_id":9182}'::jsonb,
  'checkout:9182:reserve_inventory'
FROM instance
ON CONFLICT (workflow_id, step_key) DO NOTHING;

-- Return the instance to the caller, whether this call created it or a retry found it.
SELECT id, status, current_step
FROM workflow_instances
WHERE workflow_key = 'checkout:9182';

COMMIT;
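The same idempotency contract, sketched in memory so it can run without a database. The Store class and its maps are hypothetical stand-ins for the two tables: the map keys play the role of the UNIQUE constraints, and the "insert only if absent" checks play the role of ON CONFLICT DO NOTHING.

```typescript
interface Instance {
  id: string;
  workflowKey: string;
  workflowType: string;
  workflowVersion: number;
  currentStep: string;
  input: unknown;
}

interface StepRow {
  workflowId: string;
  stepKey: string;
  stepName: string;
  stepOrder: number;
  idempotencyKey: string;
}

// Hypothetical in-memory stand-in for workflow_instances and workflow_steps.
class Store {
  private nextId = 1;
  instancesByKey = new Map<string, Instance>(); // UNIQUE (workflow_key)
  steps = new Map<string, StepRow>();           // UNIQUE (workflow_id, step_key)

  start(key: string, type: string, version: number, firstStep: string, input: unknown): Instance {
    // ON CONFLICT (workflow_key) DO NOTHING, then read back the existing row.
    let instance = this.instancesByKey.get(key);
    if (!instance) {
      instance = {
        id: `wf-${this.nextId++}`,
        workflowKey: key,
        workflowType: type,
        workflowVersion: version,
        currentStep: firstStep,
        input,
      };
      this.instancesByKey.set(key, instance);
    }
    // ON CONFLICT (workflow_id, step_key) DO NOTHING for the first step.
    const stepKey = `${firstStep}:1`;
    const compositeKey = `${instance.id}/${stepKey}`;
    if (!this.steps.has(compositeKey)) {
      this.steps.set(compositeKey, {
        workflowId: instance.id,
        stepKey,
        stepName: firstStep,
        stepOrder: 10,
        idempotencyKey: `${key}:${firstStep}`,
      });
    }
    return instance;
  }
}
```

A retried start with the same key returns the first instance and leaves exactly one first step, which is the behavior the SQL relies on its unique constraints to enforce.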

Run the engine as a normal worker loop

The workflow worker still claims rows with the mechanics from earlier posts. The difference is the handler returns a workflow outcome, and the engine persists that outcome. Do not let arbitrary handler code update workflow state directly.

WITH candidates AS (
  SELECT s.id
  FROM workflow_steps s
  WHERE s.status = 'pending'
    AND s.available_at <= now()
    AND NOT EXISTS (
      SELECT 1
      FROM workflow_steps earlier
      WHERE earlier.workflow_id = s.workflow_id
        AND earlier.step_order < s.step_order
        AND earlier.status IN ('pending', 'processing', 'waiting')
    )
  ORDER BY s.created_at, s.id
  LIMIT 25
  FOR UPDATE SKIP LOCKED
)
UPDATE workflow_steps s
SET
  status = 'processing',
  claimed_by = $1,
  lease_expires_at = now() + interval '90 seconds',
  attempts = attempts + 1
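FROM candidates c, workflow_instances w
WHERE s.id = c.id
  AND w.id = s.workflow_id
-- Return the instance's type and version so the worker can resolve the pinned definition.
RETURNING s.*, w.workflow_type, w.workflow_version;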
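To make the blocker rule concrete, here is an in-memory sketch of which rows the claim query would pick. It assumes plain objects for step rows with epoch-millisecond timestamps and ignores locking: SKIP LOCKED has no in-memory analogue, so this models only the WHERE, ORDER BY, and LIMIT clauses.

```typescript
type Status = "pending" | "processing" | "waiting" | "completed" | "failed" | "dead_letter";

interface Step {
  id: string;
  workflowId: string;
  stepOrder: number;
  status: Status;
  availableAt: number; // epoch millis
  createdAt: number;   // epoch millis
}

// Statuses of an earlier step that block later steps in the same workflow.
const unfinished: ReadonlySet<Status> = new Set<Status>(["pending", "processing", "waiting"]);

// Mirrors the claim query: runnable now, no unfinished earlier step in the same
// workflow, oldest first, at most `limit` rows.
function claimable(steps: readonly Step[], now: number, limit = 25): Step[] {
  return steps
    .filter((s) => s.status === "pending" && s.availableAt <= now)
    .filter(
      (s) =>
        !steps.some(
          (earlier) =>
            earlier.workflowId === s.workflowId &&
            earlier.stepOrder < s.stepOrder &&
            unfinished.has(earlier.status),
        ),
    )
    .sort((a, b) => a.createdAt - b.createdAt || a.id.localeCompare(b.id))
    .slice(0, limit);
}
```

A pending step behind a processing step in the same workflow is skipped, and a step with a future available_at is skipped even if it is the oldest row.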

Complete a step by enqueueing the next step in the same transaction

The important engine invariant is simple: completing one step and making the next step durable must be one transaction. Otherwise a crash can leave the workflow with a completed side effect and no next step.

BEGIN;

-- Complete the step only if this worker still owns it. The INSERT and the instance
-- UPDATE both read from `done`, so a lost lease turns the whole transaction into a no-op.
WITH done AS (
  UPDATE workflow_steps
  SET status = 'completed', result = $result, completed_at = now()
  WHERE id = $step_id
    AND claimed_by = $worker_id
    AND status = 'processing'
  RETURNING workflow_id
), next_step AS (
  INSERT INTO workflow_steps (workflow_id, step_key, step_name, step_order, input, idempotency_key)
  SELECT workflow_id, 'send_receipt:1', 'send_receipt', 30, $input, workflow_id::text || ':send_receipt:1'
  FROM done
  ON CONFLICT (workflow_id, step_key) DO NOTHING
)
UPDATE workflow_instances w
SET current_step = 'send_receipt', state = w.state || $state_patch, updated_at = now()
FROM done
WHERE w.id = done.workflow_id;

COMMIT;
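
The worker loop that drives the claim and completion transactions:

while (true) {
  await db.runWorkflowHousekeeping(); // e.g. release steps whose lease expired
  const steps = await db.claimWorkflowSteps(WORKER_ID);

  if (steps.length === 0) {
    await new Promise((resolve) => setTimeout(resolve, 1000)); // idle: avoid a hot loop
    continue;
  }

  for (const step of steps) {
    try {
      // Resolve the definition the instance started with, not the latest deploy.
      // Assumes claimWorkflowSteps returns workflow_type and workflow_version with each step.
      const definition = registry.get(step.workflow_type, step.workflow_version);
      const outcome = await definition.steps[step.step_name].run(step.input);
      await engine.applyOutcome(step, outcome, definition);
    } catch (err) {
      await engine.failStepWithRetry(step, err);
    }
  }
}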
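The loop hands every handler result to engine.applyOutcome. A minimal sketch of that function as a pure planner, assuming a hypothetical Outcome union (complete, sleep, waitForSignal) and a `next` callback standing in for the definition lookup. It returns the row changes the engine would then write in one transaction; keeping it pure makes the state machine testable without Postgres.

```typescript
type Outcome =
  | { kind: "complete"; result: unknown; statePatch?: Record<string, unknown> }
  | { kind: "sleep"; until: Date }
  | { kind: "waitForSignal"; signal: string };

interface ClaimedStep {
  workflowId: string;
  stepName: string;
  stepOrder: number;
}

// Row changes for one transaction; field names loosely mirror the SQL columns.
interface Plan {
  step: { status: "completed" | "pending" | "waiting"; availableAt?: Date; waitingFor?: string };
  nextStep?: { stepKey: string; stepName: string; stepOrder: number };
  instance: { status: "running" | "waiting" | "completed"; currentStep: string | null };
}

function planOutcome(
  step: ClaimedStep,
  outcome: Outcome,
  next: (stepName: string) => string | null, // definition lookup, e.g. def.steps[name].next
): Plan {
  switch (outcome.kind) {
    case "complete": {
      const nextName = next(step.stepName);
      if (nextName === null) {
        // Last step: finish the workflow instead of scheduling anything.
        return { step: { status: "completed" }, instance: { status: "completed", currentStep: null } };
      }
      return {
        step: { status: "completed" },
        nextStep: { stepKey: `${nextName}:1`, stepName: nextName, stepOrder: step.stepOrder + 10 },
        instance: { status: "running", currentStep: nextName },
      };
    }
    case "sleep":
      // Durable timer: the same row goes back to pending with a future available_at.
      return {
        step: { status: "pending", availableAt: outcome.until },
        instance: { status: "running", currentStep: step.stepName },
      };
    case "waitForSignal":
      // Park the step; a signal transaction moves it back to pending.
      return {
        step: { status: "waiting", waitingFor: outcome.signal },
        instance: { status: "waiting", currentStep: step.stepName },
      };
    default: {
      const unreachable: never = outcome;
      throw new Error(`unknown outcome ${JSON.stringify(unreachable)}`);
    }
  }
}
```

The step_order increment of 10 matches the spacing used in the SQL (reserve_inventory at 10, send_receipt at 30); it is a convention, not a requirement.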

Add timers and signals as engine outcomes

A workflow should not hold a process while it waits for shipment, approval, a partner callback, or a retry window. A handler returns sleepUntil or waitForSignal; the engine converts that into durable rows.

Wait type | Postgres representation | Who wakes it?
Retry later | status = pending, future available_at | Worker claim loop
Sleep until date | Timer step with future available_at | Worker claim loop
External callback | status = waiting plus signal name | Webhook or API signal handler
Manual approval | status = waiting plus approval payload | Admin action inserts a signal
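The "Retry later" row is what failStepWithRetry produces. A sketch of that decision, assuming exponential backoff with a cap and full jitter (a common choice, not something this post prescribes) and that the step dead-letters once attempts reaches max_attempts. The claim query already incremented attempts, so the value passed in counts the attempt that just failed.

```typescript
interface FailedStep {
  attempts: number;    // already incremented at claim time
  maxAttempts: number; // max_attempts column
}

type RetryDecision =
  | { status: "pending"; availableAt: Date }
  | { status: "dead_letter" };

// Exponential backoff with full jitter: delay is uniform in [0, min(cap, base * 2^(attempts - 1))).
function retryDecision(
  step: FailedStep,
  now: Date,
  baseMs = 1_000,
  capMs = 5 * 60_000,
  random: () => number = Math.random, // injectable for deterministic tests
): RetryDecision {
  if (step.attempts >= step.maxAttempts) {
    return { status: "dead_letter" };
  }
  const ceiling = Math.min(capMs, baseMs * 2 ** (step.attempts - 1));
  const delayMs = Math.floor(random() * ceiling);
  return { status: "pending", availableAt: new Date(now.getTime() + delayMs) };
}
```

The resulting availableAt becomes the step's new available_at, so the normal claim loop picks the retry up without any separate scheduler.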

CREATE TABLE workflow_signals (
  id              UUID PRIMARY KEY DEFAULT uuidv7(),
  workflow_id     UUID NOT NULL REFERENCES workflow_instances(id),
  signal_name     TEXT NOT NULL,
  payload         JSONB NOT NULL DEFAULT '{}',
  idempotency_key TEXT NOT NULL UNIQUE,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

BEGIN;

INSERT INTO workflow_signals (workflow_id, signal_name, payload, idempotency_key)
VALUES ($workflow_id, 'shipment_created', $payload, $signal_id)
ON CONFLICT (idempotency_key) DO NOTHING;

-- Wake the parked step (a no-op if it is not waiting), and flip the instance back
-- to running only if a step actually woke, so a late duplicate cannot revive a
-- completed or cancelled workflow.
WITH woken AS (
  UPDATE workflow_steps
  SET status = 'pending', available_at = now()
  WHERE workflow_id = $workflow_id
    AND step_name = 'wait_for_shipment'
    AND status = 'waiting'
  RETURNING workflow_id
)
UPDATE workflow_instances
SET status = 'running', updated_at = now()
WHERE id IN (SELECT workflow_id FROM woken)
  AND status = 'waiting';

COMMIT;
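One gap the signal transaction does not close on its own: a signal that arrives before the step reaches wait_for_shipment updates no rows. A sketch of one way to handle it, assuming the engine checks for an already-recorded signal before parking a step. SignalStore is a hypothetical in-memory stand-in; in Postgres, one option is to have both paths lock the workflow instance row first (SELECT ... FOR UPDATE on workflow_instances) so the check and the park cannot interleave with a signal.

```typescript
interface Signal {
  name: string;
  payload: unknown;
}

type WaitResult =
  | { status: "pending"; signal: Signal } // signal already recorded: do not park
  | { status: "waiting" };

// Hypothetical in-memory stand-in for workflow_signals plus the waiting step rows.
class SignalStore {
  private signalsByKey = new Map<string, Signal & { workflowId: string }>(); // UNIQUE (idempotency_key)
  private waiting = new Map<string, string>(); // "workflowId/signalName" -> stepName

  // Engine side: park a step only if the signal has not already arrived.
  waitForSignal(workflowId: string, stepName: string, signalName: string): WaitResult {
    const existing = Array.from(this.signalsByKey.values()).find(
      (s) => s.workflowId === workflowId && s.name === signalName,
    );
    if (existing) {
      return { status: "pending", signal: { name: existing.name, payload: existing.payload } };
    }
    this.waiting.set(`${workflowId}/${signalName}`, stepName);
    return { status: "waiting" };
  }

  // API side: record idempotently, then wake a matching waiting step if one exists.
  // Returns the woken step name, or null if nothing was waiting or the signal was a duplicate.
  signal(workflowId: string, signalName: string, payload: unknown, key: string): string | null {
    if (this.signalsByKey.has(key)) return null; // duplicate delivery: ON CONFLICT DO NOTHING
    this.signalsByKey.set(key, { workflowId, name: signalName, payload });
    const waitKey = `${workflowId}/${signalName}`;
    const stepName = this.waiting.get(waitKey) ?? null;
    this.waiting.delete(waitKey);
    return stepName;
  }
}
```

Because the signal row is durable, the early-arrival case degrades gracefully: the step skips the wait and reads the payload that was already recorded.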

Keep the public API tiny

A useful first engine needs only a small surface area. Resist building a framework before the state machine is reliable.

API | What it guarantees
start(workflowType, id, input) | Creates one instance and one first step, idempotently.
complete(step, result) | Marks the step complete and schedules the next step atomically.
sleep(step, until) | Marks the step pending with a future available_at.
waitForSignal(step, name) | Marks the step waiting until a signal transaction resumes it.
signal(workflowId, name, payload, key) | Records a signal idempotently and wakes matching waiting steps.
cancel(workflowId, reason) | Stops future steps and records cancellation state.
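Of the six calls, cancel leaves the most room for interpretation. A sketch of one possible semantics, assuming that cancellation marks the instance cancelled, fails every pending or waiting step with a cancellation error (the step enum in this post has no cancelled value), and leaves a processing step alone so its worker can finish. The completion path would then need a guard that refuses to schedule a next step on a cancelled instance. The row shapes and function names are hypothetical.

```typescript
type StepStatus = "pending" | "processing" | "waiting" | "completed" | "failed" | "dead_letter";
type InstanceStatus = "running" | "waiting" | "completed" | "failed" | "cancelled";

interface Instance { id: string; status: InstanceStatus; lastError: string | null }
interface Step { workflowId: string; stepName: string; status: StepStatus; lastError: string | null }

// Returns false when the workflow already finished and there is nothing to cancel.
function cancel(instance: Instance, steps: Step[], reason: string): boolean {
  if (instance.status === "completed" || instance.status === "failed" || instance.status === "cancelled") {
    return false;
  }
  instance.status = "cancelled";
  instance.lastError = `cancelled: ${reason}`;
  for (const step of steps) {
    if (step.workflowId !== instance.id) continue;
    if (step.status === "pending" || step.status === "waiting") {
      step.status = "failed";
      step.lastError = `cancelled: ${reason}`;
    }
    // "processing" steps keep running: their side effect may already be committed,
    // which is where compensation comes in.
  }
  return true;
}

// Guard for the completion path: never enqueue a next step on a cancelled instance.
function maySchedule(instance: Instance): boolean {
  return instance.status === "running" || instance.status === "waiting";
}
```

In SQL this would be one transaction updating workflow_instances and the pending or waiting rows in workflow_steps, with the completion transaction checking the instance status before inserting the next step.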

Version workflow definitions before you need to

Every instance stores workflow_type and workflow_version, such as checkout and 1. That pair is not decoration: it pins an in-flight workflow to the definition it started with, so a deploy cannot change the meaning of work already under way.

Change | Safe approach | Why
Add a new final step | Register checkout version 2 | Old instances keep their original path.
Change payment provider logic | New step name or versioned handler | Retries of old steps run old semantics.
Remove a waiting signal | Keep a compatibility handler until old waits drain | Old workflows may still be parked.
Change state JSON shape | Migrate instance state or branch by version | Handlers should know what shape they read.
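A sketch of the registry side of those rules, assuming definitions are registered by (type, version): new starts pick the latest version, while claimed steps look up the exact version stored on their instance and fail loudly if a deploy removed it. Registry and its methods are hypothetical names.

```typescript
interface Definition {
  type: string;
  version: number;
  steps: Record<string, { next: string | null }>;
}

class Registry {
  private defs = new Map<string, Definition>();

  register(def: Definition): void {
    const key = `${def.type}@${def.version}`;
    if (this.defs.has(key)) throw new Error(`duplicate definition ${key}`);
    this.defs.set(key, def);
  }

  // Used by start(): new instances pin the newest registered version.
  latest(type: string): Definition {
    const candidates = Array.from(this.defs.values()).filter((d) => d.type === type);
    if (candidates.length === 0) throw new Error(`no definition for ${type}`);
    return candidates.reduce((a, b) => (b.version > a.version ? b : a));
  }

  // Used by the worker: in-flight instances resolve the version they started with.
  get(type: string, version: number): Definition {
    const def = this.defs.get(`${type}@${version}`);
    if (!def) throw new Error(`${type}@${version} is not deployed; keep it until old instances drain`);
    return def;
  }
}
```

Failing in get turns "we deleted v1 too early" into a visible step error that the retry path records, rather than silently running v2 logic against v1 state.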

Start here before adopting a workflow platform

This engine is useful when the workflow belongs to one service and your team wants SQL-visible state, explicit steps, and a small runtime. It covers the common middle ground between one-off queue jobs and a full workflow platform.

Strong fit

  • Service-local workflows with a handful of steps
  • Durable sleeps, callbacks, approvals, and retry windows
  • SQL support queries are more valuable than a separate workflow UI
  • Handlers are idempotent and side effects already have dedupe keys
  • You can version workflow definitions explicitly

Escalate when

  • Workflows span many teams and services as shared platform infrastructure
  • You need deterministic replay as a language-level programming model
  • History is large enough to need purpose-built retention, search, and replay tooling
  • Human workflow management becomes a product surface with its own UI and permissions
  • Cross-region workflow execution is a core requirement, not an edge case

Sharp edges before you ship

  • Versioning: never change old step semantics in place. Register checkout version 2 or branch on workflow_version.
  • Idempotency: each activity needs its own dedupe key. Step completion dedupe is not payment dedupe.
  • Step explosion: do not model every line of code as a step. Persist only meaningful recovery checkpoints.
  • Long history: archive completed workflows and step rows. Workflow tables churn like inbox tables.
  • Cancellation: define which steps can be cancelled, which require compensation, and which are already externally committed.
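For the idempotency edge, a sketch of deriving an activity dedupe key, assuming the key is built from the workflow key, the step key, and an activity name, and never from the attempt number, so every retry of charge_card sends the same key to the payment provider. activityKey and the slash-separated format are illustrative choices, not a provider requirement.

```typescript
interface StepIdentity {
  workflowKey: string; // e.g. "checkout:9182"
  stepKey: string;     // e.g. "charge_card:1", unique per scheduled step
  attempt: number;     // deliberately ignored below
}

// Same workflow step and activity => same key, no matter how many times the step is retried.
// Different activities inside one step (charge vs refund) get different keys.
function activityKey(step: StepIdentity, activity: string): string {
  return [step.workflowKey, step.stepKey, activity].join("/");
}
```

This is the key the handler passes to the external system; the step's own idempotency_key only protects the workflow_steps row.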