JobQueue
Initialization
initJobQueue
initJobQueue(config: JobQueueConfig): JobQueueInitializes the job queue system with the provided configuration.
JobQueueConfig
interface JobQueueConfig {
databaseConfig: {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: DatabaseSSLConfig;
};
verbose?: boolean;
}DatabaseSSLConfig
interface DatabaseSSLConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}ca- Client certificate authority (CA) as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.cert- Client certificate as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.key- Client private key as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.rejectUnauthorized- Whether to reject unauthorized certificates (default: true)
Adding Jobs
addJob
addJob(job: JobOptions): Promise<number>Adds a job to the queue. Returns the job ID.
JobOptions
interface JobOptions {
jobType: string;
payload: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
}Retrieving Jobs
getJob
getJob(id: number): Promise<JobRecord | null>Retrieves a job by its ID.
getJobs
getJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
limit?: number,
offset?: number
): Promise<JobRecord[]>Retrieves jobs matching the provided filters, with optional pagination.
getJobsByStatus
getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>Retrieves jobs by their status, with pagination.
getAllJobs
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>Retrieves all jobs, with optional pagination.
getJobsByTags
getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>Retrieves jobs by tag(s).
Managing Jobs
retryJob
retryJob(jobId: number): Promise<void>Retries a job given its ID.
cancelJob
cancelJob(jobId: number): Promise<void>Cancels a job given its ID.
editJob
editJob(jobId: number, updates: EditJobOptions): Promise<void>Edits a pending job given its ID. Only works for jobs with status 'pending'. Silently fails for other statuses (processing, completed, failed, cancelled).
EditJobOptions
interface EditJobOptions {
payload?: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
}All fields are optional - only provided fields will be updated. Note that jobType cannot be changed.
Example
// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
payload: { to: 'newemail@example.com', subject: 'Updated' },
priority: 10,
});
// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
runAt: new Date(Date.now() + 60000), // Run in 1 minute
});
// Edit multiple fields at once
await jobQueue.editJob(jobId, {
payload: { to: 'updated@example.com' },
priority: 5,
maxAttempts: 10,
timeoutMs: 30000,
tags: ['urgent', 'priority'],
});editAllPendingJobs
editAllPendingJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
updates: EditJobOptions
): Promise<number>Edits all pending jobs that match the filters. Only works for jobs with status 'pending'. Non-pending jobs are not affected. Returns the number of jobs that were edited.
Parameters
filters(optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited.jobType: Filter by job typepriority: Filter by priorityrunAt: Filter by scheduled run time (supportsgt,gte,lt,lte,eqoperators or exact Date match)tags: Filter by tags with mode ('all', 'any', 'none', 'exact')
updates: The fields to update (same asEditJobOptions). All fields are optional - only provided fields will be updated.
Returns
The number of jobs that were successfully edited.
Examples
// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
priority: 10,
});
// Edit all pending email jobs
const editedCount = await jobQueue.editAllPendingJobs(
{ jobType: 'email' },
{
priority: 5,
},
);
// Edit all pending jobs with 'urgent' tag
const editedCount = await jobQueue.editAllPendingJobs(
{ tags: { values: ['urgent'], mode: 'any' } },
{
priority: 10,
maxAttempts: 5,
},
);
// Edit all pending jobs scheduled in the future
const editedCount = await jobQueue.editAllPendingJobs(
{ runAt: { gte: new Date() } },
{
priority: 10,
},
);
// Edit with combined filters
const editedCount = await jobQueue.editAllPendingJobs(
{
jobType: 'email',
tags: { values: ['urgent'], mode: 'any' },
},
{
priority: 10,
maxAttempts: 5,
},
);Note: Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits.
cancelAllUpcomingJobs
cancelAllUpcomingJobs(filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.
cleanupOldJobs
cleanupOldJobs(daysToKeep?: number): Promise<number>Cleans up jobs older than the specified number of days. Returns the number of jobs removed.
reclaimStuckJobs
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed.
Job Events
getJobEvents
getJobEvents(jobId: number): Promise<JobEvent[]>Retrieves the job events for a job.
JobEvent
interface JobEvent {
id: number;
jobId: number;
eventType: JobEventType;
createdAt: Date;
metadata: any;
}JobEventType
enum JobEventType {
Added = 'added',
Processing = 'processing',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Retried = 'retried',
Edited = 'edited',
}Processing Jobs
createProcessor
createProcessor(
handlers: JobHandlers,
options?: ProcessorOptions
): ProcessorCreates a job processor with the provided handlers and options.
ProcessorOptions
interface ProcessorOptions {
workerId?: string;
batchSize?: number;
concurrency?: number;
pollInterval?: number;
onError?: (error: Error) => void;
verbose?: boolean;
jobType?: string | string[];
}Database Pool
getPool
getPool(): PoolReturns the database pool instance.