DataQueue

JobQueue

JobQueue API Reference

Initialization

initJobQueue

initJobQueue(config: JobQueueConfig): JobQueue

Initializes the job queue system with the provided configuration.

JobQueueConfig

interface JobQueueConfig {
  databaseConfig: {
    connectionString?: string;
    host?: string;
    port?: number;
    database?: string;
    user?: string;
    password?: string;
    ssl?: DatabaseSSLConfig;
  };
  verbose?: boolean;
}

DatabaseSSLConfig

interface DatabaseSSLConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}
  • ca - Client certificate authority (CA), given as a PEM string or a file path. If the value starts with 'file://', it is loaded from that file; otherwise it is treated as a PEM string.
  • cert - Client certificate, given as a PEM string or a file path. If the value starts with 'file://', it is loaded from that file; otherwise it is treated as a PEM string.
  • key - Client private key, given as a PEM string or a file path. If the value starts with 'file://', it is loaded from that file; otherwise it is treated as a PEM string.
  • rejectUnauthorized - Whether to reject unauthorized certificates (default: true)

Adding Jobs

addJob

addJob(job: JobOptions): Promise<number>

Adds a job to the queue. Returns the job ID.

JobOptions

interface JobOptions {
  jobType: string;
  payload: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
}

Retrieving Jobs

getJob

getJob(id: number): Promise<JobRecord | null>

Retrieves a job by its ID.

getJobs

getJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  limit?: number,
  offset?: number
): Promise<JobRecord[]>

Retrieves jobs matching the provided filters, with optional pagination.

getJobsByStatus

getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves jobs by their status, with optional pagination.

getAllJobs

getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves all jobs, with optional pagination.

getJobsByTags

getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>

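Retrieves jobs that match the given tag(s), with optional pagination. The optional mode argument controls how the tags are matched.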


Managing Jobs

retryJob

retryJob(jobId: number): Promise<void>

Retries a job given its ID.

cancelJob

cancelJob(jobId: number): Promise<void>

Cancels a job given its ID.

cancelAllUpcomingJobs

cancelAllUpcomingJobs(filters?: {
  jobType?: string;
  priority?: number;
  runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
  tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>

Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.

cleanupOldJobs

cleanupOldJobs(daysToKeep?: number): Promise<number>

Cleans up jobs older than the specified number of days. Returns the number of jobs removed.

reclaimStuckJobs

reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>

Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed.


Job Events

getJobEvents

getJobEvents(jobId: number): Promise<JobEvent[]>

Retrieves the events recorded for a job, given its ID.

JobEvent

interface JobEvent {
  id: number;
  jobId: number;
  eventType: JobEventType;
  createdAt: Date;
  metadata: any;
}

JobEventType

enum JobEventType {
  Added = 'added',
  Processing = 'processing',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled',
  Retried = 'retried',
}

Processing Jobs

createProcessor

createProcessor(
  handlers: JobHandlers,
  options?: ProcessorOptions
): Processor

Creates a job processor with the provided handlers and options.

ProcessorOptions

interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}

Database Pool

getPool

getPool(): Pool

Returns the database pool instance.