DataQueue

JobQueue

Initialization

initJobQueue

initJobQueue(config: JobQueueConfig): JobQueue

Initializes the job queue system with the provided configuration.

JobQueueConfig

interface JobQueueConfig {
  databaseConfig: {
    connectionString?: string;
    host?: string;
    port?: number;
    database?: string;
    user?: string;
    password?: string;
    ssl?: DatabaseSSLConfig;
  };
  verbose?: boolean;
}

DatabaseSSLConfig

interface DatabaseSSLConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}
  • ca - Certificate authority (CA) certificate used to verify the server, as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.
  • cert - Client certificate as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.
  • key - Client private key as PEM string or file path. If the value starts with 'file://', it will be loaded from file, otherwise treated as PEM string.
  • rejectUnauthorized - Whether to reject unauthorized certificates (default: true)

Adding Jobs

addJob

addJob(job: JobOptions): Promise<number>

Adds a job to the queue. Returns the job ID.

JobOptions

interface JobOptions {
  jobType: string;
  payload: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
}

Retrieving Jobs

getJob

getJob(id: number): Promise<JobRecord | null>

Retrieves a job by its ID.

getJobs

getJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  limit?: number,
  offset?: number
): Promise<JobRecord[]>

Retrieves jobs matching the provided filters, with optional pagination.

getJobsByStatus

getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves jobs by their status, with optional pagination.

getAllJobs

getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>

Retrieves all jobs, with optional pagination.

getJobsByTags

getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>

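Retrieves jobs by tag(s), with optional pagination. The optional mode argument controls how the given tags are matched against each job's tags.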


Managing Jobs

retryJob

retryJob(jobId: number): Promise<void>

Retries a job given its ID.

cancelJob

cancelJob(jobId: number): Promise<void>

Cancels a job given its ID.

editJob

editJob(jobId: number, updates: EditJobOptions): Promise<void>

Edits a pending job given its ID. Only jobs with status 'pending' can be edited; for a job in any other status (processing, completed, failed, cancelled) the call silently has no effect and no error is reported.

EditJobOptions

interface EditJobOptions {
  payload?: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
}

All fields are optional - only provided fields will be updated. Note that jobType cannot be changed.

Example

// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
  payload: { to: 'newemail@example.com', subject: 'Updated' },
  priority: 10,
});

// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
  runAt: new Date(Date.now() + 60000), // Run in 1 minute
});

// Edit multiple fields at once
await jobQueue.editJob(jobId, {
  payload: { to: 'updated@example.com' },
  priority: 5,
  maxAttempts: 10,
  timeoutMs: 30000,
  tags: ['urgent', 'priority'],
});

editAllPendingJobs

editAllPendingJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  updates: EditJobOptions
): Promise<number>

Edits all pending jobs that match the filters. Only works for jobs with status 'pending'. Non-pending jobs are not affected. Returns the number of jobs that were edited.

Parameters

  • filters (optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited.
    • jobType: Filter by job type
    • priority: Filter by priority
    • runAt: Filter by scheduled run time (supports gt, gte, lt, lte, eq operators or exact Date match)
    • tags: Filter by tags with mode ('all', 'any', 'none', 'exact')
  • updates: The fields to update (same as EditJobOptions). All fields are optional - only provided fields will be updated.

Returns

The number of jobs that were successfully edited.

Examples

// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
  priority: 10,
});

// Edit all pending email jobs
const editedCount = await jobQueue.editAllPendingJobs(
  { jobType: 'email' },
  {
    priority: 5,
  },
);

// Edit all pending jobs with 'urgent' tag
const editedCount = await jobQueue.editAllPendingJobs(
  { tags: { values: ['urgent'], mode: 'any' } },
  {
    priority: 10,
    maxAttempts: 5,
  },
);

// Edit all pending jobs scheduled in the future
const editedCount = await jobQueue.editAllPendingJobs(
  { runAt: { gte: new Date() } },
  {
    priority: 10,
  },
);

// Edit with combined filters
const editedCount = await jobQueue.editAllPendingJobs(
  {
    jobType: 'email',
    tags: { values: ['urgent'], mode: 'any' },
  },
  {
    priority: 10,
    maxAttempts: 5,
  },
);

Note: Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits.

cancelAllUpcomingJobs

cancelAllUpcomingJobs(filters?: {
  jobType?: string;
  priority?: number;
  runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
  tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>

Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.

cleanupOldJobs

cleanupOldJobs(daysToKeep?: number): Promise<number>

Cleans up jobs older than the specified number of days. Returns the number of jobs removed.

reclaimStuckJobs

reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>

Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed.


Job Events

getJobEvents

getJobEvents(jobId: number): Promise<JobEvent[]>

Retrieves the job events for a job.

JobEvent

interface JobEvent {
  id: number;
  jobId: number;
  eventType: JobEventType;
  createdAt: Date;
  metadata: any;
}

JobEventType

enum JobEventType {
  Added = 'added',
  Processing = 'processing',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled',
  Retried = 'retried',
  Edited = 'edited',
}

Processing Jobs

createProcessor

createProcessor(
  handlers: JobHandlers,
  options?: ProcessorOptions
): Processor

Creates a job processor with the provided handlers and options.

ProcessorOptions

interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}

Database Pool

getPool

getPool(): Pool

Returns the database pool instance.