JobQueue
JobQueue API Reference
Initialization
initJobQueue
initJobQueue(config: JobQueueConfig): JobQueue
Initializes the job queue system with the provided configuration.
JobQueueConfig
interface JobQueueConfig {
databaseConfig: {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: DatabaseSSLConfig;
};
verbose?: boolean;
}
DatabaseSSLConfig
interface DatabaseSSLConfig {
ca?: string;
cert?: string;
key?: string;
rejectUnauthorized?: boolean;
}
ca
- Client certificate authority (CA) as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.cert
- Client certificate as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.key
- Client private key as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.rejectUnauthorized
- Whether to reject unauthorized certificates (default: true)
Adding Jobs
addJob
addJob(job: JobOptions): Promise<number>
Adds a job to the queue. Returns the job ID.
JobOptions
interface JobOptions {
jobType: string;
payload: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
tags?: string[];
}
Retrieving Jobs
getJob
getJob(id: number): Promise<JobRecord | null>
Retrieves a job by its ID.
getJobs
getJobs(
filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
},
limit?: number,
offset?: number
): Promise<JobRecord[]>
Retrieves jobs matching the provided filters, with optional pagination.
getJobsByStatus
getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves jobs by their status, with pagination.
getAllJobs
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves all jobs, with optional pagination.
getJobsByTags
getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>
Retrieves jobs by tag(s).
Managing Jobs
retryJob
retryJob(jobId: number): Promise<void>
Retries a job given its ID.
cancelJob
cancelJob(jobId: number): Promise<void>
Cancels a job given its ID.
cancelAllUpcomingJobs
cancelAllUpcomingJobs(filters?: {
jobType?: string;
priority?: number;
runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>
Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.
cleanupOldJobs
cleanupOldJobs(daysToKeep?: number): Promise<number>
Cleans up jobs older than the specified number of days. Returns the number of jobs removed.
reclaimStuckJobs
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>
Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed.
Job Events
getJobEvents
getJobEvents(jobId: number): Promise<JobEvent[]>
Retrieves the job events for a job.
JobEvent
interface JobEvent {
id: number;
jobId: number;
eventType: JobEventType;
createdAt: Date;
metadata: any;
}
JobEventType
enum JobEventType {
Added = 'added',
Processing = 'processing',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Retried = 'retried',
}
Processing Jobs
createProcessor
createProcessor(
handlers: JobHandlers,
options?: ProcessorOptions
): Processor
Creates a job processor with the provided handlers and options.
ProcessorOptions
interface ProcessorOptions {
workerId?: string;
batchSize?: number;
concurrency?: number;
pollInterval?: number;
onError?: (error: Error) => void;
verbose?: boolean;
jobType?: string | string[];
}
Database Pool
getPool
getPool(): Pool
Returns the database pool instance.