DataQueueDataQueue
Usage

Job Dependencies

You can defer a job until prerequisites are satisfied by setting dependsOn on JobOptions. Both dimensions use logical AND when both are present.

dependsOn.jobIds

The job stays pending until every listed prerequisite has status completed.

  • Invalid ids: Enqueue fails if any id does not exist in the queue.
  • Self-dependency: A job cannot list its own id in dependsOn.jobIds.
  • Cycles: DataQueue rejects inserts that would create a dependency cycle between jobs.
  • Failure or cancellation: If any prerequisite ends as failed or cancelled, pending jobs that depend on it (transitively) are cancelled.

Use this for explicit chains: job B runs only after job A completes successfully.

Linear chain
import { getJobQueue } from '@/lib/queue';

const jobQueue = getJobQueue();

const a = await jobQueue.addJob({
  jobType: 'ingest',
  payload: { fileId: 'f1' },
});

await jobQueue.addJob({
  jobType: 'transform',
  payload: { fileId: 'f1' },
  dependsOn: { jobIds: [a] },
});

dependsOn.tags (tag drain)

The job stays pending while another job (not itself) is activepending, processing, or waiting — and that job’s tags are a superset of every tag listed in dependsOn.tags (same semantics as Postgres tags @> depends_on_tags).

When no such blocking job exists, the dependent job becomes eligible to run (subject to runAt, workers, etc.).

  • Failure or cancellation: If a job that matches the tag barrier fails or is cancelled, pending jobs that listed those tags are cancelled (transitively).

Use this for drain patterns: wait until no in-flight work is tagged in a certain way (for example a “wave” or “tenant” tag).

Tag drain
await jobQueue.addJob({
  jobType: 'finalize_wave',
  payload: { wave: 2 },
  tags: ['wave:2'],
  dependsOn: { tags: ['wave:1'] },
});

Combining jobIds and tags

If you set both, all job-id prerequisites must be completed and the tag-drain condition must be clear before the job runs.

addJob vs addJobs

Single addJob

dependsOn.jobIds must contain positive database ids only. Negative placeholders are not allowed (they are reserved for batch inserts).

Batch addJobs and batchDepRef

When you enqueue several related jobs in one addJobs call, you can reference earlier jobs in the same batch using negative placeholders: -(index + 1) for the job at index in the array. Use the helper batchDepRef from @nicnocquee/dataqueue instead of hard-coding negatives.

Batch dependencies
import { batchDepRef } from '@nicnocquee/dataqueue';
import { getJobQueue } from '@/lib/queue';

const jobQueue = getJobQueue();

const [idA, idB, idC] = await jobQueue.addJobs([
  { jobType: 'step_a', payload: {} },
  {
    jobType: 'step_b',
    payload: {},
    dependsOn: { jobIds: [batchDepRef(0)] },
  },
  {
    jobType: 'step_c',
    payload: {},
    dependsOn: { jobIds: [batchDepRef(0), batchDepRef(1)] },
  },
]);

This enqueues three jobs in one round-trip. Array indices are 0-based: batchDepRef(0) means “the job at index 0 in this same array,” and batchDepRef(1) means “the job at index 1.” After insert, those placeholders become real ids.

  • step_a (index 0) has no prerequisites; it can run as soon as a worker picks it up.
  • step_b waits for step_a only (dependsOn: { jobIds: [batchDepRef(0)] }).
  • step_c waits for both earlier jobs (batchDepRef(0) and batchDepRef(1)), so it runs only after step_a and step_b have reached completed.

[idA, idB, idC] are the final database ids in the same order as the input array — the same ids that were written into each row’s dependency list when placeholders were resolved.

batchDepRef is exported from @nicnocquee/dataqueue. Re-export it from your queue module if you prefer a single import path.

Persisted fields on JobRecord

Prerequisites are stored on the row as dependsOnJobIds and dependsOnTags for inspection and debugging.

See also

  • JobOptions — full dependsOn / JobDependsOn shape
  • JobRecord — persisted prerequisite columns
  • Add jobaddJob / addJobs batch behavior