JobQueue
Initialization
initJobQueue
initJobQueue(config: JobQueueConfig): JobQueue
Initializes the job queue system with the provided configuration. JobQueueConfig is a discriminated union: you provide either a PostgreSQL or a Redis configuration.
PostgresJobQueueConfig
Provide either databaseConfig (the library creates a pool) or pool (bring your own pg.Pool). At least one must be set.
interface PostgresJobQueueConfig {
backend?: 'postgres'; // Optional, defaults to 'postgres'
databaseConfig?: {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: DatabaseSSLConfig;
};
pool?: import('pg').Pool; // Bring your own pool
verbose?: boolean;
}
RedisJobQueueConfig
Provide either redisConfig (the library creates an ioredis client) or client (bring your own). At least one must be set.
interface RedisJobQueueConfig {
backend: 'redis'; // Required
redisConfig?: {
url?: string;
host?: string;
port?: number;
password?: string;
db?: number;
tls?: RedisTLSConfig;
keyPrefix?: string; // Default: 'dq:'
};
client?: unknown; // Bring your own ioredis client
keyPrefix?: string; // Key prefix when using external client (default: 'dq:')
verbose?: boolean;
}
JobQueueConfig
type JobQueueConfig = PostgresJobQueueConfig | RedisJobQueueConfig;
DatabaseSSLConfig
interface DatabaseSSLConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}
ca - Client certificate authority (CA) as a PEM string or file path. If the value starts with 'file://', it is loaded from the file; otherwise it is treated as a PEM string.
cert - Client certificate as a PEM string or file path, using the same 'file://' convention as ca.
key - Client private key as a PEM string or file path, using the same 'file://' convention as ca.
rejectUnauthorized - Whether to reject unauthorized certificates (default: true)
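As an illustration, a PostgreSQL configuration that loads the CA certificate from disk might look like this. The host, credentials, and certificate path below are placeholders, not library defaults; only the object shape follows the interfaces above.

```typescript
// Illustrative PostgresJobQueueConfig value; all connection details are
// made-up examples, not defaults of the library.
const pgConfig = {
  backend: 'postgres' as const,
  databaseConfig: {
    host: 'db.internal',
    port: 5432,
    database: 'app',
    user: 'queue_user',
    password: 'secret',
    ssl: {
      ca: 'file:///etc/certs/ca.pem', // 'file://' prefix: loaded from disk
      rejectUnauthorized: true,       // reject unverified server certificates
    },
  },
  verbose: false,
};
// const jobQueue = initJobQueue(pgConfig);
```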
RedisTLSConfig
interface RedisTLSConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}
Adding Jobs
addJob
addJob(job: JobOptions, options?: AddJobOptions): Promise<number>
Adds a job to the queue. Returns the job ID.
JobOptions
interface JobOptions {
jobType: string;
payload: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
idempotencyKey?: string;
retryDelay?: number; // Base delay between retries in seconds (default: 60)
retryBackoff?: boolean; // Use exponential backoff (default: true)
retryDelayMax?: number; // Max delay cap in seconds (default: none)
deadLetterJobType?: string; // Route exhausted failures to this job type
group?: { id: string; tier?: string }; // Optional group for global concurrency limits
}
retryDelay - Base delay between retries in seconds. When retryBackoff is true, this is the base for exponential backoff (retryDelay * 2^attempts); when false, retries use this fixed delay. Default: 60.
retryBackoff - Whether to use exponential backoff. When true, the delay doubles with each attempt and includes jitter. Default: true.
retryDelayMax - Maximum delay cap in seconds. Only meaningful when retryBackoff is true. No limit when omitted.
deadLetterJobType - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (originalJob, originalPayload, failure).
group - Optional grouping metadata. Use group.id to enforce global per-group limits with ProcessorOptions.groupConcurrency. group.tier is reserved for future policies.
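The retry timing rules above can be sketched as a small function. This is an illustration of the documented formula, not the library's internal code, and the jitter that the real backoff adds is omitted here for determinism.

```typescript
// Sketch of the documented retry schedule: retryDelay * 2^attempts when
// retryBackoff is true (capped by retryDelayMax), a fixed retryDelay otherwise.
function nextRetryDelaySeconds(
  attempts: number,
  retryDelay = 60,
  retryBackoff = true,
  retryDelayMax?: number,
): number {
  if (!retryBackoff) return retryDelay; // fixed delay between attempts
  const delay = retryDelay * 2 ** attempts; // exponential growth per attempt
  return retryDelayMax === undefined ? delay : Math.min(delay, retryDelayMax);
}
```

With the defaults, the first three retries would be scheduled roughly 60, 120, and 240 seconds apart (plus jitter in the real implementation).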
AddJobOptions
interface AddJobOptions {
db?: DatabaseClient;
}
db - An external database client (e.g., a pg.PoolClient inside a transaction). When provided, the INSERT runs on this client instead of the internal pool. PostgreSQL only; throws if used with the Redis backend.
addJobs
addJobs(jobs: JobOptions[], options?: AddJobOptions): Promise<number[]>
Adds multiple jobs to the queue in a single operation. More efficient than calling addJob in a loop because it batches the INSERT into a single database round-trip (PostgreSQL) or a single atomic Lua script (Redis).
Returns an array of job IDs in the same order as the input array.
Each job can independently have its own priority, runAt, tags, idempotencyKey, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row.
Passing an empty array returns [] immediately without touching the database.
const jobIds = await jobQueue.addJobs([
{
jobType: 'email',
payload: { to: 'a@example.com', subject: 'Hi', body: '...' },
},
{
jobType: 'email',
payload: { to: 'b@example.com', subject: 'Hi', body: '...' },
priority: 10,
},
{
jobType: 'report',
payload: { reportId: '123', userId: '456' },
tags: ['monthly'],
},
]);
// jobIds = [1, 2, 3]
The { db } option works the same as for addJob: pass a transactional client to batch-insert within an existing transaction (PostgreSQL only).
DatabaseClient
interface DatabaseClient {
query(
text: string,
values?: any[],
): Promise<{ rows: any[]; rowCount: number | null }>;
}
Any object matching this interface works: pg.Pool, pg.PoolClient, pg.Client, or ORM query runners that expose a raw query() method.
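As a sketch, here is an in-memory object satisfying DatabaseClient, used to show the transaction shape that an AddJobOptions.db client would participate in. The SQL and the statement-recording logic are purely illustrative.

```typescript
interface DatabaseClient {
  query(text: string, values?: any[]): Promise<{ rows: any[]; rowCount: number | null }>;
}

// In-memory stand-in: records every statement instead of executing it.
const issued: string[] = [];
const fakeClient: DatabaseClient = {
  async query(text, _values) {
    issued.push(text);
    return { rows: [], rowCount: 0 };
  },
};

// The transaction shape a { db } client would join: other writes and the
// job INSERT commit (or roll back) together.
async function inTransaction(db: DatabaseClient, work: (db: DatabaseClient) => Promise<void>) {
  await db.query('BEGIN');
  try {
    await work(db); // e.g. await jobQueue.addJob(job, { db }) alongside other writes
    await db.query('COMMIT');
  } catch (err) {
    await db.query('ROLLBACK');
    throw err;
  }
}

async function demo(): Promise<void> {
  await inTransaction(fakeClient, async (db) => {
    await db.query('INSERT INTO orders (id) VALUES ($1)', [1]);
  });
}
const done = demo(); // resolves once BEGIN, INSERT, COMMIT are recorded
```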
Retrieving Jobs
getJob
getJob(id: number): Promise<JobRecord | null>
Retrieves a job by its ID.
getJobs
getJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
limit?: number,
offset?: number
): Promise<JobRecord[]>
Retrieves jobs matching the provided filters, with optional pagination.
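An illustrative filter object (the values are examples; the shape follows the signature above):

```typescript
// Select 'email' jobs tagged 'urgent' that are scheduled within the next hour.
const filters = {
  jobType: 'email',
  runAt: { lte: new Date(Date.now() + 60 * 60 * 1000) },
  tags: { values: ['urgent'], mode: 'any' as const },
};
// const jobs = await jobQueue.getJobs(filters, 20, 0); // first page of 20
```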
getJobsByStatus
getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves jobs by their status, with pagination.
getAllJobs
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves all jobs, with optional pagination.
getJobsByTags
getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves jobs by tag(s).
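The four tag-query modes can be read as set operations on a job's tags. The function below is a sketch of the assumed semantics, not the library's implementation:

```typescript
type TagQueryMode = 'all' | 'any' | 'none' | 'exact';

// How a job's tags are assumed to match the queried tag set in each mode.
function matchesTags(jobTags: string[], values: string[], mode: TagQueryMode = 'any'): boolean {
  const tagSet = new Set(jobTags);
  switch (mode) {
    case 'all':   return values.every((t) => tagSet.has(t));  // job has every queried tag
    case 'any':   return values.some((t) => tagSet.has(t));   // at least one overlap
    case 'none':  return !values.some((t) => tagSet.has(t));  // no overlap at all
    case 'exact': return tagSet.size === new Set(values).size
                    && values.every((t) => tagSet.has(t));    // same set, nothing extra
  }
  return false; // unreachable for valid modes
}
```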
Managing Jobs
retryJob
retryJob(jobId: number): Promise<void>
Retries a job given its ID.
cancelJob
cancelJob(jobId: number): Promise<void>
Cancels a job given its ID.
editJob
editJob(jobId: number, updates: EditJobOptions): Promise<void>
Edits a pending job given its ID. Only works for jobs with status 'pending'. Silently fails for other statuses (processing, completed, failed, cancelled).
EditJobOptions
interface EditJobOptions {
payload?: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
retryDelay?: number | null;
retryBackoff?: boolean | null;
retryDelayMax?: number | null;
deadLetterJobType?: string | null;
}
All fields are optional; only provided fields will be updated. Note that jobType cannot be changed. Set retry fields to null to revert to the legacy default behavior. Set deadLetterJobType to null to clear dead-letter routing for pending jobs.
Example
// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
payload: { to: 'newemail@example.com', subject: 'Updated' },
priority: 10,
});
// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
runAt: new Date(Date.now() + 60000), // Run in 1 minute
});
// Edit multiple fields at once
await jobQueue.editJob(jobId, {
payload: { to: 'updated@example.com' },
priority: 5,
maxAttempts: 10,
timeoutMs: 30000,
tags: ['urgent', 'priority'],
});
editAllPendingJobs
editAllPendingJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
updates: EditJobOptions
): Promise<number>
Edits all pending jobs that match the filters. Only works for jobs with status 'pending'; non-pending jobs are not affected. Returns the number of jobs that were edited.
Parameters
filters (optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited.
  jobType: Filter by job type
  priority: Filter by priority
  runAt: Filter by scheduled run time (supports gt, gte, lt, lte, eq operators or an exact Date match)
  tags: Filter by tags with a mode ('all', 'any', 'none', 'exact')
updates: The fields to update (same as EditJobOptions). All fields are optional; only provided fields will be updated.
Returns
The number of jobs that were successfully edited.
Examples
// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
priority: 10,
});
// Edit all pending email jobs
const editedCount = await jobQueue.editAllPendingJobs(
{ jobType: 'email' },
{
priority: 5,
},
);
// Edit all pending jobs with 'urgent' tag
const editedCount = await jobQueue.editAllPendingJobs(
{ tags: { values: ['urgent'], mode: 'any' } },
{
priority: 10,
maxAttempts: 5,
},
);
// Edit all pending jobs scheduled in the future
const editedCount = await jobQueue.editAllPendingJobs(
{ runAt: { gte: new Date() } },
{
priority: 10,
},
);
// Edit with combined filters
const editedCount = await jobQueue.editAllPendingJobs(
{
jobType: 'email',
tags: { values: ['urgent'], mode: 'any' },
},
{
priority: 10,
maxAttempts: 5,
},
});
Note: Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single-job edits.
cancelAllUpcomingJobs
cancelAllUpcomingJobs(filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>
Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.
cleanupOldJobs
cleanupOldJobs(daysToKeep?: number): Promise<number>
Cleans up jobs older than the specified number of days. Returns the number of jobs removed.
reclaimStuckJobs
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>
Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed. If a job's timeoutMs is longer than the maxProcessingTimeMinutes threshold, the job's own timeout is used instead, preventing premature reclamation of long-running jobs.
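The interaction between maxProcessingTimeMinutes and a job's own timeoutMs can be summarized as taking the larger of the two. This is an illustration of the documented rule, not the library's internal code:

```typescript
// A processing job is assumed reclaimable once it has been running longer
// than the larger of the global threshold and its own timeoutMs.
function isReclaimable(
  processingForMs: number,
  maxProcessingTimeMinutes: number,
  jobTimeoutMs?: number,
): boolean {
  const thresholdMs = Math.max(maxProcessingTimeMinutes * 60_000, jobTimeoutMs ?? 0);
  return processingForMs > thresholdMs;
}
```

So a job with a 20-minute timeoutMs is not reclaimed at an 11-minute mark even when the global threshold is 10 minutes.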
Job Events
getJobEvents
getJobEvents(jobId: number): Promise<JobEvent[]>
Retrieves the job events for a job.
JobEvent
interface JobEvent {
id: number;
jobId: number;
eventType: JobEventType;
createdAt: Date;
metadata: any;
}
JobEventType
enum JobEventType {
Added = 'added',
Processing = 'processing',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Retried = 'retried',
Edited = 'edited',
}
Event Hooks
DataQueue emits real-time events for job lifecycle transitions. Register listeners using on, once, off, and removeAllListeners. Works identically with both PostgreSQL and Redis backends.
QueueEventMap
interface QueueEventMap {
'job:added': { jobId: number; jobType: string };
'job:processing': { jobId: number; jobType: string };
'job:completed': { jobId: number; jobType: string };
'job:failed': {
jobId: number;
jobType: string;
error: Error;
willRetry: boolean;
};
'job:cancelled': { jobId: number };
'job:retried': { jobId: number };
'job:waiting': { jobId: number; jobType: string };
'job:progress': { jobId: number; progress: number };
error: Error;
}
on
on(event: QueueEventName, listener: (data) => void): void
Register a listener that fires every time the event is emitted.
once
once(event: QueueEventName, listener: (data) => void): void
Register a one-time listener that auto-removes after the first invocation.
off
off(event: QueueEventName, listener: (data) => void): void
Remove a previously registered listener. Pass the exact function reference used with on or once.
removeAllListeners
removeAllListeners(event?: QueueEventName): void
Remove all listeners for a specific event, or all listeners for all events when called without arguments.
See Event Hooks for detailed usage examples.
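A minimal sketch of the listener pattern, using Node's built-in EventEmitter as a stand-in for the queue (a real jobQueue exposes the same on/once/off surface; the payload shapes follow QueueEventMap):

```typescript
import { EventEmitter } from 'node:events';

// Stand-in emitter; a real jobQueue emits these during job processing.
const queue = new EventEmitter();

const seen: string[] = [];
const onFailed = (data: { jobId: number; jobType: string; error: Error; willRetry: boolean }) => {
  seen.push(`failed:${data.jobId}:${data.willRetry ? 'retrying' : 'exhausted'}`);
};

queue.on('job:failed', onFailed);
queue.once('job:completed', (data: { jobId: number }) => {
  seen.push(`done:${data.jobId}`);
});

queue.emit('job:failed', { jobId: 7, jobType: 'email', error: new Error('smtp'), willRetry: true });
queue.emit('job:completed', { jobId: 7, jobType: 'email' });
queue.emit('job:completed', { jobId: 8, jobType: 'email' }); // once: listener already removed

queue.off('job:failed', onFailed); // must be the same function reference
queue.emit('job:failed', { jobId: 9, jobType: 'email', error: new Error('x'), willRetry: false });
```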
Processing Jobs
createProcessor
createProcessor(
handlers: JobHandlers,
options?: ProcessorOptions
): Processor
Creates a job processor with the provided handlers and options.
ProcessorOptions
interface ProcessorOptions {
workerId?: string;
batchSize?: number;
concurrency?: number;
groupConcurrency?: number;
pollInterval?: number;
onError?: (error: Error) => void;
verbose?: boolean;
jobType?: string | string[];
}
groupConcurrency - Optional global per-group concurrency limit (a positive integer). Applies only to jobs with group.id; ungrouped jobs are unaffected.
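An illustrative handlers-and-options pair. The JobHandlers shape shown here (a map from job type to an async handler receiving the job) is an assumption based on common usage, not confirmed by this section; the option values are examples:

```typescript
// Assumed JobHandlers shape: job type -> async handler for that job type.
const handlers = {
  email: async (job: { payload: { to: string } }) => {
    // send the email here
    console.log(`sending to ${job.payload.to}`);
  },
};

// Illustrative ProcessorOptions: poll every second, fetch up to 10 jobs per
// poll, run at most 5 concurrently, and only handle 'email' jobs.
const options = {
  workerId: 'worker-1',
  batchSize: 10,
  concurrency: 5,
  pollInterval: 1000,
  jobType: 'email',
  onError: (err: Error) => console.error('processor error:', err),
};
// const processor = jobQueue.createProcessor(handlers, options);
```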
Background Supervisor
createSupervisor
createSupervisor(options?: SupervisorOptions): Supervisor
Creates a background supervisor that automatically runs maintenance tasks on a configurable interval: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens.
SupervisorOptions
interface SupervisorOptions {
intervalMs?: number; // default: 60000
stuckJobsTimeoutMinutes?: number; // default: 10
cleanupJobsDaysToKeep?: number; // default: 30 (0 to disable)
cleanupEventsDaysToKeep?: number; // default: 30 (0 to disable)
cleanupBatchSize?: number; // default: 1000
reclaimStuckJobs?: boolean; // default: true
expireTimedOutTokens?: boolean; // default: true
onError?: (error: Error) => void; // default: console.error
verbose?: boolean;
}
Supervisor
interface Supervisor {
start(): Promise<SupervisorRunResult>;
startInBackground(): void;
stop(): void;
stopAndDrain(timeoutMs?: number): Promise<void>;
isRunning(): boolean;
}
start() runs all tasks once and returns the results (serverless-friendly).
startInBackground() starts a background loop that runs every intervalMs.
stopAndDrain() stops the loop and waits for the current run to finish.
SupervisorRunResult
interface SupervisorRunResult {
reclaimedJobs: number;
cleanedUpJobs: number;
cleanedUpEvents: number;
expiredTokens: number;
}
See Long-Running Server for usage examples.
Accessing the Underlying Client
getPool
getPool(): Pool
Returns the PostgreSQL connection pool instance. Only available when using the PostgreSQL backend.
Throws an error if called when using the Redis backend.
getRedisClient
getRedisClient(): Redis
Returns the ioredis client instance. Only available when using the Redis backend.
Throws an error if called when using the PostgreSQL backend.