JobQueue
The JobQueue
interface is the main entry point for interacting with the job queue system. It provides methods to add, retrieve, process, and manage jobs.
Methods
initJobQueue
initJobQueue(config: JobQueueConfig): Promise<JobQueue>
Initialize the job queue system.
This function is the main entry point for initializing the job queue system. It takes a JobQueueConfig
object as an argument, which contains the configuration for the job queue system.
The JobQueueConfig
object contains the following properties:
databaseConfig
: An object containing the configuration for the database.verbose
: A boolean indicating whether to enable verbose logging.
The function returns a JobQueue
object, which provides methods to add, retrieve, process, and manage jobs.
JobQueueConfig
interface JobQueueConfig {
databaseConfig: {
connectionString?: string;
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
ssl?: any;
};
verbose?: boolean;
}
Fields
connectionString
: A string containing the connection string for the database.host
: A string containing the host of the database.port
: A number containing the port of the database.database
: A string containing the name of the database.user
: A string containing the username for the database.password
: A string containing the password for the database.ssl
: An object containing the SSL configuration for the database.verbose
: A boolean indicating whether to enable verbose logging.
JobQueue
The JobQueue
object is returned by the initJobQueue
function. It provides methods to add, retrieve, process, and manage jobs.
addJob
addJob(job: JobOptions): Promise<number>
Add a job to the queue.
JobOptions
interface JobOptions {
jobType: string;
payload: any;
maxAttempts?: number;
priority?: number;
runAt?: Date | null;
timeoutMs?: number;
}
jobType
: A string containing the type of the job.payload
: An object containing the payload of the job.maxAttempts
: A number containing the maximum number of attempts for the job.priority
: A number containing the priority of the job.runAt
: A date containing the time the job is scheduled to run at.timeoutMs
: A number containing the timeout for the job in milliseconds.
getJob
getJob(id: number): Promise<JobRecord | null>
Get a job by its ID.
getJobsByStatus
getJobsByStatus(
status: string,
limit?: number,
offset?: number
): Promise<JobRecord[]>
Get jobs by their status, with pagination.
getAllJobs
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>
Get all jobs, with optional pagination.
retryJob
retryJob(jobId: number): Promise<void>
Retry a job given its ID.
cleanupOldJobs
cleanupOldJobs(daysToKeep?: number): Promise<number>
Cleanup jobs that are older than the specified number of days.
cancelJob
cancelJob(jobId: number): Promise<void>
Cancel a job given its ID.
reclaimStuckJobs
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>
Reclaim jobs stuck in 'processing' for too long.
cancelAllUpcomingJobs
cancelAllUpcomingJobs(filters?): Promise<number>
Cancel all upcoming jobs that match the filters.
createProcessor
createProcessor(
handlers: JobHandlers,
options?: ProcessorOptions
): Processor
Create a job processor. Handlers must be provided per-processor.
ProcessorOptions
interface ProcessorOptions {
workerId?: string;
batchSize?: number;
concurrency?: number;
pollInterval?: number;
onError?: (error: Error) => void;
verbose?: boolean;
jobType?: string | string[];
}
workerId
: A string identifying the worker.batchSize
: The number of jobs to process whenstart
is called.concurrency
: The number of jobs to process in parallel.pollInterval
: The interval in milliseconds to poll for new jobs.onError
: A function to call when an error occurs.verbose
: A boolean indicating whether to enable verbose logging.jobType
: A string or array of strings containing the job type(s) to process.
getJobEvents
getJobEvents(jobId: number): Promise<JobEvent[]>
Get the job events for a job.
JobEvent
interface JobEvent {
id: number;
jobId: number;
eventType: JobEventType;
createdAt: Date;
metadata: any;
}
id
: The ID of the event.jobId
: The ID of the job.eventType
: The type of event.createdAt
: The date and time the event was created.metadata
: The metadata of the event.
JobEventType
enum JobEventType {
Added = 'added',
Processing = 'processing',
Completed = 'completed',
Failed = 'failed',
Cancelled = 'cancelled',
Retried = 'retried',
}
getPool
getPool(): Pool
Get the database pool.