DataQueue

JobQueue

Initialization

initJobQueue

initJobQueue(config: JobQueueConfig): JobQueue

Initializes the job queue system with the provided configuration. The JobQueueConfig is a discriminated union -- you provide either a PostgreSQL or Redis configuration.

PostgresJobQueueConfig

Provide either databaseConfig (the library creates a pool) or pool (bring your own pg.Pool). At least one must be set.

interface PostgresJobQueueConfig {
  backend?: 'postgres'; // Optional, defaults to 'postgres'
  databaseConfig?: {
    connectionString?: string;
    host?: string;
    port?: number;
    database?: string;
    user?: string;
    password?: string;
    ssl?: DatabaseSSLConfig;
  };
  pool?: import('pg').Pool; // Bring your own pool
  verbose?: boolean;
}

RedisJobQueueConfig

Provide either redisConfig (the library creates an ioredis client) or client (bring your own). At least one must be set.

interface RedisJobQueueConfig {
  backend: 'redis'; // Required
  redisConfig?: {
    url?: string;
    host?: string;
    port?: number;
    password?: string;
    db?: number;
    tls?: RedisTLSConfig;
    keyPrefix?: string; // Default: 'dq:'
  };
  client?: unknown; // Bring your own ioredis client
  keyPrefix?: string; // Key prefix when using external client (default: 'dq:')
  verbose?: boolean;
}

JobQueueConfig

type JobQueueConfig = PostgresJobQueueConfig | RedisJobQueueConfig;
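Because backend is required only on the Redis variant, a check for config.backend === 'redis' is enough for TypeScript to narrow the union. The sketch below uses trimmed local copies of the two interfaces (only the fields it needs) and a hypothetical helper, describeBackend, which is not part of DataQueue. It illustrates the narrowing and the "at least one must be set" rule, not how the library validates its input.

```typescript
// Trimmed local copies of the documented config shapes (illustration only).
interface PostgresJobQueueConfig {
  backend?: 'postgres';
  databaseConfig?: { connectionString?: string };
  pool?: unknown;
}

interface RedisJobQueueConfig {
  backend: 'redis';
  redisConfig?: { url?: string; keyPrefix?: string };
  client?: unknown;
  keyPrefix?: string;
}

type JobQueueConfig = PostgresJobQueueConfig | RedisJobQueueConfig;

// Hypothetical helper: reports which backend a config selects and checks
// that each variant supplies a connection config or an existing client/pool.
function describeBackend(config: JobQueueConfig): string {
  if (config.backend === 'redis') {
    // Narrowed to RedisJobQueueConfig in this branch.
    if (!config.redisConfig && !config.client) {
      throw new Error('Redis config needs redisConfig or client');
    }
    return config.client ? 'redis (external client)' : 'redis (managed client)';
  }
  // Narrowed to PostgresJobQueueConfig: backend is 'postgres' or omitted.
  if (!config.databaseConfig && !config.pool) {
    throw new Error('Postgres config needs databaseConfig or pool');
  }
  return config.pool ? 'postgres (external pool)' : 'postgres (managed pool)';
}
```

Omitting backend selects the PostgreSQL branch, which matches the documented default.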

DatabaseSSLConfig

interface DatabaseSSLConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}
  • ca - Client certificate authority (CA) as a PEM string or a file path. A value that starts with 'file://' is loaded from that file; any other value is treated as a PEM string.
  • cert - Client certificate as a PEM string or a file path. A value that starts with 'file://' is loaded from that file; any other value is treated as a PEM string.
  • key - Client private key as a PEM string or a file path. A value that starts with 'file://' is loaded from that file; any other value is treated as a PEM string.
  • rejectUnauthorized - Whether to reject unauthorized certificates (default: true)
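The 'file://' rule for ca, cert, and key can be pictured with a small resolver. This is a sketch of the documented behavior, not the library's code: resolvePem is a hypothetical name, and it assumes the path is simply whatever follows the 'file://' prefix. The read parameter exists only so the function can be exercised without touching the disk.

```typescript
import { readFileSync } from 'node:fs';

const FILE_PREFIX = 'file://';

// Hypothetical helper mirroring the documented rule: values that start with
// 'file://' are read from disk, anything else is returned as PEM text.
// Assumption: the file path is the text after the 'file://' prefix.
function resolvePem(
  value: string | undefined,
  read: (path: string) => string = (path) => readFileSync(path, 'utf8'),
): string | undefined {
  if (value === undefined) return undefined;
  if (value.startsWith(FILE_PREFIX)) {
    return read(value.slice(FILE_PREFIX.length));
  }
  return value;
}
```

Under that assumption, 'file:///etc/ssl/ca.pem' would be read from /etc/ssl/ca.pem, while a string that begins with '-----BEGIN CERTIFICATE-----' is passed through unchanged.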

RedisTLSConfig

interface RedisTLSConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}

Adding Jobs

addJob

addJob(job: JobOptions, options?: AddJobOptions): Promise<number>

Adds a job to the queue. Returns the job ID.

JobOptions

interface JobOptions {
  jobType: string;
  payload: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
  idempotencyKey?: string;
  retryDelay?: number; // Base delay between retries in seconds (default: 60)
  retryBackoff?: boolean; // Use exponential backoff (default: true)
  retryDelayMax?: number; // Max delay cap in seconds (default: none)
  deadLetterJobType?: string; // Route exhausted failures to this job type
  group?: { id: string; tier?: string }; // Optional group for global concurrency limits
}
  • retryDelay - Base delay between retries in seconds. When retryBackoff is true, this is the base for exponential backoff (retryDelay * 2^attempts). When false, retries use this fixed delay. Default: 60.
  • retryBackoff - Whether to use exponential backoff. When true, delay doubles with each attempt and includes jitter. Default: true.
  • retryDelayMax - Maximum delay cap in seconds. Only meaningful when retryBackoff is true. No limit when omitted.
  • deadLetterJobType - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (originalJob, originalPayload, failure).
  • group - Optional grouping metadata. Use group.id to enforce global per-group limits with ProcessorOptions.groupConcurrency. group.tier is reserved for future policies.
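The retry fields combine as follows. The function below is a minimal sketch of the documented formula (retryDelay * 2^attempts, capped by retryDelayMax) and leaves out jitter, since the jitter formula is not specified here. baseRetryDelaySeconds is a hypothetical name, and it assumes attempts counts the attempts already made.

```typescript
interface RetryOptions {
  retryDelay?: number; // seconds, default 60
  retryBackoff?: boolean; // default true
  retryDelayMax?: number; // seconds, no cap when omitted
}

// Sketch of the documented delay calculation, without jitter.
// Assumption: `attempts` is the number of attempts already made.
function baseRetryDelaySeconds(options: RetryOptions, attempts: number): number {
  const retryDelay = options.retryDelay ?? 60;
  const retryBackoff = options.retryBackoff ?? true;
  if (!retryBackoff) return retryDelay; // fixed delay between retries
  const exponential = retryDelay * 2 ** attempts;
  return options.retryDelayMax === undefined
    ? exponential
    : Math.min(exponential, options.retryDelayMax);
}
```

With the defaults and attempts = 3, the base delay is 60 * 2^3 = 480 seconds; adding retryDelayMax: 300 caps it at 300 seconds.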

AddJobOptions

interface AddJobOptions {
  db?: DatabaseClient;
}
  • db - An external database client (e.g., a pg.PoolClient inside a transaction). When provided, the INSERT runs on this client instead of the internal pool. PostgreSQL only. Throws if used with the Redis backend.
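A typical use of db is to enqueue a job in the same transaction as a related write, so that both are committed or neither is. The sketch below is written against small structural interfaces (PoolLike, TxClient, QueueLike) that stand in for pg.Pool, pg.PoolClient, and the job queue; those names, the createUserWithWelcomeJob helper, and the users table are all hypothetical.

```typescript
// Local copies of the documented shapes, so the sketch type-checks alone.
interface DatabaseClient {
  query(
    text: string,
    values?: any[],
  ): Promise<{ rows: any[]; rowCount: number | null }>;
}
interface JobOptions { jobType: string; payload: any }
interface AddJobOptions { db?: DatabaseClient }

// Structural stand-ins for pg.PoolClient, pg.Pool, and the job queue.
interface TxClient extends DatabaseClient { release(): void }
interface PoolLike { connect(): Promise<TxClient> }
interface QueueLike {
  addJob(job: JobOptions, options?: AddJobOptions): Promise<number>;
}

// Inserts a user row and enqueues a welcome email in one transaction.
async function createUserWithWelcomeJob(
  pool: PoolLike,
  queue: QueueLike,
  email: string,
): Promise<number> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    // Hypothetical application table.
    await client.query('INSERT INTO users (email) VALUES ($1)', [email]);
    const jobId = await queue.addJob(
      { jobType: 'email', payload: { to: email, subject: 'Welcome' } },
      { db: client }, // the job INSERT runs on the transaction's client
    );
    await client.query('COMMIT');
    return jobId;
  } catch (error) {
    // Undo both the user row and the job if anything failed.
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
}
```

If addJob throws, the ROLLBACK also discards the user row, so no job is left pointing at data that was never committed.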

addJobs

addJobs(jobs: JobOptions[], options?: AddJobOptions): Promise<number[]>

Adds multiple jobs to the queue in a single operation. More efficient than calling addJob in a loop because it batches the INSERT into a single database round-trip (PostgreSQL) or a single atomic Lua script (Redis).

Returns an array of job IDs in the same order as the input array.

Each job can independently have its own priority, runAt, tags, idempotencyKey, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row.

Passing an empty array returns [] immediately without touching the database.

const jobIds = await jobQueue.addJobs([
  {
    jobType: 'email',
    payload: { to: 'a@example.com', subject: 'Hi', body: '...' },
  },
  {
    jobType: 'email',
    payload: { to: 'b@example.com', subject: 'Hi', body: '...' },
    priority: 10,
  },
  {
    jobType: 'report',
    payload: { reportId: '123', userId: '456' },
    tags: ['monthly'],
  },
]);
// jobIds = [1, 2, 3]

The { db } option works the same as addJob — pass a transactional client to batch-insert within an existing transaction (PostgreSQL only).

DatabaseClient

interface DatabaseClient {
  query(
    text: string,
    values?: any[],
  ): Promise<{ rows: any[]; rowCount: number | null }>;
}

Any object matching this interface works — pg.Pool, pg.PoolClient, pg.Client, or ORM query runners that expose a raw query() method.


Retrieving Jobs

getJob

getJob(id: number): Promise<JobRecord | null>

Retrieves a job by its ID.

getJobs

getJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  limit?: number,
  offset?: number
): Promise<JobRecord[]>

Retrieves jobs matching the provided filters, with optional pagination.
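The runAt filter accepts either a Date or an object of comparison operators. The in-memory predicate below is one plausible reading of it, useful for reasoning about which jobs a filter selects. matchesRunAt is a hypothetical helper, and it assumes that a bare Date means an exact match and that several operators in one object are combined with AND.

```typescript
type RunAtFilter =
  | Date
  | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };

// In-memory reading of the runAt filter (not the library's query logic).
// Assumptions: a bare Date is an exact match; operators are ANDed together.
function matchesRunAt(runAt: Date, filter: RunAtFilter): boolean {
  const t = runAt.getTime();
  if (filter instanceof Date) return t === filter.getTime();
  if (filter.gt !== undefined && !(t > filter.gt.getTime())) return false;
  if (filter.gte !== undefined && !(t >= filter.gte.getTime())) return false;
  if (filter.lt !== undefined && !(t < filter.lt.getTime())) return false;
  if (filter.lte !== undefined && !(t <= filter.lte.getTime())) return false;
  if (filter.eq !== undefined && t !== filter.eq.getTime()) return false;
  return true;
}
```

Under these assumptions, { gte: startOfDay, lt: startOfNextDay } selects jobs scheduled within a single day, including the first instant and excluding the last.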

getJobsByStatus

getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves jobs by their status, with pagination.

getAllJobs

getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves all jobs, with optional pagination.

getJobsByTags

getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves jobs by tag(s).
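The four tag modes are not defined on this page, so the sketch below spells out one natural interpretation as an in-memory predicate. Treat the semantics as an assumption to verify against the library: 'all' means the job carries every queried tag, 'any' at least one, 'none' not a single one, and 'exact' the same set of tags with nothing extra. matchesTags is a hypothetical helper, and TagQueryMode is assumed to be the union shown in the getJobs filter.

```typescript
type TagQueryMode = 'all' | 'any' | 'none' | 'exact';

// Assumed semantics of each mode, expressed over plain string arrays.
function matchesTags(
  jobTags: string[],
  queryTags: string[],
  mode: TagQueryMode,
): boolean {
  const have = new Set(jobTags);
  switch (mode) {
    case 'all': // job has every queried tag (extra tags allowed)
      return queryTags.every((tag) => have.has(tag));
    case 'any': // job has at least one queried tag
      return queryTags.some((tag) => have.has(tag));
    case 'none': // job has none of the queried tags
      return !queryTags.some((tag) => have.has(tag));
    case 'exact': // job's tag set equals the queried set, order ignored
      return (
        have.size === new Set(queryTags).size &&
        queryTags.every((tag) => have.has(tag))
      );
  }
}
```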


Managing Jobs

retryJob

retryJob(jobId: number): Promise<void>

Retries a job given its ID.

cancelJob

cancelJob(jobId: number): Promise<void>

Cancels a job given its ID.

editJob

editJob(jobId: number, updates: EditJobOptions): Promise<void>

Edits a job given its ID. Only jobs with status 'pending' can be edited; for any other status (processing, completed, failed, cancelled) the call fails silently and leaves the job unchanged.

EditJobOptions

interface EditJobOptions {
  payload?: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
  retryDelay?: number | null;
  retryBackoff?: boolean | null;
  retryDelayMax?: number | null;
  deadLetterJobType?: string | null;
}

All fields are optional - only provided fields will be updated. Note that jobType cannot be changed. Set retry fields to null to revert to legacy default behavior. Set deadLetterJobType to null to clear dead-letter routing for pending jobs.

Example

// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
  payload: { to: 'newemail@example.com', subject: 'Updated' },
  priority: 10,
});

// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
  runAt: new Date(Date.now() + 60000), // Run in 1 minute
});

// Edit multiple fields at once
await jobQueue.editJob(jobId, {
  payload: { to: 'updated@example.com' },
  priority: 5,
  maxAttempts: 10,
  timeoutMs: 30000,
  tags: ['urgent', 'priority'],
});

editAllPendingJobs

editAllPendingJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  updates: EditJobOptions
): Promise<number>

Edits every job with status 'pending' that matches the filters; jobs in any other status are not affected. Returns the number of jobs that were edited.

Parameters

  • filters (optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited.
    • jobType: Filter by job type
    • priority: Filter by priority
    • runAt: Filter by scheduled run time (supports gt, gte, lt, lte, eq operators or exact Date match)
    • tags: Filter by tags with mode ('all', 'any', 'none', 'exact')
  • updates: The fields to update (same as EditJobOptions). All fields are optional - only provided fields will be updated.

Returns

The number of jobs that were successfully edited.

Examples

// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
  priority: 10,
});

// Edit all pending email jobs
const emailCount = await jobQueue.editAllPendingJobs(
  { jobType: 'email' },
  {
    priority: 5,
  },
);

// Edit all pending jobs with 'urgent' tag
const urgentCount = await jobQueue.editAllPendingJobs(
  { tags: { values: ['urgent'], mode: 'any' } },
  {
    priority: 10,
    maxAttempts: 5,
  },
);

// Edit all pending jobs scheduled in the future
const futureCount = await jobQueue.editAllPendingJobs(
  { runAt: { gte: new Date() } },
  {
    priority: 10,
  },
);

// Edit with combined filters
const urgentEmailCount = await jobQueue.editAllPendingJobs(
  {
    jobType: 'email',
    tags: { values: ['urgent'], mode: 'any' },
  },
  {
    priority: 10,
    maxAttempts: 5,
  },
);

Note: Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits.

cancelAllUpcomingJobs

cancelAllUpcomingJobs(filters?: {
  jobType?: string;
  priority?: number;
  runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
  tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>

Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.

cleanupOldJobs

cleanupOldJobs(daysToKeep?: number): Promise<number>

Cleans up jobs older than the specified number of days. Returns the number of jobs removed.

reclaimStuckJobs

reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>

Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed. If a job has a timeoutMs that is longer than the maxProcessingTimeMinutes threshold, the job's own timeout is used instead, preventing premature reclamation of long-running jobs.
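The interaction between the global threshold and a job's own timeoutMs comes down to taking the larger of the two. The sketch below shows that rule only; effectiveStuckThresholdMs and isStuck are hypothetical helpers, and the strict "greater than" comparison against the start time is an assumption.

```typescript
// Sketch of the documented rule: a job's own timeoutMs is used when it is
// longer than the global maxProcessingTimeMinutes threshold.
function effectiveStuckThresholdMs(
  maxProcessingTimeMinutes: number,
  timeoutMs?: number | null,
): number {
  const globalMs = maxProcessingTimeMinutes * 60 * 1000;
  return timeoutMs != null && timeoutMs > globalMs ? timeoutMs : globalMs;
}

// Assumption: a job counts as stuck once it has been processing for longer
// than the effective threshold.
function isStuck(
  startedAt: Date,
  now: Date,
  maxProcessingTimeMinutes: number,
  timeoutMs?: number | null,
): boolean {
  const elapsedMs = now.getTime() - startedAt.getTime();
  return elapsedMs > effectiveStuckThresholdMs(maxProcessingTimeMinutes, timeoutMs);
}
```

For example, with a 10-minute threshold, a job whose timeoutMs is 30 minutes is not reclaimed after 15 minutes of processing, while a job with no timeout is.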


Job Events

getJobEvents

getJobEvents(jobId: number): Promise<JobEvent[]>

Retrieves the events recorded for a job.

JobEvent

interface JobEvent {
  id: number;
  jobId: number;
  eventType: JobEventType;
  createdAt: Date;
  metadata: any;
}

JobEventType

enum JobEventType {
  Added = 'added',
  Processing = 'processing',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled',
  Retried = 'retried',
  Edited = 'edited',
}

Event Hooks

DataQueue emits real-time events for job lifecycle transitions. Register listeners using on, once, off, and removeAllListeners. These methods work identically with the PostgreSQL and Redis backends.

QueueEventMap

interface QueueEventMap {
  'job:added': { jobId: number; jobType: string };
  'job:processing': { jobId: number; jobType: string };
  'job:completed': { jobId: number; jobType: string };
  'job:failed': {
    jobId: number;
    jobType: string;
    error: Error;
    willRetry: boolean;
  };
  'job:cancelled': { jobId: number };
  'job:retried': { jobId: number };
  'job:waiting': { jobId: number; jobType: string };
  'job:progress': { jobId: number; progress: number };
  error: Error;
}

on

on(event: QueueEventName, listener: (data) => void): void

Register a listener that fires every time the event is emitted.

once

once(event: QueueEventName, listener: (data) => void): void

Register a one-time listener that auto-removes after the first invocation.

off

off(event: QueueEventName, listener: (data) => void): void

Remove a previously registered listener. Pass the exact function reference used with on or once.

removeAllListeners

removeAllListeners(event?: QueueEventName): void

Remove all listeners for a specific event, or all listeners for all events when called without arguments.

See Event Hooks for detailed usage examples.
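The listener semantics described above (on fires every time, once fires a single time, off needs the same function reference) can be tried out with a small stand-in emitter. TinyQueueEmitter below is not DataQueue's implementation; it is a hypothetical in-memory class with a trimmed two-event copy of QueueEventMap, and its emit method exists only to drive the example.

```typescript
// Trimmed copy of the documented event map (two events only).
interface QueueEventMap {
  'job:completed': { jobId: number; jobType: string };
  'job:failed': { jobId: number; jobType: string; error: Error; willRetry: boolean };
}
type QueueEventName = keyof QueueEventMap;
type Listener<K extends QueueEventName> = (data: QueueEventMap[K]) => void;

interface Entry {
  event: QueueEventName;
  listener: (data: any) => void;
  once: boolean;
}

// Hypothetical stand-in that mimics the documented listener methods.
class TinyQueueEmitter {
  private entries: Entry[] = [];

  on<K extends QueueEventName>(event: K, listener: Listener<K>): void {
    this.entries.push({ event, listener, once: false });
  }

  once<K extends QueueEventName>(event: K, listener: Listener<K>): void {
    this.entries.push({ event, listener, once: true });
  }

  // Removal compares by reference, so pass the function given to on/once.
  off<K extends QueueEventName>(event: K, listener: Listener<K>): void {
    this.entries = this.entries.filter(
      (entry) => !(entry.event === event && entry.listener === listener),
    );
  }

  removeAllListeners(event?: QueueEventName): void {
    this.entries =
      event === undefined ? [] : this.entries.filter((entry) => entry.event !== event);
  }

  emit<K extends QueueEventName>(event: K, data: QueueEventMap[K]): void {
    const matching = this.entries.filter((entry) => entry.event === event);
    // One-time listeners are dropped before they run.
    this.entries = this.entries.filter(
      (entry) => !(entry.event === event && entry.once),
    );
    for (const entry of matching) entry.listener(data);
  }
}
```

Because off compares by reference, an inline arrow function passed to on cannot be removed later; keep the listener in a variable if it needs to be unregistered.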


Processing Jobs

createProcessor

createProcessor(
  handlers: JobHandlers,
  options?: ProcessorOptions
): Processor

Creates a job processor with the provided handlers and options.

ProcessorOptions

interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  groupConcurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}
  • groupConcurrency - Optional global per-group concurrency limit (positive integer). Applies only to jobs with group.id; ungrouped jobs are unaffected.
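To make the groupConcurrency rule concrete, the sketch below decides which candidate jobs may start, given how many jobs per group are already running across all workers. It is an in-memory illustration of the limit, not the library's claiming logic; selectStartable and QueuedJob are hypothetical names.

```typescript
interface QueuedJob {
  id: number;
  group?: { id: string; tier?: string };
}

// Picks the candidates that may start without pushing any group past the
// limit. Ungrouped jobs are never held back by groupConcurrency.
function selectStartable(
  candidates: QueuedJob[],
  runningPerGroup: Record<string, number>,
  groupConcurrency: number,
): QueuedJob[] {
  // Copy so the caller's counts are not mutated.
  const running: Record<string, number> = { ...runningPerGroup };
  const startable: QueuedJob[] = [];
  for (const job of candidates) {
    const groupId = job.group?.id;
    if (groupId === undefined) {
      startable.push(job);
      continue;
    }
    const current = running[groupId] ?? 0;
    if (current < groupConcurrency) {
      running[groupId] = current + 1; // reserve a slot for this job
      startable.push(job);
    }
  }
  return startable;
}
```

With groupConcurrency set to 2 and one job of group 'a' already running, only one more 'a' job is selected, while ungrouped jobs and jobs from other groups pass through.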

Background Supervisor

createSupervisor

createSupervisor(options?: SupervisorOptions): Supervisor

Creates a background supervisor that automatically runs maintenance tasks on a configurable interval: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens.

SupervisorOptions

interface SupervisorOptions {
  intervalMs?: number; // default: 60000
  stuckJobsTimeoutMinutes?: number; // default: 10
  cleanupJobsDaysToKeep?: number; // default: 30 (0 to disable)
  cleanupEventsDaysToKeep?: number; // default: 30 (0 to disable)
  cleanupBatchSize?: number; // default: 1000
  reclaimStuckJobs?: boolean; // default: true
  expireTimedOutTokens?: boolean; // default: true
  onError?: (error: Error) => void; // default: console.error
  verbose?: boolean;
}

Supervisor

interface Supervisor {
  start(): Promise<SupervisorRunResult>;
  startInBackground(): void;
  stop(): void;
  stopAndDrain(timeoutMs?: number): Promise<void>;
  isRunning(): boolean;
}
  • start() runs all tasks once and returns the results (serverless-friendly).
  • startInBackground() starts a background loop that runs every intervalMs.
  • stopAndDrain() stops the loop and waits for the current run to finish.

SupervisorRunResult

interface SupervisorRunResult {
  reclaimedJobs: number;
  cleanedUpJobs: number;
  cleanedUpEvents: number;
  expiredTokens: number;
}
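The two ways of driving a supervisor can be sketched as follows, written against local copies of the Supervisor and SupervisorRunResult interfaces so the example stands alone. runMaintenanceOnce and runInBackground are hypothetical helpers, and the 30-second drain timeout is an arbitrary value chosen for illustration.

```typescript
// Local copies of the documented interfaces.
interface SupervisorRunResult {
  reclaimedJobs: number;
  cleanedUpJobs: number;
  cleanedUpEvents: number;
  expiredTokens: number;
}
interface Supervisor {
  start(): Promise<SupervisorRunResult>;
  startInBackground(): void;
  stop(): void;
  stopAndDrain(timeoutMs?: number): Promise<void>;
  isRunning(): boolean;
}

// Serverless style: run every maintenance task once and summarize the result.
async function runMaintenanceOnce(supervisor: Supervisor): Promise<string> {
  const result = await supervisor.start();
  return (
    `reclaimed=${result.reclaimedJobs} jobs=${result.cleanedUpJobs} ` +
    `events=${result.cleanedUpEvents} tokens=${result.expiredTokens}`
  );
}

// Long-running style: start the loop and hand back a shutdown function that
// stops it and waits for the current run to finish.
function runInBackground(
  supervisor: Supervisor,
  drainTimeoutMs = 30000, // hypothetical default
): () => Promise<void> {
  supervisor.startInBackground();
  return () => supervisor.stopAndDrain(drainTimeoutMs);
}
```

In a server process, the function returned by runInBackground would typically be called from a shutdown handler such as a SIGTERM listener.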

See Long-Running Server for usage examples.


Accessing the Underlying Client

getPool

getPool(): Pool

Returns the PostgreSQL connection pool instance. Only available when using the PostgreSQL backend.

Throws an error if called when using the Redis backend.

getRedisClient

getRedisClient(): Redis

Returns the ioredis client instance. Only available when using the Redis backend.

Throws an error if called when using the PostgreSQL backend.