JobQueue
Initialization
initJobQueue
initJobQueue(config: JobQueueConfig): JobQueueInitializes the job queue system with the provided configuration. The JobQueueConfig is a discriminated union -- you provide either a PostgreSQL or Redis configuration.
PostgresJobQueueConfig
interface PostgresJobQueueConfig {
backend?: 'postgres'; // Optional, defaults to 'postgres'
databaseConfig: {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: DatabaseSSLConfig;
};
verbose?: boolean;
}RedisJobQueueConfig
interface RedisJobQueueConfig {
backend: 'redis'; // Required
redisConfig: {
url?: string;
host?: string;
port?: number;
password?: string;
db?: number;
tls?: RedisTLSConfig;
keyPrefix?: string; // Default: 'dq:'
};
verbose?: boolean;
}JobQueueConfig
type JobQueueConfig = PostgresJobQueueConfig | RedisJobQueueConfig;DatabaseSSLConfig
interface DatabaseSSLConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}ca- Client certificate authority (CA) as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.cert- Client certificate as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.key- Client private key as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.rejectUnauthorized- Whether to reject unauthorized certificates (default: true)
RedisTLSConfig
interface RedisTLSConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}Adding Jobs
addJob
addJob(job: JobOptions): Promise<number>Adds a job to the queue. Returns the job ID.
JobOptions
interface JobOptions {
jobType: string;
payload: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
}Retrieving Jobs
getJob
getJob(id: number): Promise<JobRecord | null>Retrieves a job by its ID.
getJobs
getJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
limit?: number,
offset?: number
): Promise<JobRecord[]>Retrieves jobs matching the provided filters, with optional pagination.
getJobsByStatus
getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>Retrieves jobs by their status, with pagination.
getAllJobs
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>Retrieves all jobs, with optional pagination.
getJobsByTags
getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>Retrieves jobs by tag(s).
Managing Jobs
retryJob
retryJob(jobId: number): Promise<void>Retries a job given its ID.
cancelJob
cancelJob(jobId: number): Promise<void>Cancels a job given its ID.
editJob
editJob(jobId: number, updates: EditJobOptions): Promise<void>Edits a pending job given its ID. Only works for jobs with status 'pending'. Silently fails for other statuses (processing, completed, failed, cancelled).
EditJobOptions
interface EditJobOptions {
payload?: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
}All fields are optional - only provided fields will be updated. Note that jobType cannot be changed.
Example
// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
payload: { to: 'newemail@example.com', subject: 'Updated' },
priority: 10,
});
// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
runAt: new Date(Date.now() + 60000), // Run in 1 minute
});
// Edit multiple fields at once
await jobQueue.editJob(jobId, {
payload: { to: 'updated@example.com' },
priority: 5,
maxAttempts: 10,
timeoutMs: 30000,
tags: ['urgent', 'priority'],
});editAllPendingJobs
editAllPendingJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
updates: EditJobOptions
): Promise<number>Edits all pending jobs that match the filters. Only works for jobs with status 'pending'. Non-pending jobs are not affected. Returns the number of jobs that were edited.
Parameters
filters(optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited.jobType: Filter by job typepriority: Filter by priorityrunAt: Filter by scheduled run time (supportsgt,gte,lt,lte,eqoperators or exact Date match)tags: Filter by tags with mode ('all', 'any', 'none', 'exact')
updates: The fields to update (same asEditJobOptions). All fields are optional - only provided fields will be updated.
Returns
The number of jobs that were successfully edited.
Examples
// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
priority: 10,
});
// Edit all pending email jobs
const editedCount = await jobQueue.editAllPendingJobs(
{ jobType: 'email' },
{
priority: 5,
},
);
// Edit all pending jobs with 'urgent' tag
const editedCount = await jobQueue.editAllPendingJobs(
{ tags: { values: ['urgent'], mode: 'any' } },
{
priority: 10,
maxAttempts: 5,
},
);
// Edit all pending jobs scheduled in the future
const editedCount = await jobQueue.editAllPendingJobs(
{ runAt: { gte: new Date() } },
{
priority: 10,
},
);
// Edit with combined filters
const editedCount = await jobQueue.editAllPendingJobs(
{
jobType: 'email',
tags: { values: ['urgent'], mode: 'any' },
},
{
priority: 10,
maxAttempts: 5,
},
);Note: Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits.
cancelAllUpcomingJobs
cancelAllUpcomingJobs(filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.
cleanupOldJobs
cleanupOldJobs(daysToKeep?: number): Promise<number>Cleans up jobs older than the specified number of days. Returns the number of jobs removed.
reclaimStuckJobs
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed. If a job has a timeoutMs that is longer than the maxProcessingTimeMinutes threshold, the job's own timeout is used instead, preventing premature reclamation of long-running jobs.
Job Events
getJobEvents
getJobEvents(jobId: number): Promise<JobEvent[]>Retrieves the job events for a job.
JobEvent
interface JobEvent {
id: number;
jobId: number;
eventType: JobEventType;
createdAt: Date;
metadata: any;
}JobEventType
enum JobEventType {
Added = 'added',
Processing = 'processing',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Retried = 'retried',
Edited = 'edited',
}Processing Jobs
createProcessor
createProcessor(
handlers: JobHandlers,
options?: ProcessorOptions
): ProcessorCreates a job processor with the provided handlers and options.
ProcessorOptions
interface ProcessorOptions {
workerId?: string;
batchSize?: number;
concurrency?: number;
pollInterval?: number;
onError?: (error: Error) => void;
verbose?: boolean;
jobType?: string | string[];
}Accessing the Underlying Client
getPool
getPool(): PoolReturns the PostgreSQL connection pool instance. Only available when using the PostgreSQL backend.
Throws an error if called when using the Redis backend.
getRedisClient
getRedisClient(): RedisReturns the ioredis client instance. Only available when using the Redis backend.
Throws an error if called when using the PostgreSQL backend.