DataQueue

Processor

The Processor interface represents a job processor that can process jobs from the queue, either in the background or synchronously.

Creating a processor

Create a processor by calling createProcessor on the queue.

const jobQueue = getJobQueue();
const processor = jobQueue.createProcessor(handlers, options);
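The handlers argument is not described on this page. As a minimal sketch, assuming it is an object keyed by job type whose values are async functions that receive the job payload, it might look like the following. The job type names (send_email, generate_report), the PayloadMap and Handlers types, and the processed array are hypothetical and exist only for illustration.

```typescript
// Hypothetical payload map: job type name -> payload shape.
type PayloadMap = {
  send_email: { to: string; subject: string };
  generate_report: { reportId: string };
};

// Assumed handler shape (not confirmed by this page): one async function
// per job type, receiving that job type's payload.
type Handlers = {
  [K in keyof PayloadMap]: (payload: PayloadMap[K]) => Promise<void>;
};

// Records handled work so the sketch has an observable effect.
const processed: string[] = [];

const handlers: Handlers = {
  send_email: async (payload) => {
    processed.push(`email:${payload.to}`);
  },
  generate_report: async (payload) => {
    processed.push(`report:${payload.reportId}`);
  },
};
```

An object shaped like this would then be passed as the first argument to createProcessor, with the options described below as the second.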

ProcessorOptions

interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  groupConcurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}
  • groupConcurrency sets a global per-group concurrency cap, enforced across all workers/instances, for jobs that have a group.id.
  • groupConcurrency must be a positive integer when provided.
  • Jobs without a group.id are not affected by groupConcurrency.
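To make the options concrete, here is a sketch of an options object together with a check for the groupConcurrency rule stated above. The interface is repeated so the snippet stands alone. The isValidGroupConcurrency helper, the worker ID, and the job type names are hypothetical and not part of the library. The values chosen are illustrative, not recommended defaults.

```typescript
interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  groupConcurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}

// Hypothetical helper: mirrors the documented rule that groupConcurrency
// must be a positive integer when provided.
function isValidGroupConcurrency(value: number | undefined): boolean {
  if (value === undefined) return true; // the option is optional
  return Number.isInteger(value) && value > 0;
}

// Illustrative values only.
const options: ProcessorOptions = {
  workerId: "worker-1",
  batchSize: 10,
  concurrency: 3,
  groupConcurrency: 2, // at most 2 jobs per group.id at a time, across workers
  pollInterval: 5000, // milliseconds
  onError: (error) => console.error("processor error:", error.message),
  jobType: ["send_email", "generate_report"],
};
```

Checking the value before constructing the processor gives a clear failure at startup if a configuration value such as 0 or 1.5 slips in.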

Methods

startInBackground

startInBackground(): void

Start the job processor in the background. It runs continuously, polling for new jobs every pollInterval milliseconds (default: 5000, i.e. 5 seconds) and processing them as they become available.

stop

stop(): void

Stop the job processor that runs in the background. Does not wait for in-flight jobs to finish.

stopAndDrain

stopAndDrain(timeoutMs?: number): Promise<void>

Stop the processor and wait for the current in-flight batch to finish before resolving. Accepts an optional timeout in milliseconds (default: 30000). If the batch does not complete within the timeout, the promise resolves anyway so your process is not stuck indefinitely. Useful for graceful shutdown (e.g., SIGTERM handling). See Long-Running Server for a full example.
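A graceful shutdown built on stopAndDrain could be sketched as below. This is not the Long-Running Server example, only a minimal illustration. The Drainable interface covers just the one method used, and the shutdownGracefully helper and its cleanup parameter are hypothetical names.

```typescript
// Minimal structural type covering only the method this sketch relies on.
interface Drainable {
  stopAndDrain(timeoutMs?: number): Promise<void>;
}

// Hypothetical helper: stop the processor, wait for the in-flight batch
// (or the timeout), then run optional cleanup such as closing a DB pool.
async function shutdownGracefully(
  processor: Drainable,
  timeoutMs = 30000,
  cleanup?: () => Promise<void>,
): Promise<void> {
  await processor.stopAndDrain(timeoutMs);
  if (cleanup) {
    await cleanup();
  }
}

// In a Node.js server this might be wired to SIGTERM, for example:
//
//   process.on("SIGTERM", () => {
//     void shutdownGracefully(processor).then(() => process.exit(0));
//   });
```

Because stopAndDrain resolves even when the timeout elapses, the helper cannot hang on a stuck batch; any cleanup still runs afterwards.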

isRunning

isRunning(): boolean

Check if the job processor is running.

start

start(): Promise<number>

Start the job processor synchronously (as a one-off run rather than in the background). It processes available jobs immediately and then stops. The returned promise resolves with the number of jobs processed.
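Since start resolves with a count, a caller such as a cron task or serverless handler can use it to decide whether to run again. The sketch below assumes that a single call may leave jobs behind (for example, if it is bounded by batchSize) and that a result of 0 means nothing was available. Both are assumptions, not statements from this page. OneShotProcessor and processUntilEmpty are hypothetical names.

```typescript
// Minimal structural type covering only the method this sketch relies on.
interface OneShotProcessor {
  start(): Promise<number>;
}

// Hypothetical helper: call start() repeatedly until a run processes no
// jobs or maxRounds is reached, and return the total number processed.
async function processUntilEmpty(
  processor: OneShotProcessor,
  maxRounds = 100,
): Promise<number> {
  let total = 0;
  for (let round = 0; round < maxRounds; round++) {
    const processed = await processor.start();
    if (processed === 0) break; // assumed to mean the queue had nothing to do
    total += processed;
  }
  return total;
}
```

The maxRounds cap keeps the loop bounded when jobs arrive faster than they are processed, which matters in environments with a fixed execution time limit.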