# DataQueue — Full Documentation

# Database Utility

Slug: api/db-util

The `createPool` function creates a PostgreSQL connection pool for use with the job queue system.

> **Note:** This utility is only relevant for the **PostgreSQL backend**. If you're using the Redis backend, you don't need this function.

## Function

```ts
createPool(config: PostgresJobQueueConfig['databaseConfig']): Pool
```

- `config`: The database connection configuration (connection string, host, port, database, user, password, ssl).
- Returns a `Pool` instance from `pg`.

## Example

```ts
import { createPool } from '@nicnocquee/dataqueue';

const pool = createPool({
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  user: 'postgres',
  password: 'secret',
});
```

---

# FailureReason

Slug: api/failure-reason

The `FailureReason` enum represents the possible reasons for a job failure.

## Enum

```ts
enum FailureReason {
  Timeout = 'timeout',
  HandlerError = 'handler_error',
  NoHandler = 'no_handler',
}
```

## Values

- `Timeout`: The job timed out.
- `HandlerError`: The job handler threw an error.
- `NoHandler`: The job handler was not found.

---

# API Reference

Slug: api

This section documents the main classes, types, and functions available for managing job queues, processing jobs, and interacting with the database.

## API Surface

- [JobQueue](/api/job-queue)
- [JobOptions](/api/job-options)
- [JobRecord](/api/job-record)
- [JobEvent](/api/job-event)
- [Processor](/api/processor)
- [ProcessorOptions](/api/processor-options)
- [Supervisor](/api/job-queue#background-supervisor)
- [SupervisorOptions](/api/job-queue#supervisoroptions)
- [SupervisorRunResult](/api/job-queue#supervisorrunresult)
- [JobHandlers](/api/job-handlers)
- [Database Utility](/api/db-util)
- [Tags](/api/tags)

---

# JobEvent

Slug: api/job-event

The `JobEvent` interface represents an event in the lifecycle of a job, such as when it is added, processed, completed, failed, cancelled, or retried.

## Fields

- `id`: _number_ — Unique event ID.
- `jobId`: _number_ — The job this event is associated with.
- `eventType`: _JobEventType_ — The type of event (see below).
- `createdAt`: _Date_ — When the event was created.
- `metadata`: _any_ — Additional metadata for the event.

## JobEventType

`JobEventType` is a union of the possible job event types:

```ts
type JobEventType =
  | 'added'
  | 'processing'
  | 'completed'
  | 'failed'
  | 'cancelled'
  | 'retried'
  | 'edited'
  | 'prolonged';
```

The `prolonged` event is recorded when a running job extends its timeout via `prolong()` or `onTimeout()`. See [Job Timeout](/usage/job-timeout) for details.

---

# JobHandlers

Slug: api/job-handlers

The `JobHandlers` type defines a map of job types to their handler functions. Each handler processes a job's payload and receives an `AbortSignal` for cancellation and a `JobContext` for timeout extension.

## Type

```ts
type OnTimeoutCallback = () => number | void | undefined;

interface JobContext {
  /** Proactively reset the timeout deadline.
   * If ms is provided, sets deadline to ms from now.
   * If omitted, resets to the original timeoutMs.
   */
  prolong: (ms?: number) => void;
  /** Register a callback invoked when timeout fires (before abort).
   * Return a number (ms) to extend, or nothing to let timeout proceed.
   * The callback may be called multiple times if the job keeps extending.
   */
  onTimeout: (callback: OnTimeoutCallback) => void;
}

type JobHandler<PayloadMap, T extends keyof PayloadMap> = (
  payload: PayloadMap[T],
  signal: AbortSignal,
  ctx: JobContext,
) => Promise<unknown>; // a returned value, if any, is stored as the job's output

// Map of job types to handlers
export type JobHandlers<PayloadMap> = {
  [K in keyof PayloadMap]: JobHandler<PayloadMap, K>;
};
```

## Example

```ts
const handlers = {
  email: async (payload, signal) => {
    // send email
  },
  generateReport: async (payload, signal, { prolong }) => {
    // prolong the timeout before a heavy step
    prolong(60_000);
    // generate report
  },
  processData: async (payload, signal, { onTimeout }) => {
    let progress = 0;
    onTimeout(() => {
      if (progress < 100) return 30_000; // extend if still working
    });
    // process data in chunks, updating progress
  },
};
```

---

# JobOptions

Slug: api/job-options

The `JobOptions` interface defines the options for creating a new job in the queue.

## Fields

- `jobType`: _string_ — The type of the job.
- `payload`: _any_ — The payload for the job, type-safe per job type.
- `maxAttempts?`: _number_ — Maximum number of attempts for this job (default: 3).
- `priority?`: _number_ — Priority of the job (higher runs first, default: 0).
- `runAt?`: _Date | null_ — When to run the job (default: now).
- `timeoutMs?`: _number_ — Timeout for this job in milliseconds. If not set, uses the processor default or unlimited.
- `forceKillOnTimeout?`: _boolean_ — If true, the job will be forcefully terminated (using Worker Threads) when the timeout is reached. If false (default), the job will only receive an `AbortSignal` and must handle the abort gracefully. **⚠️ Runtime Requirements**: This option requires **Node.js** and will **not work** in Bun or other runtimes without worker thread support. See [Force Kill on Timeout](/usage/force-kill-timeout) for details.
- `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations.
- `idempotencyKey?`: _string_ — Optional idempotency key. When provided, ensures that only one job exists for a given key.
  If a job with the same key already exists, `addJob` returns the existing job's ID instead of creating a duplicate. See [Idempotency](/usage/add-job#idempotency) for details.

## Example

```ts
const job = {
  jobType: 'email',
  payload: { to: 'user@example.com', subject: 'Hello' },
  maxAttempts: 5,
  priority: 10,
  runAt: new Date(Date.now() + 60000), // run in 1 minute
  timeoutMs: 30000, // 30 seconds
  forceKillOnTimeout: false, // Use graceful shutdown (default)
  tags: ['welcome', 'user'], // tags for grouping/searching
  idempotencyKey: 'welcome-email-user-123', // prevent duplicate jobs
};
```

---

# JobQueue

Slug: api/job-queue

## Initialization

### initJobQueue

```ts
initJobQueue(config: JobQueueConfig): JobQueue
```

Initializes the job queue system with the provided configuration. The `JobQueueConfig` is a discriminated union -- you provide either a PostgreSQL or Redis configuration.

#### PostgresJobQueueConfig

Provide either `databaseConfig` (the library creates a pool) or `pool` (bring your own `pg.Pool`). At least one must be set.

```ts
interface PostgresJobQueueConfig {
  backend?: 'postgres'; // Optional, defaults to 'postgres'
  databaseConfig?: {
    connectionString?: string;
    host?: string;
    port?: number;
    database?: string;
    user?: string;
    password?: string;
    ssl?: DatabaseSSLConfig;
  };
  pool?: import('pg').Pool; // Bring your own pool
  verbose?: boolean;
}
```

#### RedisJobQueueConfig

Provide either `redisConfig` (the library creates an ioredis client) or `client` (bring your own). At least one must be set.
```ts
interface RedisJobQueueConfig {
  backend: 'redis'; // Required
  redisConfig?: {
    url?: string;
    host?: string;
    port?: number;
    password?: string;
    db?: number;
    tls?: RedisTLSConfig;
    keyPrefix?: string; // Default: 'dq:'
  };
  client?: unknown; // Bring your own ioredis client
  keyPrefix?: string; // Key prefix when using external client (default: 'dq:')
  verbose?: boolean;
}
```

#### JobQueueConfig

```ts
type JobQueueConfig = PostgresJobQueueConfig | RedisJobQueueConfig;
```

#### DatabaseSSLConfig

```ts
interface DatabaseSSLConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}
```

- `ca` - Certificate authority (CA) as a PEM string or file path. If the value starts with 'file://', it is loaded from the file; otherwise it is treated as a PEM string.
- `cert` - Client certificate as a PEM string or file path. If the value starts with 'file://', it is loaded from the file; otherwise it is treated as a PEM string.
- `key` - Client private key as a PEM string or file path. If the value starts with 'file://', it is loaded from the file; otherwise it is treated as a PEM string.
- `rejectUnauthorized` - Whether to reject unauthorized certificates (default: true)

#### RedisTLSConfig

```ts
interface RedisTLSConfig {
  ca?: string;
  cert?: string;
  key?: string;
  rejectUnauthorized?: boolean;
}
```

---

## Adding Jobs

### addJob

```ts
addJob(job: JobOptions, options?: AddJobOptions): Promise<number>
```

Adds a job to the queue. Returns the job ID.

#### JobOptions

```ts
interface JobOptions {
  jobType: string;
  payload: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
  idempotencyKey?: string;
  retryDelay?: number; // Base delay between retries in seconds (default: 60)
  retryBackoff?: boolean; // Use exponential backoff (default: true)
  retryDelayMax?: number; // Max delay cap in seconds (default: none)
}
```

- `retryDelay` - Base delay between retries in seconds. When `retryBackoff` is true, this is the base for exponential backoff (`retryDelay * 2^attempts`). When false, retries use this fixed delay. Default: `60`.
- `retryBackoff` - Whether to use exponential backoff. When true, the delay doubles with each attempt and includes jitter. Default: `true`.
- `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted.

#### AddJobOptions

```ts
interface AddJobOptions {
  db?: DatabaseClient;
}
```

- `db` — An external database client (e.g., a `pg.PoolClient` inside a transaction). When provided, the INSERT runs on this client instead of the internal pool. **PostgreSQL only.** Throws if used with the Redis backend.

### addJobs

```ts
addJobs(jobs: JobOptions[], options?: AddJobOptions): Promise<number[]>
```

Adds multiple jobs to the queue in a single operation. More efficient than calling `addJob` in a loop because it batches the INSERT into a single database round-trip (PostgreSQL) or a single atomic Lua script (Redis). Returns an array of job IDs in the same order as the input array.

Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row. Passing an empty array returns `[]` immediately without touching the database.

```ts
const jobIds = await jobQueue.addJobs([
  {
    jobType: 'email',
    payload: { to: 'a@example.com', subject: 'Hi', body: '...' },
  },
  {
    jobType: 'email',
    payload: { to: 'b@example.com', subject: 'Hi', body: '...' },
    priority: 10,
  },
  {
    jobType: 'report',
    payload: { reportId: '123', userId: '456' },
    tags: ['monthly'],
  },
]);
// jobIds = [1, 2, 3]
```

The `{ db }` option works the same as `addJob` — pass a transactional client to batch-insert within an existing transaction (PostgreSQL only).
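For instance, the `{ db }` option lets you enqueue a job atomically with your own writes: if the transaction rolls back, the job row is rolled back with it. A minimal sketch (the `users` table and connection string are illustrative assumptions, not part of the library):

```typescript
import { Pool } from 'pg';
import { initJobQueue } from '@nicnocquee/dataqueue';

const pool = new Pool({ connectionString: process.env.PG_DATAQUEUE_DATABASE });
const jobQueue = initJobQueue({ pool });

const client = await pool.connect();
try {
  await client.query('BEGIN');
  // Your own write, in the same transaction as the job INSERT.
  await client.query('INSERT INTO users (email) VALUES ($1)', ['user@example.com']);
  await jobQueue.addJob(
    { jobType: 'email', payload: { to: 'user@example.com', subject: 'Welcome' } },
    { db: client }, // run the job INSERT on the transactional client
  );
  await client.query('COMMIT');
} catch (err) {
  await client.query('ROLLBACK');
  throw err;
} finally {
  client.release();
}
```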
#### DatabaseClient

```ts
interface DatabaseClient {
  query(
    text: string,
    values?: any[],
  ): Promise<{ rows: any[]; rowCount: number | null }>;
}
```

Any object matching this interface works — `pg.Pool`, `pg.PoolClient`, `pg.Client`, or ORM query runners that expose a raw `query()` method.

---

## Retrieving Jobs

### getJob

```ts
getJob(id: number): Promise<JobRecord | null>
```

Retrieves a job by its ID.

### getJobs

```ts
getJobs(
  filters?: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  },
  limit?: number,
  offset?: number
): Promise<JobRecord[]>
```

Retrieves jobs matching the provided filters, with optional pagination.

### getJobsByStatus

```ts
getJobsByStatus(status: string, limit?: number, offset?: number): Promise<JobRecord[]>
```

Retrieves jobs by their status, with pagination.

### getAllJobs

```ts
getAllJobs(limit?: number, offset?: number): Promise<JobRecord[]>
```

Retrieves all jobs, with optional pagination.

### getJobsByTags

```ts
getJobsByTags(tags: string[], mode?: TagQueryMode, limit?: number, offset?: number): Promise<JobRecord[]>
```

Retrieves jobs by tag(s).

---

## Managing Jobs

### retryJob

```ts
retryJob(jobId: number): Promise<void>
```

Retries a job given its ID.

### cancelJob

```ts
cancelJob(jobId: number): Promise<void>
```

Cancels a job given its ID.

### editJob

```ts
editJob(jobId: number, updates: EditJobOptions): Promise<void>
```

Edits a pending job given its ID. Only works for jobs with status 'pending'. It silently does nothing for other statuses (processing, completed, failed, cancelled).

#### EditJobOptions

```ts
interface EditJobOptions {
  payload?: any;
  maxAttempts?: number;
  priority?: number;
  runAt?: Date | null;
  timeoutMs?: number;
  tags?: string[];
  retryDelay?: number | null;
  retryBackoff?: boolean | null;
  retryDelayMax?: number | null;
}
```

All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. Set retry fields to `null` to revert to the legacy default behavior.

#### Example

```ts
// Edit a pending job's payload and priority
await jobQueue.editJob(jobId, {
  payload: { to: 'newemail@example.com', subject: 'Updated' },
  priority: 10,
});

// Edit only the scheduled run time
await jobQueue.editJob(jobId, {
  runAt: new Date(Date.now() + 60000), // Run in 1 minute
});

// Edit multiple fields at once
await jobQueue.editJob(jobId, {
  payload: { to: 'updated@example.com' },
  priority: 5,
  maxAttempts: 10,
  timeoutMs: 30000,
  tags: ['urgent', 'priority'],
});
```

### editAllPendingJobs

```ts
editAllPendingJobs(
  filters: {
    jobType?: string;
    priority?: number;
    runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
    tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
  } | undefined,
  updates: EditJobOptions
): Promise<number>
```

Edits all pending jobs that match the filters. Only works for jobs with status 'pending'. Non-pending jobs are not affected. Returns the number of jobs that were edited.

#### Parameters

- `filters` (optional): Filters to select which jobs to edit. Pass `undefined` to edit all pending jobs.
  - `jobType`: Filter by job type
  - `priority`: Filter by priority
  - `runAt`: Filter by scheduled run time (supports `gt`, `gte`, `lt`, `lte`, `eq` operators or an exact Date match)
  - `tags`: Filter by tags with a mode ('all', 'any', 'none', 'exact')
- `updates`: The fields to update (same as `EditJobOptions`). All fields are optional - only provided fields will be updated.

#### Returns

The number of jobs that were successfully edited.

#### Examples

```ts
// Edit all pending jobs
const editedCount = await jobQueue.editAllPendingJobs(undefined, {
  priority: 10,
});

// Edit all pending email jobs
await jobQueue.editAllPendingJobs(
  { jobType: 'email' },
  {
    priority: 5,
  },
);

// Edit all pending jobs with the 'urgent' tag
await jobQueue.editAllPendingJobs(
  { tags: { values: ['urgent'], mode: 'any' } },
  {
    priority: 10,
    maxAttempts: 5,
  },
);

// Edit all pending jobs scheduled in the future
await jobQueue.editAllPendingJobs(
  { runAt: { gte: new Date() } },
  {
    priority: 10,
  },
);

// Edit with combined filters
await jobQueue.editAllPendingJobs(
  {
    jobType: 'email',
    tags: { values: ['urgent'], mode: 'any' },
  },
  {
    priority: 10,
    maxAttempts: 5,
  },
);
```

**Note:** Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits.

### cancelAllUpcomingJobs

```ts
cancelAllUpcomingJobs(filters?: {
  jobType?: string;
  priority?: number;
  runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date };
  tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' };
}): Promise<number>
```

Cancels all upcoming jobs that match the filters. Returns the number of jobs cancelled.

### cleanupOldJobs

```ts
cleanupOldJobs(daysToKeep?: number): Promise<number>
```

Cleans up jobs older than the specified number of days. Returns the number of jobs removed.

### reclaimStuckJobs

```ts
reclaimStuckJobs(maxProcessingTimeMinutes?: number): Promise<number>
```

Reclaims jobs stuck in 'processing' for too long. Returns the number of jobs reclaimed. If a job has a `timeoutMs` that is longer than the `maxProcessingTimeMinutes` threshold, the job's own timeout is used instead, preventing premature reclamation of long-running jobs.

---

## Job Events

### getJobEvents

```ts
getJobEvents(jobId: number): Promise<JobEvent[]>
```

Retrieves the job events for a job.
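As a small illustration, the events returned by `getJobEvents` can be rendered as a readable timeline, e.g. when debugging a failing job. The `formatTimeline` helper below is a hypothetical utility, not part of the library; it relies only on the documented `eventType` and `createdAt` fields:

```typescript
// Minimal structural type for the documented event fields.
interface JobEventLike {
  eventType: string;
  createdAt: Date;
}

// Turn a job's event history into one line per event.
function formatTimeline(events: JobEventLike[]): string[] {
  return events.map((e) => `${e.createdAt.toISOString()} ${e.eventType}`);
}

// Usage with a real queue (sketch):
// const lines = formatTimeline(await jobQueue.getJobEvents(jobId));
// lines.forEach((line) => console.log(line));
```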
#### JobEvent

```ts
interface JobEvent {
  id: number;
  jobId: number;
  eventType: JobEventType;
  createdAt: Date;
  metadata: any;
}
```

#### JobEventType

```ts
enum JobEventType {
  Added = 'added',
  Processing = 'processing',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled',
  Retried = 'retried',
  Edited = 'edited',
  Prolonged = 'prolonged',
}
```

---

## Event Hooks

DataQueue emits real-time events for job lifecycle transitions. Register listeners using `on`, `once`, `off`, and `removeAllListeners`. Works identically with both the PostgreSQL and Redis backends.

### QueueEventMap

```ts
interface QueueEventMap {
  'job:added': { jobId: number; jobType: string };
  'job:processing': { jobId: number; jobType: string };
  'job:completed': { jobId: number; jobType: string };
  'job:failed': {
    jobId: number;
    jobType: string;
    error: Error;
    willRetry: boolean;
  };
  'job:cancelled': { jobId: number };
  'job:retried': { jobId: number };
  'job:waiting': { jobId: number; jobType: string };
  'job:progress': { jobId: number; progress: number };
  error: Error;
}
```

### on

```ts
on(event: QueueEventName, listener: (data) => void): void
```

Register a listener that fires every time the event is emitted.

### once

```ts
once(event: QueueEventName, listener: (data) => void): void
```

Register a one-time listener that auto-removes after the first invocation.

### off

```ts
off(event: QueueEventName, listener: (data) => void): void
```

Remove a previously registered listener. Pass the exact function reference used with `on` or `once`.

### removeAllListeners

```ts
removeAllListeners(event?: QueueEventName): void
```

Remove all listeners for a specific event, or all listeners for all events when called without arguments.

See [Event Hooks](/usage/event-hooks) for detailed usage examples.

---

## Processing Jobs

### createProcessor

```ts
createProcessor(
  handlers: JobHandlers,
  options?: ProcessorOptions
): Processor
```

Creates a job processor with the provided handlers and options.
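Putting event hooks and `createProcessor` together, an illustrative sketch that assumes an initialized `jobQueue` and a `handlers` map as defined on the [JobHandlers](/api/job-handlers) page:

```typescript
// Log every failure, including whether the job will be retried.
jobQueue.on('job:failed', ({ jobId, error, willRetry }) => {
  console.error(`Job ${jobId} failed: ${error.message} (willRetry: ${willRetry})`);
});

const processor = jobQueue.createProcessor(handlers, {
  batchSize: 10, // claim up to 10 jobs per poll
  pollInterval: 5000, // poll every 5 seconds
  onError: (error) => console.error('Processor error:', error),
});
processor.startInBackground();

// On shutdown, drain in-flight jobs before exiting.
process.on('SIGTERM', async () => {
  await processor.stopAndDrain();
  process.exit(0);
});
```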
#### ProcessorOptions

```ts
interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}
```

---

## Background Supervisor

### createSupervisor

```ts
createSupervisor(options?: SupervisorOptions): Supervisor
```

Creates a background supervisor that automatically runs maintenance tasks on a configurable interval: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens.

#### SupervisorOptions

```ts
interface SupervisorOptions {
  intervalMs?: number; // default: 60000
  stuckJobsTimeoutMinutes?: number; // default: 10
  cleanupJobsDaysToKeep?: number; // default: 30 (0 to disable)
  cleanupEventsDaysToKeep?: number; // default: 30 (0 to disable)
  cleanupBatchSize?: number; // default: 1000
  reclaimStuckJobs?: boolean; // default: true
  expireTimedOutTokens?: boolean; // default: true
  onError?: (error: Error) => void; // default: console.error
  verbose?: boolean;
}
```

#### Supervisor

```ts
interface Supervisor {
  start(): Promise<SupervisorRunResult>;
  startInBackground(): void;
  stop(): void;
  stopAndDrain(timeoutMs?: number): Promise<void>;
  isRunning(): boolean;
}
```

- `start()` runs all tasks once and returns the results (serverless-friendly).
- `startInBackground()` starts a background loop that runs every `intervalMs`.
- `stopAndDrain()` stops the loop and waits for the current run to finish.

#### SupervisorRunResult

```ts
interface SupervisorRunResult {
  reclaimedJobs: number;
  cleanedUpJobs: number;
  cleanedUpEvents: number;
  expiredTokens: number;
}
```

See [Long-Running Server](/usage/long-running-server#background-supervisor) for usage examples.

---

## Accessing the Underlying Client

### getPool

```ts
getPool(): Pool
```

Returns the PostgreSQL connection pool instance. Only available when using the PostgreSQL backend.

> **Note:** Throws an error if called when using the Redis backend.
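For example, you can reuse the queue's pool for your own ad-hoc queries instead of opening a second pool (an illustrative sketch; assumes an initialized `jobQueue` on the PostgreSQL backend):

```typescript
// Reuse the same pg.Pool that the queue uses internally.
const pool = jobQueue.getPool();

// Any regular pg query works; SELECT NOW() is just a placeholder.
const { rows } = await pool.query('SELECT NOW()');
console.log(rows[0]);
```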
### getRedisClient

```ts
getRedisClient(): Redis
```

Returns the `ioredis` client instance. Only available when using the Redis backend.

> **Note:** Throws an error if called when using the PostgreSQL backend.

---

# JobRecord

Slug: api/job-record

The `JobRecord` interface represents a job stored in the queue, including its status, attempts, and metadata.

## Fields

- `id`: _number_ — Unique job ID.
- `jobType`: _string_ — The type of the job.
- `payload`: _any_ — The job payload.
- `status`: _'pending' | 'processing' | 'completed' | 'failed' | 'cancelled'_ — Current job status.
- `createdAt`: _Date_ — When the job was created.
- `updatedAt`: _Date_ — When the job was last updated.
- `lockedAt`: _Date | null_ — When the job was locked for processing.
- `lockedBy`: _string | null_ — Worker that locked the job.
- `attempts`: _number_ — Number of attempts so far.
- `maxAttempts`: _number_ — Maximum allowed attempts.
- `nextAttemptAt`: _Date | null_ — When the next attempt is scheduled.
- `priority`: _number_ — Job priority.
- `runAt`: _Date_ — When the job is scheduled to run.
- `pendingReason?`: _string | null_ — Reason for pending status.
- `errorHistory?`: _\{ message: string; timestamp: string \}[]_ — Error history for the job.
- `timeoutMs?`: _number | null_ — Timeout for this job in milliseconds.
- `failureReason?`: _FailureReason | null_ — Reason for the last failure, if any.
- `completedAt`: _Date | null_ — When the job was completed.
- `startedAt`: _Date | null_ — When the job was first picked up for processing.
- `lastRetriedAt`: _Date | null_ — When the job was last retried.
- `lastFailedAt`: _Date | null_ — When the job last failed.
- `lastCancelledAt`: _Date | null_ — When the job was last cancelled.
- `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations.
- `idempotencyKey?`: _string | null_ — The idempotency key for this job, if one was provided when the job was created.
- `progress?`: _number | null_ — Progress percentage (0–100) reported by the handler via `ctx.setProgress()`. `null` if no progress has been reported. See [Progress Tracking](/usage/progress-tracking).
- `output?`: _unknown_ — Handler output stored via `ctx.setOutput(data)` or by returning a value from the handler. `null` if no output has been stored. See [Job Output](/usage/job-output).

## Example

```json
{
  "id": 1,
  "jobType": "email",
  "payload": { "to": "user@example.com", "subject": "Hello" },
  "status": "completed",
  "createdAt": "2024-06-01T12:00:00Z",
  "tags": ["welcome", "user"],
  "idempotencyKey": "welcome-email-user-123",
  "progress": 100,
  "output": { "messageId": "abc-123", "sentAt": "2024-06-01T12:00:05Z" }
}
```

---

# Processor

Slug: api/processor

The `Processor` interface represents a job processor that can process jobs from the queue, either in the background or synchronously.

## Creating a processor

Create a processor by calling `createProcessor` on the queue.

```ts
const jobQueue = getJobQueue();
const processor = jobQueue.createProcessor(handlers, options);
```

### ProcessorOptions

```ts
interface ProcessorOptions {
  workerId?: string;
  batchSize?: number;
  concurrency?: number;
  pollInterval?: number;
  onError?: (error: Error) => void;
  verbose?: boolean;
  jobType?: string | string[];
}
```

## Methods

### startInBackground

```ts
startInBackground(): void
```

Start the job processor in the background. This will run continuously and process jobs as they become available. It polls for new jobs every `pollInterval` milliseconds (default: 5 seconds).

### stop

```ts
stop(): void
```

Stop the job processor that runs in the background. Does not wait for in-flight jobs to finish.

### stopAndDrain

```ts
stopAndDrain(timeoutMs?: number): Promise<void>
```

Stop the processor and wait for the current in-flight batch to finish before resolving. Accepts an optional timeout in milliseconds (default: `30000`).
If the batch does not complete within the timeout, the promise resolves anyway so your process is not stuck indefinitely. Useful for graceful shutdown (e.g., SIGTERM handling). See [Long-Running Server](/usage/long-running-server) for a full example.

### isRunning

```ts
isRunning(): boolean
```

Check if the job processor is running.

### start

```ts
start(): Promise<number>
```

Start the job processor synchronously. This will process jobs immediately and then stop. Returns the number of jobs processed.

---

# Tags

Slug: api/tags

The tags feature lets you group, search, and batch jobs using arbitrary string tags. Tags can be set when adding a job and used in various JobQueue methods.

## Tags in JobOptions

You can assign tags to a job when adding it:

```typescript
await jobQueue.addJob({
  jobType: 'email',
  payload: { to: 'user@example.com', subject: 'Hello' },
  tags: ['welcome', 'user'],
});
```

## Tags in JobRecord

The `tags` field is available on JobRecord objects:

```json
{
  "id": 1,
  "jobType": "email",
  "tags": ["welcome", "user"]
}
```

## Tag Query Methods

### getJobsByTags

```typescript
const jobs = await jobQueue.getJobsByTags(['welcome', 'user'], 'all');
```

### Cancel jobs by tags

You can cancel jobs by their tags using the `cancelAllUpcomingJobs` method with the `tags` filter (an object with `values` and `mode`):

```typescript
// Cancel all jobs with both 'welcome' and 'user' tags
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['welcome', 'user'], mode: 'all' },
});

// Cancel all jobs with any of the tags
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'any' },
});

// Cancel all jobs with exactly the given tags
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'exact' },
});

// Cancel all jobs with none of the given tags
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'none' },
});
```

## TagQueryMode

The `mode` parameter controls how tags are matched:

- `'exact'`: Jobs with exactly the same tags (no more, no less)
- `'all'`: Jobs that have all the given tags (can have more)
- `'any'`: Jobs that have at least one of the given tags
- `'none'`: Jobs that have none of the given tags

The default mode is `'all'`.

---

# CLI

> Command-line tools for managing DataQueue migrations, project scaffolding, and AI integrations.

Slug: cli

DataQueue ships a CLI tool called `dataqueue-cli` that you can run directly with `npx`:

```bash
npx dataqueue-cli <command> [options]
```

## Commands

| Command                                 | Description                                       |
| --------------------------------------- | ------------------------------------------------- |
| [`migrate`](/cli/migrate)               | Run PostgreSQL database migrations                |
| [`init`](/cli/init)                     | Scaffold a Next.js project for DataQueue          |
| [`install-skills`](/cli/install-skills) | Install AI skill files for coding assistants      |
| [`install-rules`](/cli/install-rules)   | Install agent rule sets for AI clients            |
| [`install-mcp`](/cli/install-mcp)       | Configure the DataQueue MCP server for AI clients |
| [`mcp`](/cli/mcp)                       | Start the DataQueue MCP server over stdio         |

## Usage

Running `dataqueue-cli` without a command (or with an unrecognized command) prints the help output:

```
Usage:
  dataqueue-cli migrate [--envPath <path>] [-s <schema> | --schema <schema>]
  dataqueue-cli init
  dataqueue-cli install-skills
  dataqueue-cli install-rules
  dataqueue-cli install-mcp
  dataqueue-cli mcp

Options for migrate:
  --envPath <path>       Path to a .env file to load environment variables
  -s, --schema <schema>  Set the schema to use

AI tooling commands:
  install-skills   Install DataQueue skill files for AI assistants
  install-rules    Install DataQueue agent rules for AI clients
  install-mcp      Configure the DataQueue MCP server for AI clients
  mcp              Start the DataQueue MCP server (stdio)
```

---

# init

> Scaffold a Next.js project for DataQueue with a single command.

Slug: cli/init

Scaffolds your Next.js project with everything needed to start using DataQueue — API routes, a job queue singleton, a cron script, and all required dependencies.
```bash
npx dataqueue-cli init
```

## What It Does

The `init` command auto-detects your project structure (App Router vs Pages Router, `src/` directory vs root) and creates the following:

### Files Created

| File                                            | Purpose                                                |
| ----------------------------------------------- | ------------------------------------------------------ |
| `app/api/dataqueue/manage/[[...task]]/route.ts` | API route for queue management (App Router)            |
| `pages/api/dataqueue/manage/[[...task]].ts`     | API route for queue management (Pages Router)          |
| `lib/dataqueue/queue.ts`                        | Job queue singleton with a sample `send_email` handler |
| `cron.sh`                                       | Shell script for local development cron jobs           |

> **Note:** Only the API route matching your detected router is created. Existing files are never overwritten.

### Dependencies Added

**Production:**

- `@nicnocquee/dataqueue`
- `@nicnocquee/dataqueue-dashboard`
- `@nicnocquee/dataqueue-react`

**Development:**

- `dotenv-cli`
- `ts-node`
- `node-pg-migrate`

### Scripts Added

| Script              | Command                                         |
| ------------------- | ----------------------------------------------- |
| `cron`              | `bash cron.sh`                                  |
| `migrate-dataqueue` | `dotenv -e .env.local -- dataqueue-cli migrate` |

## After Running

1. Install the newly added dependencies:

   ```bash
   npm install
   ```

2. Set up your environment variables in `.env.local`:

   ```bash
   PG_DATAQUEUE_DATABASE=postgresql://user:password@localhost:5432/mydb
   CRON_SECRET=your-secret-here
   ```

3. Run database migrations:

   ```bash
   npm run migrate-dataqueue
   ```

4. Start your Next.js dev server and the cron script:

   ```bash
   npm run dev
   npm run cron
   ```

## Requirements

- Must be run in a Next.js project directory (looks for `next` in `package.json` dependencies)
- Must have either an `app/` or `pages/` directory

---

# install-mcp

> Configure the DataQueue MCP server for AI coding clients.

Slug: cli/install-mcp

Configures the DataQueue [MCP](https://modelcontextprotocol.io/) (Model Context Protocol) server in your AI client's configuration.
This gives your AI assistant direct access to DataQueue documentation.

```bash
npx dataqueue-cli install-mcp
```

## Interactive Prompt

The command prompts you to select your AI client:

```
DataQueue MCP Server Installer

Select your AI client:
  1) Cursor
  2) Claude Code
  3) VS Code (Copilot)
  4) Windsurf

Enter choice (1-4):
```

## What It Configures

The installer adds a `"dataqueue"` server entry to your client's MCP config file:

```json
{
  "mcpServers": {
    "dataqueue": {
      "command": "npx",
      "args": ["dataqueue-cli", "mcp"]
    }
  }
}
```

If the config file already exists, the `dataqueue` entry is merged in without affecting other servers.

## Install Locations

| Client            | Config File                           |
| ----------------- | ------------------------------------- |
| Cursor            | `.cursor/mcp.json`                    |
| Claude Code       | `.mcp.json`                           |
| VS Code (Copilot) | `.vscode/mcp.json`                    |
| Windsurf          | `~/.codeium/windsurf/mcp_config.json` |

> **Note:** After installing, your AI client will automatically start the MCP server when needed. See the [`mcp`](/cli/mcp) command for details on what the server exposes.

---

# install-rules

> Install DataQueue agent rules for AI coding clients.

Slug: cli/install-rules

Installs comprehensive DataQueue rule sets into your AI client's configuration. Rules give AI assistants detailed guidance for generating correct DataQueue code.
```bash
npx dataqueue-cli install-rules
```

## Interactive Prompt

The command prompts you to select your AI client:

```
DataQueue Agent Rules Installer

Select your AI client:
  1) Cursor
  2) Claude Code
  3) AGENTS.md (Codex, Jules, OpenCode)
  4) GitHub Copilot
  5) Windsurf

Enter choice (1-5):
```

## Rules Installed

Three rule files are installed, covering the full surface area of DataQueue:

| Rule File            | What It Covers                                                |
| -------------------- | ------------------------------------------------------------- |
| `basic.md`           | Core API — initialization, adding jobs, processing, handlers  |
| `advanced.md`        | Advanced features — waits, cron, tokens, cancellation, events |
| `react-dashboard.md` | React SDK and Dashboard components                            |

## Install Locations

| Client         | Installs To                                                                                                                |
| -------------- | -------------------------------------------------------------------------------------------------------------------------- |
| Cursor         | `.cursor/rules/dataqueue-basic.mdc`, `.cursor/rules/dataqueue-advanced.mdc`, `.cursor/rules/dataqueue-react-dashboard.mdc` |
| Claude Code    | `CLAUDE.md` (appended between markers)                                                                                     |
| AGENTS.md      | `AGENTS.md` (appended between markers)                                                                                     |
| GitHub Copilot | `.github/copilot-instructions.md` (appended between markers)                                                               |
| Windsurf       | `CONVENTIONS.md` (appended between markers)                                                                                |

> **Note:** For Cursor, each rule file is written separately. For all other clients, the rules are combined and appended to a single file between `<!-- DATAQUEUE RULES START -->` and `<!-- DATAQUEUE RULES END -->` markers. Re-running the command updates the content between the markers without duplicating it.

---

# install-skills

> Install DataQueue skill files for AI coding assistants.

Slug: cli/install-skills

Copies DataQueue skill files (`SKILL.md`) into your AI coding assistant's skills directory. Skills teach AI assistants DataQueue patterns and best practices.
```bash npx dataqueue-cli install-skills ``` ## Skills Installed | Skill | What It Covers | | -------------------- | ------------------------------------------------------------------ | | `dataqueue-core` | Core patterns — initialization, adding jobs, processing, handlers | | `dataqueue-advanced` | Advanced features — waits, cron jobs, tokens, cancellation, events | | `dataqueue-react` | React SDK and Dashboard integration | ## Auto-Detection The command automatically detects which AI tools are present by checking for their config directories: | AI Tool | Detected By | Skills Installed To | | -------------- | ----------- | ------------------- | | Cursor | `.cursor/` | `.cursor/skills/` | | Claude Code | `.claude/` | `.claude/skills/` | | GitHub Copilot | `.github/` | `.github/skills/` | If no AI tool directories are detected, it defaults to `.cursor/skills/`. ## Example Output ``` Installing skills for Cursor... ✓ dataqueue-core ✓ dataqueue-advanced ✓ dataqueue-react Done! Installed 3 skill(s) for Cursor. ``` --- # mcp > Start the DataQueue MCP server for AI-powered documentation access. Slug: cli/mcp Starts the DataQueue MCP (Model Context Protocol) server over stdio. This server gives AI coding assistants live access to the full DataQueue documentation. ```bash npx dataqueue-cli mcp ``` > **Note:** You typically don't run this command directly. Use [`install-mcp`](/cli/install-mcp) to configure your AI client, which will start the server automatically. 
## Tools Exposed

The MCP server exposes three tools that AI assistants can call:

| Tool             | Description                                                                    |
| ---------------- | ------------------------------------------------------------------------------ |
| `list-doc-pages` | Lists all available documentation pages with titles and descriptions           |
| `get-doc-page`   | Fetches a specific page by slug (e.g., `"usage/add-job"` or `"api/job-queue"`) |
| `search-docs`    | Full-text search across all documentation pages with term matching             |

## Resources

The server also exposes a resource:

| URI                    | Description                                             |
| ---------------------- | ------------------------------------------------------- |
| `dataqueue://llms.txt` | Machine-readable DataQueue overview for LLM consumption |

## How It Works

The server loads a bundled `docs-content.json` file containing all DataQueue documentation pages. Search uses simple term matching across page titles, descriptions, and content, returning the top 5 results with relevant excerpts. Communication happens over stdio using the [Model Context Protocol](https://modelcontextprotocol.io/), so it works with any MCP-compatible client.

---

# migrate

> Run PostgreSQL database migrations for DataQueue.

Slug: cli/migrate

Runs the DataQueue database migrations against your PostgreSQL database using [node-pg-migrate](https://github.com/salsita/node-pg-migrate).

```bash
npx dataqueue-cli migrate [options]
```

## Options

| Option                  | Description                                                                    |
| ----------------------- | ------------------------------------------------------------------------------ |
| `--envPath <path>`      | Path to a `.env` file to load environment variables from                       |
| `-s, --schema <schema>` | PostgreSQL schema to use. Automatically creates the schema if it doesn't exist |

## Environment Variables

The migration reads the connection string from the `PG_DATAQUEUE_DATABASE` environment variable. You can set it directly or load it from a `.env` file using `--envPath`.
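For reference, a minimal `.env` file for the migration might look like this (the connection string is illustrative; substitute your own credentials):

```bash
# Hypothetical credentials — adjust to your environment
PG_DATAQUEUE_DATABASE=postgres://postgres:secret@localhost:5432/mydb
```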
## Examples

Run migrations using environment variables already set in your shell:

```bash
npx dataqueue-cli migrate
```

Load environment variables from a specific `.env` file:

```bash
npx dataqueue-cli migrate --envPath .env.local
```

Run migrations in a custom PostgreSQL schema:

```bash
npx dataqueue-cli migrate --schema my_schema
```

Combine both options:

```bash
npx dataqueue-cli migrate --envPath .env.local --schema my_schema
```

## How It Works

Under the hood, `dataqueue-cli migrate` runs:

```bash
npx node-pg-migrate up \
  -t dataqueuedev_migrations \
  -d PG_DATAQUEUE_DATABASE \
  -m <migrations-dir> \
  [--envPath <path>] \
  [-s <schema> --create-schema]
```

The migrations directory is bundled with the `@nicnocquee/dataqueue` package, so you don't need to manage migration files yourself.

> **Note:** This command is only needed for the PostgreSQL backend. The Redis backend requires no migrations.

---

# Next.js Demo App

Slug: example

You can see a working example of a Next.js app using DataQueue [here](https://dataqueue-demo.netlify.app/), with its code in the [apps/demo](https://github.com/nicnocquee/dataqueue/tree/main/apps/demo) folder of this repository.

---

# DataQueue Docs

> Documentation for DataQueue, a lightweight job queue for Node.js/TypeScript projects, backed by PostgreSQL or Redis.

Slug: index

Welcome to the DataQueue docs! Start from the [Introduction](/intro) to learn more about DataQueue.

DataQueue supports **PostgreSQL** and **Redis (beta)** as storage backends. Choose the one that fits your stack -- the API is identical regardless of which backend you use.
## Packages | Package | Description | | --------------------------------- | --------------------------------------------------------------------------------------- | | `@nicnocquee/dataqueue` | Core job queue library (PostgreSQL & Redis (beta)) | | `@nicnocquee/dataqueue-react` | [React hooks](/usage/react-sdk) for subscribing to job status and progress | | `@nicnocquee/dataqueue-dashboard` | [Admin dashboard](/usage/dashboard) — plug-and-play UI for monitoring and managing jobs | ## Source Code The source code for DataQueue is available on [GitHub](https://github.com/nicnocquee/dataqueue). ## Demo App A demo Next.js app that showcases all features of DataQueue is available [here](https://dataqueue-demo.netlify.app) and the source code is available on [GitHub](https://github.com/nicnocquee/dataqueue/tree/main/apps/demo). --- # Comparison > How DataQueue compares to BullMQ and Trigger.dev Slug: intro/comparison Choosing a job queue depends on your stack, infrastructure preferences, and the features you need. Here is a side-by-side comparison of **DataQueue**, **BullMQ**, and **Trigger.dev**. 
| Feature | DataQueue | BullMQ | Trigger.dev | | ----------------------- | ----------------------------------------------- | ------------------------------------------- | --------------------------------------- | | **Backend** | PostgreSQL or Redis | Redis only | Cloud or self-hosted (Postgres + Redis) | | **Type Safety** | Full generic `PayloadMap` | Basic types | Full TypeScript tasks | | **Scheduling** | `runAt`, Cron | Cron, delayed, recurring | Cron, delayed | | **Retries** | Exponential backoff, configurable `maxAttempts` | Exponential backoff, custom strategies, DLQ | Auto retries, bulk replay, DLQ | | **Priority** | Integer priority | Priority levels | Queue-based priority | | **Concurrency Control** | `batchSize` + `concurrency` | Built-in | Per-task + shared limits | | **Rate Limiting** | - | Yes | Via concurrency limits | | **Job Flows / DAGs** | - | Parent-child flows | Workflows | | **Dashboard** | Built-in Next.js package | Third-party (Bull Board, etc.) | Built-in web dashboard | | **Wait / Pause Jobs** | `waitFor`, `waitUntil`, token system | - | Durable execution | | **Human-in-the-Loop** | Token system | - | Yes | | **Progress Tracking** | Yes (0-100%) | Yes | Yes (realtime) | | **Serverless-First** | Yes | No (needs long-running process) | Yes (cloud) | | **Self-Hosted** | Yes | Yes (your Redis) | Yes (containers) | | **Cloud Option** | - | - | Yes | | **License** | MIT | MIT | Apache-2.0 | | **Pricing** | Free (OSS) | Free (OSS) | Free tier + paid plans | | **Infrastructure** | Your own Postgres or Redis | Your own Redis | Their cloud or your infra | ## Where DataQueue shines - **Serverless-first** — designed from the ground up for Vercel, AWS Lambda, and other serverless platforms. No long-running process required. - **Use your existing database** — back your queue with PostgreSQL or Redis. No additional infrastructure to provision or pay for. 
- **Wait and token system** — pause jobs with `waitFor`, `waitUntil`, or token-based waits for human-in-the-loop workflows, all within a single handler function.
- **Type-safe PayloadMap** — a generic `PayloadMap` gives you compile-time validation of every job type and its payload, catching bugs before they reach production.
- **Built-in Next.js dashboard** — add a full admin UI to your Next.js app with a single route file. No separate service to deploy.

---

# About

Slug: intro

DataQueue is an open source lightweight job queue for Node.js/TypeScript projects, backed by **PostgreSQL** or **Redis**. It lets you easily schedule, process, and manage background jobs. It's ideal for serverless environments like Vercel, AWS Lambda, and more.

## Features

- Simple API for adding and processing jobs
- Strong typing for job types and payloads, preventing you from adding jobs with the wrong payload and ensuring handlers receive the correct type
- Works in serverless environments
- Supports job priorities, scheduling, canceling, and retries
- Reclaims stuck jobs: No job will remain in the `processing` state indefinitely
- Cleans up old jobs: Keeps only jobs from the last _N_ days, where _N_ is configurable
- **Choose your backend**: Use PostgreSQL or Redis -- same API, same features, your choice

## Who is this for?

This package is for you if all of the following apply:

|     |                                                                                                |
| --- | ---------------------------------------------------------------------------------------------- |
| ☁️  | You deploy web apps to serverless platforms like Vercel, AWS Lambda, etc.                      |
| 📝  | You use TypeScript                                                                             |
| ⚡  | You want your app to stay fast and responsive by offloading heavy tasks to the background      |
| 💾  | You use PostgreSQL or Redis                                                                    |
| 💸  | You're on a budget and want to avoid paying for a job queue service or running your own server |

## Backend Options

### PostgreSQL

If you already use PostgreSQL, it makes sense to use it for job queues, thanks to [SKIP LOCKED](https://www.postgresql.org/docs/current/sql-select.html).
The update process in DataQueue uses `FOR UPDATE SKIP LOCKED` to avoid race conditions and improve performance. If two workers poll the queue at the same time, each one skips jobs that are already being processed and claims other available jobs instead. This lets multiple workers handle different jobs at once without waiting or causing conflicts, making PostgreSQL a great choice for job queues and similar workloads.

The PostgreSQL backend requires running [database migrations](/usage/database-migration) before use.

### Redis

If you already have a Redis instance in your stack, you can use it as the backend instead. The Redis backend uses Lua scripts for atomic operations and sorted sets for priority-based job claiming, providing the same guarantees as the PostgreSQL backend.

The Redis backend requires **no migrations** -- it automatically creates the necessary keys when jobs are added.

> **Note:** Both backends provide **full feature parity**. Tags, filters, idempotency keys, job events, priority ordering, scheduling, and all other features work identically regardless of which backend you choose. You can switch backends at any time by changing a single configuration option.

---

# Installation

Slug: intro/install

Before you begin, make sure you have Node.js and TypeScript installed in your environment.

## PostgreSQL Backend

If you're using PostgreSQL as your backend, you need a Postgres database. Install the required libraries:

```bash
npm install @nicnocquee/dataqueue
npm install -D node-pg-migrate ts-node
```

You need to install `node-pg-migrate` and `ts-node` as development dependencies to run [database migrations](/usage/database-migration).

## Redis Backend

If you're using Redis as your backend, you need a Redis server (v6+). Install the required libraries:

```bash
npm install @nicnocquee/dataqueue ioredis
```

> **Note:** The `ioredis` package is an optional peer dependency. You only need to install it if you choose Redis as your backend.
No database migrations are required for Redis.

---

# Overview

Slug: intro/overview

DataQueue is a lightweight library that helps you manage your job queue using a **PostgreSQL** or **Redis** backend. It has three main components: the processor, the queue, and the job. It is not an external tool or service. You install DataQueue in your project and use it to add jobs to the queue, process them, and more, using your own existing database.

## Processor

The processor has these responsibilities:

- retrieve a certain number of unclaimed, pending jobs from the database
- run the defined job handlers for each job
- update the job status accordingly
- retry failed jobs

In a serverless environment, you can initialize and start the processor in, for example, an API route, and use a cron job to call it periodically. In a long-running process environment, you can start the processor when your application starts, and it will periodically check for jobs to process.

For more information, see [Processor](/api/processor).

## Queue

The queue is an abstraction over the database. It has these responsibilities:

- add jobs to the database
- retrieve jobs from the database
- cancel pending jobs
- edit pending jobs

The API is identical whether you're using PostgreSQL or Redis (beta) as your backend. You select the backend when [initializing the queue](/usage/init-queue).

For more information, see [JobQueue](/api/job-queue).

## Job

A job that you add to the queue needs a type and a payload. The type is a string that identifies the job, and the payload is the data that will be passed to the job handler for that job type. Once a job is added to the queue, it can be in one of these states:

- `pending`: The job is waiting in the queue to be processed.
- `processing`: The job is currently being worked on.
- `completed`: The job finished successfully.
- `failed`: The job did not finish successfully. It can be retried up to `maxAttempts` times.
- `cancelled`: The job was cancelled before it finished.
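The lifecycle above can be sketched as a small state machine. This is an illustrative model of the documented states, not DataQueue's internal implementation (retry bookkeeping, for instance, is more involved):

```typescript
type JobStatus = 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled';

// Allowed transitions, following the lifecycle described above.
// A failed job may move back to pending when it is retried
// (up to maxAttempts times).
const transitions: Record<JobStatus, JobStatus[]> = {
  pending: ['processing', 'cancelled'],
  processing: ['completed', 'failed'],
  failed: ['pending'],
  completed: [],
  cancelled: [],
};

const canTransition = (from: JobStatus, to: JobStatus): boolean =>
  transitions[from].includes(to);
```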
For more information, see [JobRecord](/api/job-record).

---

# Add Job

Slug: usage/add-job

You can add jobs to the queue from your application logic, such as in a [server function](https://react.dev/reference/rsc/server-functions):

```typescript title="@/app/actions/send-email.ts"
'use server';

import { getJobQueue } from '@/lib/queue';
import { revalidatePath } from 'next/cache';

export const sendEmail = async ({
  name,
  email,
}: {
  name: string;
  email: string;
}) => {
  // Add a welcome email job
  const jobQueue = getJobQueue();
  try {
    const runAt = new Date(Date.now() + 5 * 1000); // Run 5 seconds from now
    const job = await jobQueue.addJob({
      jobType: 'send_email',
      payload: {
        to: email,
        subject: 'Welcome to our platform!',
        body: `Hi ${name}, welcome to our platform!`,
      },
      priority: 10, // Higher number = higher priority
      runAt: runAt,
      tags: ['welcome', 'user'], // Add tags for grouping/searching
    });
    revalidatePath('/');
    return { job };
  } catch (error) {
    console.error('Error adding job:', error);
    throw error;
  }
};
```

In the example above, a job is added to the queue to send an email. The job type is `send_email`, and the payload includes the recipient's email, subject, and body. When adding a job, you can set its `priority`, schedule when it should run using `runAt`, and specify a timeout in milliseconds with `timeoutMs`. You can also add `tags` (an array of strings) to group, search, or batch jobs by category. See [Tags](/api/tags) for more details.

## Batch Insert

When you need to enqueue many jobs at once, use `addJobs` instead of calling `addJob` in a loop. It batches the inserts into a single database round-trip (PostgreSQL) or a single atomic Lua script (Redis), which is significantly faster.
```typescript title="@/app/actions/send-bulk.ts"
'use server';

import { getJobQueue } from '@/lib/queue';

export const sendBulkEmails = async (
  recipients: { email: string; name: string }[],
) => {
  const jobQueue = getJobQueue();
  const jobIds = await jobQueue.addJobs(
    recipients.map((r) => ({
      jobType: 'send_email' as const,
      payload: {
        to: r.email,
        subject: 'Newsletter',
        body: `Hi ${r.name}, here's your update!`,
      },
      tags: ['newsletter'],
    })),
  );
  // jobIds[i] corresponds to recipients[i]
  return { jobIds };
};
```

`addJobs` returns an array of job IDs in the **same order** as the input array. Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, and other options.

- **Empty array**: `addJobs([])` returns `[]` immediately without touching the database.
- **Idempotency**: Each job's `idempotencyKey` is handled independently. Duplicate keys resolve to the existing job's ID.
- **Transactional**: The `{ db }` option works with `addJobs` the same way as `addJob` (PostgreSQL only).

## Idempotency

You can provide an `idempotencyKey` when adding a job to prevent duplicate jobs. If a job with the same key already exists in the queue, `addJob` returns the existing job's ID instead of creating a new one. This is useful for preventing duplicates caused by retries, double-clicks, webhook replays, or serverless function re-invocations.

```typescript title="@/app/actions/send-welcome.ts"
'use server';

import { getJobQueue } from '@/lib/queue';

export const sendWelcomeEmail = async (userId: string, email: string) => {
  const jobQueue = getJobQueue();
  const jobId = await jobQueue.addJob({
    jobType: 'send_email',
    payload: {
      to: email,
      subject: 'Welcome!',
      body: `Welcome to our platform!`,
    },
    idempotencyKey: `welcome-email-${userId}`, // prevents duplicate welcome emails
  });
  return { jobId };
};
```

In the example above, calling `sendWelcomeEmail` multiple times for the same `userId` will only create one job. Subsequent calls return the existing job's ID.
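The key behavior can be illustrated with an in-memory sketch. This stands in for the database-backed uniqueness check and is not the real implementation:

```typescript
// In-memory stand-in for the job_queue table's idempotency-key lookup.
const jobIdByKey = new Map<string, number>();
let nextJobId = 1;

// Mirrors the documented contract of addJob: a duplicate key returns the
// existing job's ID instead of creating a new job.
const addJobSketch = (idempotencyKey?: string): number => {
  if (idempotencyKey !== undefined) {
    const existing = jobIdByKey.get(idempotencyKey);
    if (existing !== undefined) return existing;
  }
  const id = nextJobId++;
  if (idempotencyKey !== undefined) jobIdByKey.set(idempotencyKey, id);
  return id;
};
```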
### Behavior - **No key provided**: Works exactly as before, no uniqueness check is performed. - **Key provided, no conflict**: The job is inserted and its new ID is returned. - **Key provided, conflict**: The existing job's ID is returned. The existing job is **not** updated. - **Scope**: The key is unique across the entire `job_queue` table regardless of job status. Once a key exists, it cannot be reused until the job is cleaned up via [`cleanupOldJobs`](/usage/cleanup-jobs). ## Transactional Job Creation > **Note:** Transactional job creation is only available with the **PostgreSQL** backend. You can insert a job within an existing database transaction by passing an external database client via the `db` option. This guarantees that the job is enqueued **atomically** with your other database writes — if the transaction rolls back, the job is never enqueued. This is useful when you need to ensure that a job is only created when a related database operation succeeds (e.g., creating a user and enqueuing a welcome email in the same transaction). 
```typescript title="@/app/actions/register.ts"
'use server';

import { Pool } from 'pg';
import { getJobQueue } from '@/lib/queue';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

export const registerUser = async (email: string, name: string) => {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    // Insert the user
    await client.query('INSERT INTO users (email, name) VALUES ($1, $2)', [
      email,
      name,
    ]);

    // Enqueue the welcome email in the same transaction
    const jobQueue = getJobQueue();
    await jobQueue.addJob(
      {
        jobType: 'send_email',
        payload: { to: email, subject: 'Welcome!', body: `Hi ${name}!` },
      },
      { db: client }, // Use the transaction client
    );

    await client.query('COMMIT');
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
};
```

### How it works

- When `db` is provided, the `INSERT` into the `job_queue` table and the associated job event are both executed on the supplied client.
- The library does **not** call `client.release()` — you are responsible for managing the client lifecycle.
- If the transaction is rolled back, both the job and its event are discarded.
- When `db` is **not** provided, `addJob` behaves exactly as before (gets a connection from the internal pool).

---

# Building with AI

> Tools and resources for building DataQueue projects with AI coding assistants.

Slug: usage/building-with-ai

We provide multiple tools to help AI coding assistants write correct DataQueue code. Use one or all of them for the best developer experience.

## Quick Setup

### 1. Install Skills

Portable instruction sets that teach any AI coding assistant DataQueue best practices.

```bash
npx dataqueue-cli install-skills
```

Skills are installed as `SKILL.md` files into your AI tool's skills directory (`.cursor/skills/`, `.claude/skills/`, etc.). They cover core patterns, advanced features (waits, cron, tokens), and React/Dashboard integration.

### 2.
Install Agent Rules Comprehensive rule sets installed directly into your AI client's config files. ```bash npx dataqueue-cli install-rules ``` The installer prompts you to choose your AI client and writes rules to the appropriate location: | Client | Installs to | | -------------- | --------------------------------- | | Cursor | `.cursor/rules/dataqueue-*.mdc` | | Claude Code | `CLAUDE.md` | | AGENTS.md | `AGENTS.md` | | GitHub Copilot | `.github/copilot-instructions.md` | | Windsurf | `CONVENTIONS.md` | ### 3. Install MCP Server Give your AI assistant direct access to DataQueue documentation — search docs, fetch specific pages, and list all available topics. ```bash npx dataqueue-cli install-mcp ``` The installer prompts you to choose your AI client and writes the MCP config to the appropriate location. Currently supported clients: | Client | Installs to | | ----------------- | ------------------------------------- | | Cursor | `.cursor/mcp.json` | | Claude Code | `.mcp.json` | | VS Code (Copilot) | `.vscode/mcp.json` | | Windsurf | `~/.codeium/windsurf/mcp_config.json` | The MCP server runs via `npx dataqueue-cli mcp` and communicates over stdio. It exposes three tools: | Tool | Description | | ---------------- | ---------------------------------------- | | `search-docs` | Full-text search across all doc pages | | `get-doc-page` | Fetch a specific doc page by slug | | `list-doc-pages` | List all available doc pages with titles | ## Skills vs Agent Rules vs MCP | | **Skills** | **Agent Rules** | **MCP Server** | | :---------------- | :----------------------------------- | :------------------------------------- | :------------------------------------- | | **What it does** | Drops skill files into your project | Installs rule sets into client config | Runs a live server your AI connects to | | **Installs to** | `.cursor/skills/`, `.claude/skills/` | `.cursor/rules/`, `CLAUDE.md`, etc. | `.cursor/mcp.json`, `.mcp.json`, etc. 
|
| **Best for**      | Teaching patterns and best practices | Comprehensive code generation guidance | Live documentation search |
| **Works offline** | Yes                                  | Yes                                    | Yes (runs locally)        |

**Recommendation:** Install all three. Skills and Agent Rules teach your AI _how_ to write code. The MCP Server lets it _look up_ the docs when it needs specifics.

## llms.txt

We publish machine-readable documentation for LLM consumption:

- [docs.dataqueue.dev/llms.txt](https://docs.dataqueue.dev/llms.txt) — concise overview
- [docs.dataqueue.dev/llms-full.txt](https://docs.dataqueue.dev/llms-full.txt) — full documentation

These follow the [llms.txt standard](https://llmstxt.org) and can be fed directly into any LLM context window.

## Project-Level Context Snippet

If you prefer a lightweight approach, paste this snippet into a context file at the root of your project:

| File                              | Read by                       |
| :-------------------------------- | :---------------------------- |
| `CLAUDE.md`                       | Claude Code                   |
| `AGENTS.md`                       | OpenAI Codex, Jules, OpenCode |
| `.cursor/rules/*.md`              | Cursor                        |
| `.github/copilot-instructions.md` | GitHub Copilot                |
| `CONVENTIONS.md`                  | Windsurf, Cline, and others   |

```markdown
# DataQueue rules

## Imports

Always import from `@nicnocquee/dataqueue`.

## PayloadMap pattern

Define a type map of job types to payload shapes for full type safety:

\`\`\`ts
type JobPayloadMap = {
  send_email: { to: string; subject: string; body: string };
};
\`\`\`

## Initialization (singleton)

Never call initJobQueue per request — use a module-level singleton:

\`\`\`ts
import { initJobQueue } from '@nicnocquee/dataqueue';

let queue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!queue) {
    queue = initJobQueue<JobPayloadMap>({
      databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE },
    });
  }
  return queue;
};
\`\`\`

## Handler pattern

Type handlers as `JobHandlers<JobPayloadMap>` — TypeScript enforces a handler for every job type.
## Processing

- Serverless: `processor.start()` (one-shot)
- Long-running: `processor.startInBackground()` + `stopAndDrain()` on SIGTERM

## Common mistakes

1. Creating initJobQueue per request (creates a DB pool each time)
2. Missing handler for a job type (fails with NoHandler)
3. Not checking signal.aborted in long handlers
4. Forgetting reclaimStuckJobs() — crashed workers leave jobs stuck
5. Skipping migrations (PostgreSQL requires `dataqueue-cli migrate`)
```

---

# Cancel Jobs

Slug: usage/cancel-jobs

You can cancel a job by its ID, but only if it is still pending (not yet started or scheduled for the future).

```typescript title="@/app/api/cancel-job/route.ts"
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue } from '@/lib/queue';

export async function POST(request: NextRequest) {
  try {
    const { jobId } = await request.json();
    const jobQueue = getJobQueue();
    await jobQueue.cancelJob(jobId);
    return NextResponse.json({ message: 'Job cancelled' });
  } catch (error) {
    console.error('Error cancelling job:', error);
    return NextResponse.json(
      { message: 'Failed to cancel job' },
      { status: 500 },
    );
  }
}
```

### Cancel All Pending Jobs

DataQueue also lets you cancel all pending jobs at once. This is useful if you want to stop all jobs that haven't started yet or are scheduled for the future.
```typescript title="@/app/api/cancel-all-jobs/route.ts"
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue } from '@/lib/queue';

export async function POST(request: NextRequest) {
  try {
    const jobQueue = getJobQueue();
    const cancelledCount = await jobQueue.cancelAllUpcomingJobs();
    return NextResponse.json({ message: `Cancelled ${cancelledCount} jobs` });
  } catch (error) {
    console.error('Error cancelling jobs:', error);
    return NextResponse.json(
      { message: 'Failed to cancel jobs' },
      { status: 500 },
    );
  }
}
```

#### Cancel Jobs by Filter

You can also cancel only the pending jobs that match certain criteria:

```typescript
// Cancel only email jobs
await jobQueue.cancelAllUpcomingJobs({ jobType: 'email' });

// Cancel only jobs with priority 2
await jobQueue.cancelAllUpcomingJobs({ priority: 2 });

// Cancel only jobs scheduled for a specific time (exact match)
const runAt = new Date('2024-06-01T12:00:00Z');
await jobQueue.cancelAllUpcomingJobs({ runAt });

// Cancel jobs scheduled after a certain time
await jobQueue.cancelAllUpcomingJobs({
  runAt: { gt: new Date('2024-06-01T12:00:00Z') },
});

// Cancel jobs scheduled on or after a certain time
await jobQueue.cancelAllUpcomingJobs({
  runAt: { gte: new Date('2024-06-01T12:00:00Z') },
});

// Cancel jobs scheduled before a certain time
await jobQueue.cancelAllUpcomingJobs({
  runAt: { lt: new Date('2024-06-01T12:00:00Z') },
});

// Cancel jobs scheduled on or before a certain time
await jobQueue.cancelAllUpcomingJobs({
  runAt: { lte: new Date('2024-06-01T12:00:00Z') },
});

// Cancel jobs scheduled exactly at a certain time
await jobQueue.cancelAllUpcomingJobs({
  runAt: { eq: new Date('2024-06-01T12:00:00Z') },
});

// Cancel jobs scheduled between two times (inclusive)
await jobQueue.cancelAllUpcomingJobs({
  runAt: {
    gte: new Date('2024-06-01T00:00:00Z'),
    lte: new Date('2024-06-01T23:59:59Z'),
  },
});

// Combine runAt with other filters
await jobQueue.cancelAllUpcomingJobs({ jobType: 'email', runAt: { gt: new
Date('2024-06-01T12:00:00Z') } });

// Cancel all jobs with both 'welcome' and 'user' tags. The jobs can have other tags.
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['welcome', 'user'], mode: 'all' },
});

// Cancel all jobs with any of the given tags. The jobs can have other tags.
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'any' },
});

// Cancel all jobs with exactly the given tags. The jobs cannot have other tags.
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'exact' },
});

// Cancel all jobs with none of the given tags
await jobQueue.cancelAllUpcomingJobs({
  tags: { values: ['foo', 'bar'], mode: 'none' },
});

// Combine filters
await jobQueue.cancelAllUpcomingJobs({
  jobType: 'email',
  tags: { values: ['welcome', 'user'], mode: 'all' },
  runAt: { lt: new Date('2024-06-01T12:00:00Z') },
});
```

**runAt filter details:**

- You can pass a single `Date` for an exact match, or an object with any of the following keys:
  - `gt`: Greater than
  - `gte`: Greater than or equal to
  - `lt`: Less than
  - `lte`: Less than or equal to
  - `eq`: Equal to
- All filters (`jobType`, `priority`, `runAt`, `tags`) can be combined for precise cancellation.

This sets the status of every job that is still pending (not yet started, or scheduled for the future) and matches the filters to `cancelled`.

---

# Cleanup Jobs

Slug: usage/cleanup-jobs

> **Note:** Running a long-lived server? Use [`createSupervisor()`](/usage/long-running-server#background-supervisor) to automate job cleanup instead of calling `cleanupOldJobs` manually.

If you have a lot of jobs, you may want to clean up old ones—for example, keeping only jobs from the last 30 days. You can do this by calling the `cleanupOldJobs` method.
The example below shows an API route (`/api/cron/cleanup`) that can be triggered by a cron job:

```typescript title="@/app/api/cron/cleanup.ts"
import { getJobQueue } from '@/lib/queue';
import { NextResponse } from 'next/server';

export async function GET(request: Request) {
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return NextResponse.json({ message: 'Unauthorized' }, { status: 401 });
  }
  try {
    const jobQueue = getJobQueue();
    // Clean up old jobs (keep only the last 30 days)
    const deleted = await jobQueue.cleanupOldJobs(30);
    console.log(`Deleted ${deleted} old jobs`);
    return NextResponse.json({
      message: 'Old jobs cleaned up',
      deleted,
    });
  } catch (error) {
    console.error('Error cleaning up jobs:', error);
    return NextResponse.json(
      { message: 'Failed to clean up jobs' },
      { status: 500 },
    );
  }
}
```

#### Scheduling the Cleanup Job with Cron

Add the following to your `vercel.json` to call the cleanup route every day at midnight:

```json title="vercel.json"
{
  "crons": [
    {
      "path": "/api/cron/cleanup",
      "schedule": "0 0 * * *"
    }
  ]
}
```

---

# Cron Jobs (Recurring Schedules)

> Define recurring jobs that automatically enqueue on a cron schedule.

Slug: usage/cron-jobs

DataQueue supports recurring cron schedules. Define a schedule with a cron expression, and the processor will **automatically enqueue** job instances before each batch — no extra code required.

## Add a Cron Schedule

```typescript title="@/app/api/cron-schedules/route.ts"
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue } from '@/lib/queue';

export async function POST(request: NextRequest) {
  const jobQueue = getJobQueue();
  const id = await jobQueue.addCronJob({
    scheduleName: 'daily-report', // must be unique!
cronExpression: '0 9 * * *', // every day at 9:00 AM jobType: 'generate_report', payload: { reportId: 'daily', userId: 'system' }, timezone: 'America/New_York', // default: 'UTC' }); return NextResponse.json({ id }); } ``` ### Options | Option | Type | Default | Description | | ---------------- | ---------- | ---------- | -------------------------------------------------- | | `scheduleName` | `string` | _required_ | Unique name for the schedule | | `cronExpression` | `string` | _required_ | Standard 5-field cron expression | | `jobType` | `string` | _required_ | Job type from your PayloadMap | | `payload` | `object` | _required_ | Payload for each job instance | | `timezone` | `string` | `'UTC'` | IANA timezone for cron evaluation | | `allowOverlap` | `boolean` | `false` | Allow new instance while previous is still running | | `maxAttempts` | `number` | `3` | Max retry attempts per job instance | | `priority` | `number` | `0` | Priority for each job instance | | `timeoutMs` | `number` | — | Timeout per job instance | | `tags` | `string[]` | — | Tags for each job instance | ## Automatic Enqueueing When you call `processor.start()` or `processor.startInBackground()`, DataQueue automatically checks all active cron schedules and enqueues jobs whose next run time has passed — **before** processing the batch. 
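Conceptually, the enqueue step checks whether each active schedule is due at the current time. A toy 5-field matcher (supporting only plain numbers, `*`, and `*/n`, evaluated in UTC; DataQueue's real evaluation also handles IANA timezones and full cron syntax) can illustrate the idea:

```typescript
// Toy matcher for a 5-field cron expression:
// minute hour day-of-month month day-of-week
const fieldMatches = (field: string, value: number): boolean => {
  if (field === '*') return true;
  if (field.startsWith('*/')) return value % Number(field.slice(2)) === 0;
  return Number(field) === value;
};

const cronMatchesUtc = (expr: string, date: Date): boolean => {
  const [minute, hour, dayOfMonth, month, dayOfWeek] = expr.split(' ');
  return (
    fieldMatches(minute, date.getUTCMinutes()) &&
    fieldMatches(hour, date.getUTCHours()) &&
    fieldMatches(dayOfMonth, date.getUTCDate()) &&
    fieldMatches(month, date.getUTCMonth() + 1) &&
    fieldMatches(dayOfWeek, date.getUTCDay())
  );
};
```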
```typescript title="@/app/api/cron/process/route.ts"
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue, jobHandlers } from '@/lib/queue';

export async function GET(request: NextRequest) {
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }
  const jobQueue = getJobQueue();
  // Cron jobs are automatically enqueued before each batch
  const processor = jobQueue.createProcessor(jobHandlers, {
    batchSize: 10,
    concurrency: 3,
  });
  const processed = await processor.start();
  return NextResponse.json({ processed });
}
```

### Vercel Cron Example

```json title="vercel.json"
{
  "crons": [
    {
      "path": "/api/cron/process",
      "schedule": "* * * * *"
    }
  ]
}
```

### Manual Trigger

If you need to enqueue due cron jobs outside the processor (e.g., in tests or one-off scripts), you can still call `enqueueDueCronJobs()` directly:

```typescript
const enqueued = await jobQueue.enqueueDueCronJobs();
```

## Overlap Protection

By default, `allowOverlap` is `false`. This means if a previous job instance from the same schedule is still **pending**, **processing**, or **waiting**, a new instance will **not** be enqueued — even if the cron expression says it's time.
```typescript
// Allow overlapping instances (e.g., for idempotent jobs)
await jobQueue.addCronJob({
  scheduleName: 'heartbeat',
  cronExpression: '* * * * *',
  jobType: 'send_email',
  payload: { to: 'admin@example.com', subject: 'heartbeat', body: 'ping' },
  allowOverlap: true,
});
```

## Manage Schedules

### Pause and Resume

```typescript
// Pause — skipped during automatic enqueueing
await jobQueue.pauseCronJob(scheduleId);

// Resume
await jobQueue.resumeCronJob(scheduleId);
```

### Edit a Schedule

```typescript
await jobQueue.editCronJob(scheduleId, {
  cronExpression: '0 */2 * * *', // change to every 2 hours
  payload: { reportId: 'bi-hourly', userId: 'system' },
});
```

When `cronExpression` or `timezone` changes, `nextRunAt` is automatically recalculated.

### Remove a Schedule

```typescript
// Deletes the schedule definition. Already-enqueued jobs are not cancelled.
await jobQueue.removeCronJob(scheduleId);
```

### List and Query

```typescript
// List all schedules
const all = await jobQueue.listCronJobs();

// List only active / paused
const active = await jobQueue.listCronJobs('active');
const paused = await jobQueue.listCronJobs('paused');

// Get by ID or name
const byId = await jobQueue.getCronJob(id);
const byName = await jobQueue.getCronJobByName('daily-report');
```

## Database Migration

The cron feature requires the `cron_schedules` table. Run the DataQueue migrations to create it:

```bash
npx dataqueue-cli migrate up
```

If you're already using DataQueue, just run migrations again — the new table will be added alongside existing ones.

---

# Dashboard

> Plug-and-play admin dashboard for monitoring and managing jobs

Slug: usage/dashboard

The `@nicnocquee/dataqueue-dashboard` package provides a self-contained admin dashboard that you can add to any Next.js application with a single file. It lets you view jobs, inspect details, manually trigger processing, and cancel or retry jobs.
## Installation ```bash npm install @nicnocquee/dataqueue-dashboard ``` ## Setup (Next.js) Create a single catch-all route file in your app: ```typescript title="app/admin/dataqueue/[[...path]]/route.ts" import { createDataqueueDashboard } from '@nicnocquee/dataqueue-dashboard/next'; import { getJobQueue, jobHandlers } from '@/lib/queue'; const { GET, POST } = createDataqueueDashboard({ jobQueue: getJobQueue(), jobHandlers, basePath: '/admin/dataqueue', }); export { GET, POST }; ``` That's it. Visit `/admin/dataqueue` to open the dashboard. > **Note:** The `basePath` must match the directory where you placed the route file. If you put it at `app/jobs/dashboard/[[...path]]/route.ts`, use `basePath: '/jobs/dashboard'`. ## Features ### Jobs List The main page shows all jobs in a table with: - **Status filter tabs** — All, Pending, Processing, Completed, Failed, Cancelled, Waiting - **Pagination** — Navigate through pages of jobs - **Auto-refresh** — Toggle automatic polling every 3 seconds - **Inline actions** — Cancel pending/waiting jobs or retry failed/cancelled jobs directly from the table ### Job Detail Click any job ID to see the full detail view: - **Properties** — Status, type, priority, attempts, all timestamps, tags, progress bar - **Payload** — Formatted JSON display of the job's payload - **Error History** — All errors with timestamps (if the job has failed) - **Step Data** — Completed step results for jobs using `ctx.run()` (if any) - **Events Timeline** — Chronological history of all job events ### Process Jobs The **Process Jobs** button in the header triggers one-shot job processing. It creates a temporary processor, runs a single batch, and returns the count of jobs processed. 
This is useful for: - Debugging job handlers during development - Manually processing jobs in environments without a background worker - Testing job behavior before deploying a cron-based processor ## Configuration ### DashboardConfig ```typescript interface DashboardConfig { /** The initialized JobQueue instance. */ jobQueue: JobQueue; /** Job handlers used when triggering processing from the dashboard. */ jobHandlers: JobHandlers; /** Base path where the dashboard is mounted (e.g., '/admin/dataqueue'). */ basePath: string; /** Options for the processor when manually triggering processing. */ processorOptions?: { batchSize?: number; // default: 10 concurrency?: number; pollInterval?: number; onError?: (error: Error) => void; verbose?: boolean; jobType?: string | string[]; }; } ``` ### Customizing the Processor Pass `processorOptions` to control how jobs are processed when using the "Process Jobs" button: ```typescript title="app/admin/dataqueue/[[...path]]/route.ts" const { GET, POST } = createDataqueueDashboard({ jobQueue: getJobQueue(), jobHandlers, basePath: '/admin/dataqueue', processorOptions: { batchSize: 5, concurrency: 2, verbose: true, }, }); ``` ## Protecting the Dashboard Since you own the route file, you can protect the dashboard with any authentication strategy your app already uses. 
### Using Next.js Middleware ```typescript title="middleware.ts" import { NextResponse } from 'next/server'; import type { NextRequest } from 'next/server'; export function middleware(request: NextRequest) { if (request.nextUrl.pathname.startsWith('/admin/dataqueue')) { const session = request.cookies.get('session'); if (!session) { return NextResponse.redirect(new URL('/login', request.url)); } } return NextResponse.next(); } ``` ### Wrapping the Handler ```typescript title="app/admin/dataqueue/[[...path]]/route.ts" import { createDataqueueDashboard } from '@nicnocquee/dataqueue-dashboard/next'; import { getJobQueue, jobHandlers } from '@/lib/queue'; import { auth } from '@/lib/auth'; const dashboard = createDataqueueDashboard({ jobQueue: getJobQueue(), jobHandlers, basePath: '/admin/dataqueue', }); export async function GET(req: Request, ctx: any) { const session = await auth(); if (!session?.user?.isAdmin) { return new Response('Unauthorized', { status: 401 }); } return dashboard.GET(req, ctx); } export async function POST(req: Request, ctx: any) { const session = await auth(); if (!session?.user?.isAdmin) { return new Response('Unauthorized', { status: 401 }); } return dashboard.POST(req, ctx); } ``` ## API Endpoints The dashboard exposes these API endpoints under the configured `basePath`: | Method | Path | Description | | ------ | ---------------------- | ------------------------------------------------------------------------ | | GET | `/` | Dashboard HTML page | | GET | `/api/jobs` | List jobs (supports `status`, `jobType`, `limit`, `offset` query params) | | GET | `/api/jobs/:id` | Get a single job | | GET | `/api/jobs/:id/events` | Get job event history | | POST | `/api/jobs/:id/cancel` | Cancel a pending or waiting job | | POST | `/api/jobs/:id/retry` | Retry a failed or cancelled job | | POST | `/api/process` | Trigger one-shot job processing | ## Architecture The package is designed with future framework support in mind. 
The core logic uses Web Standard `Request` and `Response` objects, making it framework-agnostic. The Next.js adapter is a thin wrapper that maps App Router conventions to the core handler:

```
@nicnocquee/dataqueue-dashboard
├── core/      Framework-agnostic handlers (Web Request/Response)
├── next.ts    Next.js App Router adapter
└── index.ts   Type exports
```

Adding support for other frameworks (Express, Hono, etc.) would only require a new adapter file — the core handlers and dashboard UI remain unchanged.

---

# Database Migration

Slug: usage/database-migration

> **Note:** Database migrations are **only required for the PostgreSQL backend**. If you're using the Redis backend, you can skip this page entirely -- Redis requires no schema setup.

After installing the package, add the following script to your `package.json` to apply [the migrations](https://github.com/nicnocquee/dataqueue/tree/main/packages/dataqueue/migrations):

```json title="package.json"
"scripts": {
  "migrate-dataqueue": "dataqueue-cli migrate"
}
```

Next, run this command to apply the migrations:

```bash
npm run migrate-dataqueue
```

This will apply all the necessary schema migrations so your Postgres database is ready to use with DataQueue.

> **Note:** **Make sure the `PG_DATAQUEUE_DATABASE` environment variable is set to your Postgres connection string.** The CLI uses this environment variable to connect to your database. For example:

```dotenv
PG_DATAQUEUE_DATABASE=postgresql://postgres:password@localhost:5432/my_database
```

> **Note:** **You must run these migrations before using the job queue.** For example, if you are deploying your app to Vercel, run this command in Vercel's pipeline before deploying. If you have used Prisma or other ORMs, you may be familiar with this process.

### Using a custom .env file

You can use the `--envPath` option to specify a custom path to your environment file.
For example:

```bash
npm run migrate-dataqueue -- --envPath .env.local
```

This will load environment variables from `.env.local` before running the migration.

### Schema selection

You can explicitly set the schema for migrations using the `-s` or `--schema` CLI option. This option is passed directly to `node-pg-migrate` and will ensure the schema is created if it does not exist.

**Example CLI usage with explicit schema:**

```bash
npm run migrate-dataqueue -- --envPath .env.local --schema dataqueue
```

> **Note:** Specifying the schema is optional but **recommended** when you're using the same database as your main application. If you don't specify the schema, the CLI will use the default schema, which is `public`. If you use [Prisma](https://www.prisma.io), the Prisma migration will fail because of the additional tables added by DataQueue.

> **Note:** You have to use the `--schema` option even when `PG_DATAQUEUE_DATABASE` contains the schema name in `search_path`.

### Other options

You can pass other options supported by `node-pg-migrate` to the migration command. For example:

```bash
npm run migrate-dataqueue -- --envPath .env.local --schema dataqueue --verbose
```

For more information, see the [node-pg-migrate documentation](https://salsita.github.io/node-pg-migrate/cli).

### Running migrations with SSL and a custom CA

Most managed Postgres providers (like DigitalOcean, Supabase, etc.) require SSL connections and provide a CA certificate (`.crt` file). You can use the CA certificate to validate the server's identity. To run the migration with a custom CA, you must set the `NODE_EXTRA_CA_CERTS` environment variable to the path of your CA certificate. This tells Node.js to trust your provider's CA for outgoing TLS connections, including Postgres.
```bash NODE_EXTRA_CA_CERTS=/absolute/path/to/ca.crt \ PG_DATAQUEUE_DATABASE=your_connection_string \ npm run migrate-dataqueue ``` #### Migration without Certificate Validation For convenience, you can run the migration without certificate validation by adding the `--no-reject-unauthorized` flag to the command. ```bash npm run migrate-dataqueue -- --no-reject-unauthorized ``` #### Using a CA certificate in environments where you cannot upload files In some serverless or cloud environments (like Vercel, AWS Lambda, etc.), you cannot upload files directly, but you still need Node.js to trust your managed Postgres provider's CA certificate. In this case, you can store the CA certificate as an environment variable and write it to a temporary file in your pipeline shell script before running the migration. 1. **Store the PEM content as an environment variable** - Copy the full contents of your `.crt` file into a new environment variable, e.g. `PGSSLROOTCERT_CONTENT`. - Make sure your environment supports multi-line secrets. 2. **Write the CA certificate to a file and set NODE_EXTRA_CA_CERTS in your pipeline script** ```sh # Write the CA cert to a file printf "%s" "$PGSSLROOTCERT_CONTENT" > /tmp/ca.crt # Set NODE_EXTRA_CA_CERTS and run the migration NODE_EXTRA_CA_CERTS=/tmp/ca.crt npm run migrate-dataqueue ``` --- # Edit Jobs Slug: usage/edit-jobs You can edit a pending job by its ID to update its properties before it is processed. Only jobs with status 'pending' can be edited. Attempting to edit a job with any other status (processing, completed, failed, cancelled) will silently fail. 
## Basic Usage

```typescript title="@/app/api/edit-job/route.ts"
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue } from '@/lib/queue';

export async function POST(request: NextRequest) {
  try {
    const { jobId, updates } = await request.json();
    const jobQueue = getJobQueue();
    await jobQueue.editJob(jobId, updates);
    return NextResponse.json({ message: 'Job updated' });
  } catch (error) {
    console.error('Error editing job:', error);
    return NextResponse.json(
      { message: 'Failed to edit job' },
      { status: 500 },
    );
  }
}
```

## Editable Fields

All fields in `EditJobOptions` are optional - only the fields you provide will be updated. The following fields can be edited:

- `payload` - The job payload data
- `priority` - Job priority (higher runs first)
- `maxAttempts` - Maximum number of attempts
- `runAt` - When to run the job (Date or null)
- `timeoutMs` - Timeout for the job in milliseconds
- `tags` - Tags for grouping, searching, or batch operations

**Note:** `jobType` cannot be changed. If you need to change the job type, you should cancel the job and create a new one.
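Since `jobType` is immutable, "changing" it amounts to replacing the job. Below is a minimal sketch of that cancel-and-recreate pattern using `getJob`, `cancelJob`, and `addJob` as documented in this guide; the `changeJobType` helper and the set of carried-over fields are illustrative, not part of the library.

```typescript
// Minimal shape of the queue methods this sketch relies on.
type QueueLike = {
  getJob: (id: number) => Promise<{ status: string; priority?: number; tags?: string[] } | null>;
  cancelJob: (id: number) => Promise<void>;
  addJob: (opts: { jobType: string; payload: unknown; priority?: number; tags?: string[] }) => Promise<number>;
};

// Replace a pending job with a new job of a different type.
// Returns the new job's ID, or null if the job was missing or not pending.
async function changeJobType(
  queue: QueueLike,
  jobId: number,
  jobType: string,
  payload: unknown,
): Promise<number | null> {
  const job = await queue.getJob(jobId);
  if (!job || job.status !== 'pending') return null; // only pending jobs can be safely replaced
  await queue.cancelJob(jobId);
  // Carry over the fields that should survive the swap (illustrative choice).
  return queue.addJob({ jobType, payload, priority: job.priority, tags: job.tags });
}
```

Note that this is not atomic: a processor could claim the job between the status check and the cancel, so treat it as best-effort.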
## Examples ### Edit Payload ```typescript // Update the payload of a pending job await jobQueue.editJob(jobId, { payload: { to: 'newemail@example.com', subject: 'Updated Subject' }, }); ``` ### Edit Priority ```typescript // Increase the priority of a job await jobQueue.editJob(jobId, { priority: 10, }); ``` ### Edit Scheduled Time ```typescript // Reschedule a job to run in 1 hour await jobQueue.editJob(jobId, { runAt: new Date(Date.now() + 60 * 60 * 1000), }); // Schedule a job to run immediately (or as soon as possible) await jobQueue.editJob(jobId, { runAt: null, }); ``` ### Edit Multiple Fields ```typescript // Update multiple fields at once await jobQueue.editJob(jobId, { payload: { to: 'updated@example.com', subject: 'New Subject' }, priority: 5, maxAttempts: 10, timeoutMs: 30000, tags: ['urgent', 'priority'], }); ``` ### Partial Updates ```typescript // Only update what you need - other fields remain unchanged await jobQueue.editJob(jobId, { priority: 10, // payload, maxAttempts, runAt, timeoutMs, and tags remain unchanged }); ``` ### Clear Tags or Timeout ```typescript // Remove tags by setting to undefined await jobQueue.editJob(jobId, { tags: undefined, }); // Remove timeout by setting to undefined await jobQueue.editJob(jobId, { timeoutMs: undefined, }); ``` ## Batch Editing You can edit multiple pending jobs at once using `editAllPendingJobs`. This is useful when you need to update many jobs that match certain criteria. The function returns the number of jobs that were edited. 
### Basic Batch Edit ```typescript // Edit all pending jobs const editedCount = await jobQueue.editAllPendingJobs(undefined, { priority: 10, }); console.log(`Edited ${editedCount} jobs`); ``` ### Filter by Job Type ```typescript // Edit all pending email jobs const editedCount = await jobQueue.editAllPendingJobs( { jobType: 'email' }, { priority: 5, }, ); ``` ### Filter by Priority ```typescript // Edit all pending jobs with priority 1 const editedCount = await jobQueue.editAllPendingJobs( { priority: 1 }, { priority: 5, }, ); ``` ### Filter by Tags ```typescript // Edit all pending jobs with 'urgent' tag const editedCount = await jobQueue.editAllPendingJobs( { tags: { values: ['urgent'], mode: 'any' } }, { priority: 10, }, ); ``` ### Filter by Scheduled Time ```typescript // Edit all pending jobs scheduled in the future const editedCount = await jobQueue.editAllPendingJobs( { runAt: { gte: new Date() } }, { priority: 10, }, ); // Edit all pending jobs scheduled before a specific date const editedCount = await jobQueue.editAllPendingJobs( { runAt: { lt: new Date('2024-12-31') } }, { priority: 5, }, ); ``` ### Combined Filters ```typescript // Edit all pending email jobs with 'urgent' tag const editedCount = await jobQueue.editAllPendingJobs( { jobType: 'email', tags: { values: ['urgent'], mode: 'any' }, }, { priority: 10, maxAttempts: 5, }, ); ``` ### Batch Edit Notes - Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. - The function returns the number of jobs that were successfully edited. - Edit events are recorded for each affected job, just like single job edits. - If no fields are provided in the updates object, the function returns 0 and no jobs are modified. 
## When to Use Edit vs Cancel vs Retry - **Edit**: Use when you want to modify a pending job's properties before it runs - **Cancel**: Use when you want to completely remove a pending job from the queue - **Retry**: Use when you want to retry a failed job (sets status back to pending) ## Error Handling The `editJob` function silently fails if you try to edit a non-pending job. This means: - No error is thrown - The job remains unchanged - The operation completes successfully (but does nothing) To check if an edit was successful, you can: ```typescript const job = await jobQueue.getJob(jobId); if (job?.status === 'pending') { // Job is still pending, edit might have succeeded // Check if the fields you wanted to update actually changed if (job.priority === newPriority) { console.log('Edit successful'); } } else { console.log('Job is not pending, edit was ignored'); } ``` ## Event Tracking When a job is edited, an 'edited' event is recorded in the job's event history. The event metadata contains the fields that were updated: ```typescript const events = await jobQueue.getJobEvents(jobId); const editEvent = events.find((e) => e.eventType === 'edited'); if (editEvent) { console.log('Updated fields:', editEvent.metadata); // { payload: {...}, priority: 10, ... } } ``` ## Best Practices 1. **Check job status before editing**: If you're unsure whether a job is pending, check its status first: ```typescript const job = await jobQueue.getJob(jobId); if (job?.status === 'pending') { await jobQueue.editJob(jobId, updates); } else { console.log('Job is not pending, cannot edit'); } ``` 2. **Use partial updates**: Only update the fields you need to change. This is more efficient and reduces the chance of accidentally overwriting other fields. 3. **Validate updates**: Ensure the updated values are valid for your job handlers. For example, if your handler expects a specific payload structure, make sure the updated payload matches. 4. 
**Consider race conditions**: If a job might be picked up for processing while you're editing it, be aware that the edit might not take effect if the job transitions to 'processing' status between your check and the edit operation. 5. **Monitor events**: Use job events to track when and what was edited for audit purposes. --- # Event Hooks Slug: usage/event-hooks DataQueue emits real-time events for job lifecycle transitions, progress updates, and internal errors. Use event hooks to integrate with logging, metrics, alerting, or any custom logic without polling. Event hooks work identically with both the PostgreSQL and Redis backends. ## Listening for Events Register listeners with `on()`, `once()`, or remove them with `off()` and `removeAllListeners()`. ```typescript const queue = initJobQueue(config); queue.on('job:completed', (event) => { console.log(`Job ${event.jobId} (${event.jobType}) completed`); }); queue.on('job:failed', (event) => { console.error(`Job ${event.jobId} failed: ${event.error.message}`); if (!event.willRetry) { alertOps(`Permanent failure for job ${event.jobId}`); } }); queue.on('error', (error) => { logger.error('Queue internal error:', error); }); ``` ## Available Events | Event | Payload | When | | ---------------- | -------------------------------------- | --------------------------------------------------------------------------------- | | `job:added` | `{ jobId, jobType }` | After `addJob()` or `addJobs()` | | `job:processing` | `{ jobId, jobType }` | When a processor claims and starts a job | | `job:completed` | `{ jobId, jobType }` | When a handler completes successfully | | `job:failed` | `{ jobId, jobType, error, willRetry }` | When a handler throws or times out | | `job:cancelled` | `{ jobId }` | After `cancelJob()` | | `job:retried` | `{ jobId }` | After `retryJob()` | | `job:waiting` | `{ jobId, jobType }` | When a handler enters a wait (`ctx.waitFor`, `ctx.waitUntil`, `ctx.waitForToken`) | | `job:progress` | `{ jobId, progress }` 
| When a handler calls `ctx.setProgress()` | | `error` | `Error` | Internal errors from the processor or supervisor | ## One-Time Listeners Use `once()` when you only need to react to the first occurrence of an event. ```typescript queue.once('job:added', (event) => { console.log('First job added:', event.jobId); }); ``` ## Removing Listeners ```typescript const listener = (event) => console.log(event); queue.on('job:completed', listener); // Remove a specific listener queue.off('job:completed', listener); // Remove all listeners for one event queue.removeAllListeners('job:completed'); // Remove all listeners for all events queue.removeAllListeners(); ``` ## Error Monitoring The `error` event fires for internal errors in the processor and supervisor. It works alongside the existing `onError` callback in `ProcessorOptions` and `SupervisorOptions` -- both fire independently. ```typescript queue.on('error', (error) => { Sentry.captureException(error); }); // onError still works as before const processor = queue.createProcessor(handlers, { onError: (error) => console.error('Processor error:', error), }); ``` ## Failure Retry Detection The `job:failed` event includes a `willRetry` boolean that tells you whether the job will be retried automatically. ```typescript queue.on('job:failed', (event) => { if (event.willRetry) { metrics.increment('job.retry', { jobType: event.jobType }); } else { metrics.increment('job.permanent_failure', { jobType: event.jobType }); pagerDuty.alert(`Job ${event.jobId} permanently failed`); } }); ``` ## Progress Tracking The `job:progress` event fires whenever a handler calls `ctx.setProgress()`, giving you real-time progress updates. ```typescript queue.on('job:progress', (event) => { websocket.broadcast(`job:${event.jobId}`, { progress: event.progress }); }); ``` > **Note:** Events are emitted synchronously after the corresponding database operation completes. 
Slow event listeners will delay the return of methods like `addJob()` or the processing of the next job. Use async patterns in listeners if they perform I/O. --- # Failed Jobs Slug: usage/failed-jobs A job handler can fail for many reasons, such as a bug in the code or running out of resources. When a job fails, it is marked as `failed` and retried up to `maxAttempts` times (default: 3). You can view the error history for a job in its `errorHistory` field. ## Retry configuration You can control the retry behavior per-job using three options: | Option | Type | Default | Description | | --------------- | --------- | ------- | ---------------------------------------------- | | `retryDelay` | `number` | `60` | Base delay between retries in **seconds** | | `retryBackoff` | `boolean` | `true` | Use exponential backoff (doubles each attempt) | | `retryDelayMax` | `number` | _none_ | Maximum cap for the delay in **seconds** | ### Fixed delay Set `retryBackoff: false` to use a constant delay between retries: ```ts await jobQueue.addJob({ jobType: 'email', payload: { to: 'user@example.com' }, maxAttempts: 5, retryDelay: 30, // 30 seconds between each retry retryBackoff: false, }); ``` Every retry will wait exactly 30 seconds. ### Exponential backoff (default) When `retryBackoff` is `true` (the default), the delay doubles with each attempt. A small amount of random jitter is added to prevent thundering herd problems: ```ts await jobQueue.addJob({ jobType: 'email', payload: { to: 'user@example.com' }, maxAttempts: 5, retryDelay: 10, // base: 10 seconds retryBackoff: true, // enabled by default }); ``` This produces approximate delays of 10s, 20s, 40s, 80s, ... (with jitter). 
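The backoff rules above can be approximated in a few lines. This is a sketch of the documented behavior, not the library's internal implementation; the 0.5–1.0 jitter factor follows the "Jitter" section of this page, and the exact rounding may differ.

```typescript
// Approximate retry delay (in seconds) before the next attempt.
// `attempt` is the 1-based attempt number that just failed.
function retryDelaySeconds(
  attempt: number,
  opts: { retryDelay?: number; retryBackoff?: boolean; retryDelayMax?: number },
  random: () => number = Math.random, // injectable for deterministic testing
): number {
  const base = opts.retryDelay ?? 60; // documented default: 60 seconds
  const backoff = opts.retryBackoff ?? true; // documented default: exponential
  // Doubles with each prior attempt when backoff is enabled, constant otherwise.
  let delay = backoff ? base * 2 ** (attempt - 1) : base;
  if (opts.retryDelayMax !== undefined) delay = Math.min(delay, opts.retryDelayMax);
  // Jitter only applies to exponential backoff: multiply by a factor in [0.5, 1.0).
  return backoff ? delay * (0.5 + random() * 0.5) : delay;
}
```

For example, with `retryDelay: 10` and backoff enabled, attempts 1 through 4 wait roughly 10s, 20s, 40s, and 80s before jitter is applied.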
### Capping the delay Use `retryDelayMax` to prevent the delay from growing unbounded: ```ts await jobQueue.addJob({ jobType: 'email', payload: { to: 'user@example.com' }, maxAttempts: 10, retryDelay: 5, retryBackoff: true, retryDelayMax: 300, // never wait more than 5 minutes }); ``` Delays: ~5s, ~10s, ~20s, ~40s, ~80s, ~160s, ~300s, ~300s, ... ### Default behavior If none of the retry options are set, the legacy formula `2^attempts * 1 minute` is used. This means the first retry is after ~2 minutes, then ~4 minutes, then ~8 minutes, and so on. ## Jitter When exponential backoff is enabled, each computed delay is multiplied by a random factor between 0.5 and 1.0. This prevents multiple failed jobs from retrying at exactly the same time, which could overload downstream services. ## Cron schedules Retry configuration can also be set on cron schedules. Every job enqueued by the schedule inherits the retry settings: ```ts await jobQueue.addCronJob({ scheduleName: 'daily-report', cronExpression: '0 9 * * *', jobType: 'report', payload: { type: 'daily' }, retryDelay: 60, retryBackoff: true, retryDelayMax: 600, }); ``` ## Editing retry config You can update the retry configuration of a pending job: ```ts await jobQueue.editJob(jobId, { retryDelay: 15, retryBackoff: false, }); ``` --- # Force Kill on Timeout Slug: usage/force-kill-timeout When you set `forceKillOnTimeout: true` on a job, the handler will be forcefully terminated (using Worker Threads) when the timeout is reached, rather than just receiving an AbortSignal. ## Runtime Requirements **⚠️ IMPORTANT**: `forceKillOnTimeout` requires **Node.js** and uses the `worker_threads` module. It will **not work** in Bun or other runtimes that don't support Node.js worker threads. 
- ✅ **Node.js**: Fully supported (Node.js v10.5.0+) - ❌ **Bun**: Not supported - use `forceKillOnTimeout: false` (default) and ensure your handler checks `signal.aborted` If you're using Bun or another runtime without worker thread support, use the default graceful shutdown approach (`forceKillOnTimeout: false`) and make sure your handlers check `signal.aborted` to exit gracefully when timed out. ## Handler Serialization Requirements **IMPORTANT**: When using `forceKillOnTimeout`, your handler must be **serializable**. This means the handler function can be converted to a string and executed in a separate worker thread. ### ✅ Serializable Handlers These handlers will work with `forceKillOnTimeout`: ```typescript // Standalone function const handler = async (payload, signal) => { await doSomething(payload); }; // Function that imports dependencies inside const handler = async (payload, signal) => { const { api } = await import('./api'); await api.call(payload); }; // Function with local variables const handler = async (payload, signal) => { const localVar = 'value'; await process(payload, localVar); }; ``` ### ❌ Non-Serializable Handlers These handlers will **NOT** work with `forceKillOnTimeout`: ```typescript // ❌ Closure over external variable const db = getDatabase(); const handler = async (payload, signal) => { await db.query(payload); // 'db' is captured from closure }; // ❌ Uses 'this' context class MyHandler { async handle(payload, signal) { await this.doSomething(payload); // 'this' won't work } } // ❌ Closure over imported module import { someService } from './services'; const handler = async (payload, signal) => { await someService.process(payload); // 'someService' is from closure }; ``` ## Validating Handler Serialization You can validate that your handlers are serializable before using them: ```typescript import { validateHandlerSerializable, testHandlerSerialization, } from '@nicnocquee/dataqueue'; const handler = async (payload, signal) => { await 
doSomething(payload); }; // Quick validation (synchronous) const result = validateHandlerSerializable(handler, 'myJob'); if (!result.isSerializable) { console.error('Handler is not serializable:', result.error); } // Thorough test (asynchronous, actually tries to serialize) const testResult = await testHandlerSerialization(handler, 'myJob'); if (!testResult.isSerializable) { console.error('Handler failed serialization test:', testResult.error); } ``` ## Limitations - **`prolong` and `onTimeout` are not supported** with `forceKillOnTimeout: true`. Because the handler runs in a separate Worker Thread, the `JobContext` methods (`prolong` and `onTimeout`) are no-ops in force-kill mode. If you need to extend timeouts dynamically, use the default graceful shutdown (`forceKillOnTimeout: false`) instead. See [Job Timeout](/usage/job-timeout) for details on extending timeouts. ## Best Practices 1. **Use standalone functions**: Define handlers as standalone functions, not closures 2. **Import dependencies inside**: If you need external dependencies, import them inside the handler function 3. **Avoid 'this' context**: Don't use class methods as handlers unless they're bound 4. **Test early**: Use `validateHandlerSerializable` during development to catch issues early 5. **When in doubt, use graceful shutdown**: If your handler can't be serialized, use `forceKillOnTimeout: false` (default) and ensure your handler checks `signal.aborted` 6. 
**Use `prolong`/`onTimeout` instead**: If your main concern is jobs that are still working but slow, consider using `prolong` or `onTimeout` (with `forceKillOnTimeout: false`) instead of forcefully terminating ## Example: Converting a Non-Serializable Handler **Before** (not serializable): ```typescript import { db } from './db'; export const jobHandlers = { processData: async (payload, signal) => { // ❌ 'db' is captured from closure await db.query('SELECT * FROM data WHERE id = $1', [payload.id]); }, }; ``` **After** (serializable): ```typescript export const jobHandlers = { processData: async (payload, signal) => { // ✅ Import inside the handler const { db } = await import('./db'); await db.query('SELECT * FROM data WHERE id = $1', [payload.id]); }, }; ``` ## Runtime Validation The library automatically validates handlers when `forceKillOnTimeout` is enabled. If a handler cannot be serialized, you'll get a clear error message: ``` Handler for job type "myJob" uses 'this' context which cannot be serialized. Use a regular function or avoid 'this' references when forceKillOnTimeout is enabled. ``` This validation happens when the job is processed, so you'll catch serialization issues early in development. --- # Get Jobs Slug: usage/get-jobs To get a job by its ID: ```typescript const job = await jobQueue.getJob(jobId); ``` To get all jobs: ```typescript const jobs = await jobQueue.getAllJobs(limit, offset); ``` To get jobs by status: ```typescript const jobs = await jobQueue.getJobsByStatus(status, limit, offset); ``` ## Get Jobs by Tags You can get jobs by their tags using the `getJobsByTags` method: ```typescript const jobs = await jobQueue.getJobsByTags(['welcome', 'user'], 'all', 10, 0); ``` - The first argument is an array of tags to match. - The second argument is the tag query mode. See [Tags](/api/tags) for more details. - The third and fourth arguments are optional for pagination. 
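When you need to walk the entire queue, the `limit`/`offset` parameters compose into a simple paging loop. A sketch, assuming `getAllJobs` behaves as described above and returns fewer than `limit` rows on the last page (the `iterateAllJobs` helper is not part of the library):

```typescript
// Iterate over every job, one page at a time.
async function* iterateAllJobs<T>(
  queue: { getAllJobs: (limit: number, offset: number) => Promise<T[]> },
  pageSize = 100,
): AsyncGenerator<T> {
  for (let offset = 0; ; offset += pageSize) {
    const page = await queue.getAllJobs(pageSize, offset);
    yield* page;
    if (page.length < pageSize) return; // short page means we reached the end
  }
}

// Usage: for await (const job of iterateAllJobs(jobQueue)) { ... }
```

Be aware that jobs added or removed while you iterate can be skipped or seen twice, since offset pagination is not a snapshot.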
## Get Jobs by Filter You can retrieve jobs using multiple filters with the `getJobs` method: ```typescript const jobs = await jobQueue.getJobs( { jobType: 'email', priority: 2, runAt: { gte: new Date('2024-01-01'), lt: new Date('2024-02-01') }, tags: { values: ['welcome', 'user'], mode: 'all' }, }, 10, 0, ); ``` - The first argument is an optional filter object. You can filter by: - `jobType`: The job type (string). - `priority`: The job priority (number). - `runAt`: The scheduled time. You can use a `Date` for exact match, or an object with `gt`, `gte`, `lt`, `lte`, or `eq` for range queries. - `tags`: An object with `values` (array of tags) and `mode` (see [Tags](/api/tags)). - The second and third arguments are optional for pagination (`limit`, `offset`). You can combine any of these filters. If no filters are provided, all jobs are returned (with pagination if specified). --- # Initialize Queue Slug: usage/init-queue After defining your job types, payloads, and handlers, you need to initialize the job queue, which sets up the connection to your database backend. ## PostgreSQL ```typescript title="@lib/queue.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null; export const getJobQueue = () => { if (!jobQueue) { jobQueue = initJobQueue<JobPayloadMap>({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment }, verbose: process.env.NODE_ENV === 'development', }); } return jobQueue; }; ``` > **Note:** The value of `connectionString` must be a [valid Postgres connection string](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS).
For example: ```dotenv PG_DATAQUEUE_DATABASE=postgresql://postgres:password@localhost:5432/my_database?search_path=my_schema ``` ## Redis To use Redis as the backend, set `backend: 'redis'` and provide `redisConfig` instead of `databaseConfig`: ```typescript title="@lib/queue.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null; export const getJobQueue = () => { if (!jobQueue) { jobQueue = initJobQueue<JobPayloadMap>({ backend: 'redis', redisConfig: { url: process.env.REDIS_URL, // e.g. redis://localhost:6379 }, verbose: process.env.NODE_ENV === 'development', }); } return jobQueue; }; ``` You can also connect using individual connection options instead of a URL: ```typescript title="@lib/queue.ts" jobQueue = initJobQueue<JobPayloadMap>({ backend: 'redis', redisConfig: { host: 'localhost', port: 6379, password: process.env.REDIS_PASSWORD, db: 0, keyPrefix: 'myapp:', // Optional, defaults to 'dq:' }, verbose: process.env.NODE_ENV === 'development', }); ``` > **Note:** The `keyPrefix` option lets you namespace all Redis keys. This is useful when sharing a Redis instance between multiple applications or multiple queues. The default prefix is `dq:`. --- ## Bring Your Own Pool / Client Instead of providing connection configuration, you can pass an existing connection instance. This is useful when your application already manages its own connection pool and you want to share it with dataqueue.
### PostgreSQL — External Pool ```typescript title="@lib/queue.ts" import { Pool } from 'pg'; import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const jobQueue = initJobQueue<JobPayloadMap>({ pool, verbose: process.env.NODE_ENV === 'development', }); ``` ### Redis — External Client ```typescript title="@lib/queue.ts" import IORedis from 'ioredis'; import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; const redis = new IORedis(process.env.REDIS_URL); const jobQueue = initJobQueue<JobPayloadMap>({ backend: 'redis', client: redis, keyPrefix: 'myapp:', verbose: process.env.NODE_ENV === 'development', }); ``` > **Note:** **Connection ownership:** When you provide your own `pool` or `client`, the library will **not** close it on shutdown. You are responsible for calling `pool.end()` or `client.quit()` when your application exits. --- ## Using the Queue Once initialized, you use the queue instance identically regardless of backend. The API is the same for both PostgreSQL and Redis. ```typescript title="@/app/actions/send-email.ts" import { getJobQueue } from '@/lib/queue'; const sendEmail = async () => { const jobQueue = getJobQueue(); await jobQueue.addJob({ jobType: 'send_email', payload: { to: 'test@example.com', subject: 'Hello', body: 'Hello, world!', }, }); }; ``` --- ## SSL Configuration (PostgreSQL) Most managed Postgres providers (like DigitalOcean, Supabase, etc.) require SSL connections and use their own CA certificate (.crt file) to sign the server's certificate. To securely verify the server's identity, you must configure your client to trust this CA certificate. You can configure SSL for your database connection in several ways, depending on your environment and security requirements.
### Using PEM Strings from Environment Variables This is ideal for serverless environments where you cannot mount files. Store your CA certificate, and optionally client certificate and key, as environment variables, then pass them to the `ssl` property of the `databaseConfig` object. ```typescript title="@lib/queue.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null; export const getJobQueue = () => { if (!jobQueue) { jobQueue = initJobQueue<JobPayloadMap>({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment ssl: { ca: process.env.PGSSLROOTCERT, // PEM string: the content of your .crt file cert: process.env.PGSSLCERT, // PEM string (optional, for client authentication) key: process.env.PGSSLKEY, // PEM string (optional, for client authentication) rejectUnauthorized: true, // Always true for CA-signed certs }, }, verbose: process.env.NODE_ENV === 'development', }); } return jobQueue; }; ``` > **Note:** When using a custom CA certificate and `connectionString`, you must remove the `sslmode` parameter from the connection string. Otherwise, the connection will fail. ### Using File Paths If you have the CA certificate, client certificate, or key on disk, provide their absolute paths using the `file://` prefix. Only values starting with `file://` will be loaded from the file system; all others are treated as PEM strings.
```typescript title="@lib/queue.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null; export const getJobQueue = () => { if (!jobQueue) { jobQueue = initJobQueue<JobPayloadMap>({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE, ssl: { ca: 'file:///absolute/path/to/ca.crt', // Path to your provider's CA cert cert: 'file:///absolute/path/to/client.crt', // optional, for client authentication key: 'file:///absolute/path/to/client.key', // optional, for client authentication rejectUnauthorized: true, }, }, verbose: process.env.NODE_ENV === 'development', }); } return jobQueue; }; ``` > **Note:** When using a custom CA certificate and `connectionString`, you must remove the `sslmode` parameter from the connection string. Otherwise, the connection will fail. ### Skipping Certificate Validation For convenience, you can skip certificate validation (not recommended for production) by setting `rejectUnauthorized` to `false` without providing a custom CA certificate. ```typescript title="@lib/queue.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null; export const getJobQueue = () => { if (!jobQueue) { jobQueue = initJobQueue<JobPayloadMap>({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE, ssl: { rejectUnauthorized: false, }, }, verbose: process.env.NODE_ENV === 'development', }); } return jobQueue; }; ``` > **Note:** When using `rejectUnauthorized: false` and `connectionString`, you must remove the `sslmode` parameter from the connection string. Otherwise, the connection will fail.
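The notes above say to remove `sslmode` from the connection string whenever an explicit `ssl` object is supplied. One way to do that programmatically — a sketch, not part of dataqueue's API (`stripSslMode` is a hypothetical helper) — is to parse the URL and delete the parameter:

```typescript
// Hypothetical helper: remove the `sslmode` query parameter from a
// Postgres connection string so it does not conflict with an explicit
// `ssl` configuration object. Other query parameters are preserved.
const stripSslMode = (connectionString: string): string => {
  const url = new URL(connectionString);
  url.searchParams.delete('sslmode');
  return url.toString();
};
```

This relies on Node's WHATWG `URL`, which parses `postgresql://` URLs as non-special schemes while still exposing `searchParams`.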
--- ## TLS Configuration (Redis) If your Redis server requires TLS (common with managed services like AWS ElastiCache, Redis Cloud, etc.), provide TLS options in the `redisConfig`: ```typescript title="@lib/queue.ts" jobQueue = initJobQueue({ backend: 'redis', redisConfig: { url: process.env.REDIS_URL, tls: { ca: process.env.REDIS_CA_CERT, // PEM string rejectUnauthorized: true, }, }, }); ``` --- # Job Events Slug: usage/job-events DataQueue keeps track of every change in a job's status. You can use `getJobEvents` to see all the events for a job, such as when it was added, started processing, completed, failed, cancelled, retried, edited, or prolonged. ```typescript tab="Code" const events = await jobQueue.getJobEvents(jobId); console.log(events); ``` ```json tab="Output" [ { "id": 1, "jobId": 1, "eventType": "processing", "createdAt": "2024-06-01T12:00:00Z", "metadata": "" } ] ``` --- # Job Handlers Slug: usage/job-handlers The first thing you need to do is define your job types and their corresponding payload types. A payload is the data passed to the job handler. A job handler is a function that runs when a job is processed. ### Define Job Types and Payloads Job types and their payloads are specific to your app. You can define them in any file. The important thing is that they are an object type, where the keys are the job types and the values are the payload types. In this example, `send_email`, `generate_report`, and `generate_image` are the job types, and their values are the payload types. ```typescript title="@lib/types/job-payload-map.ts" // Define the job payload map for this app. // This ensures that the job payload is typed correctly when adding jobs. // The keys are the job types, and the values are the payload types. 
export type JobPayloadMap = { send_email: { to: string; subject: string; body: string; }; generate_report: { reportId: string; userId: string; }; generate_image: { prompt: string; }; }; ``` ### Define Job Handlers Next, define the job handlers by exporting a `JobHandlers` object that maps job types to handler functions. If you forget to add a handler for a job type, TypeScript will show an error. ```typescript title="@lib/job-handlers.ts" import { sendEmail } from './services/email'; // Function to send the email import { generateReport } from './services/generate-report'; // Function to generate the report import { generateImageAi } from './services/generate-image'; // Function to generate the image (assumed path) import { JobHandlers } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; export const jobHandlers: JobHandlers<JobPayloadMap> = { send_email: async (payload) => { const { to, subject, body } = payload; await sendEmail(to, subject, body); }, generate_report: async (payload) => { const { reportId, userId } = payload; await generateReport(reportId, userId); }, generate_image: async (payload, signal) => { const { prompt } = payload; await generateImageAi(prompt, signal); }, }; ``` In the example above, we define three job handlers: `send_email`, `generate_report`, and `generate_image`. Each handler is a function that takes a payload, an `AbortSignal`, and a `JobContext` as arguments. The `AbortSignal` is used to abort the job if it takes too long to complete. The `JobContext` provides methods to extend the job's timeout while it's running. ### Job Handler Signature A job handler receives three arguments: the job payload, an `AbortSignal`, and a `JobContext`. ```typescript (payload: Payload, signal: AbortSignal, ctx: JobContext) => Promise<void>; ``` You can omit arguments you don't need. For example, if you only need the payload: ```typescript const handler = async (payload) => { // ... }; ``` ### JobContext The third argument provides methods for timeout management and progress reporting: - `ctx.prolong(ms?)` — Proactively reset the timeout.
If `ms` is provided, sets the deadline to `ms` milliseconds from now. If omitted, resets to the original `timeoutMs`. - `ctx.onTimeout(callback)` — Register a callback that fires when the timeout is about to hit, before the `AbortSignal` is triggered. Return a number (ms) to extend, or return nothing to let the timeout proceed. - `ctx.setProgress(percent)` — Report progress as a percentage (0–100). The value is persisted to the database and can be read by clients via `getJob()` or the React SDK's `useJob()` hook. See [Job Timeout](/usage/job-timeout) for timeout examples and [Progress Tracking](/usage/progress-tracking) for progress reporting. --- # Job Output > Store and retrieve results from job handlers Slug: usage/job-output Jobs can store an output value when they complete. This is useful when you need to retrieve the result of a background task — for example, a generated report URL, a processed image path, or computation results. ## Storing Output There are two ways to store output from a handler: ### 1. Return a value from the handler The simplest approach — return any JSON-serializable value from your handler function: ```typescript title="@lib/job-handlers.ts" import { JobHandlers } from '@nicnocquee/dataqueue'; import { type JobPayloadMap } from './types/job-payload-map'; export const jobHandlers: JobHandlers<JobPayloadMap> = { generate_report: async (payload, signal, ctx) => { const url = await generateReport(payload.reportId); return { url, generatedAt: new Date().toISOString() }; }, }; ``` ### 2. Use `ctx.setOutput(data)` For more control, call `ctx.setOutput()` explicitly.
This is useful when you want to store intermediate results during execution: ```typescript title="@lib/job-handlers.ts" export const jobHandlers: JobHandlers<JobPayloadMap> = { process_images: async (payload, signal, ctx) => { const results: string[] = []; for (const image of payload.images) { const url = await processImage(image); results.push(url); await ctx.setProgress( Math.round((results.length / payload.images.length) * 100), ); await ctx.setOutput({ processedUrls: results }); } }, }; ``` ### Precedence If `ctx.setOutput()` is called **and** the handler also returns a value, the `ctx.setOutput()` value takes precedence. The handler's return value is ignored in that case. ### Rules - **JSON-serializable**: The output value must be JSON-serializable (objects, arrays, strings, numbers, booleans, null). - **Last write wins**: Calling `ctx.setOutput()` multiple times overwrites the previous value. - **Best-effort persistence**: Like `setProgress`, output writes to the database are best-effort — errors do not kill the handler. ## Reading Output Output is stored in the `output` field of the [JobRecord](/api/job-record): ```typescript const job = await jobQueue.getJob(jobId); console.log(job?.output); // null | any JSON value ``` - Before the handler stores output, the value is `null`. - After the job completes, the output is preserved and can be read at any time. - Handlers that return `undefined` (or `void`) do not store output — the field remains `null`. ## Tracking Output in React If you're using the [React SDK](/usage/react-sdk), the `useJob` hook exposes `output` directly: ```tsx import { useJob } from '@nicnocquee/dataqueue-react'; function JobResult({ jobId }: { jobId: number }) { const { status, output, progress } = useJob(jobId, { fetcher: (id) => fetch(`/api/jobs/${id}`) .then((r) => r.json()) .then((d) => d.job), }); if (status === 'completed' && output) { return <a href={output.url}>Download Report</a>; } return (
    <div>
      Status: {status}
    </div>
); } ``` ## Listening for Output Events You can subscribe to the `job:output` event to be notified whenever a handler calls `ctx.setOutput()`: ```typescript jobQueue.on('job:output', ({ jobId, output }) => { console.log(`Job ${jobId} stored output:`, output); }); ``` ## Database Migration > **Note:** If you're using the **PostgreSQL** backend, make sure to run the latest migrations to add the `output` column. See [Database Migration](/usage/database-migration). The Redis backend requires no migration — the `output` field is stored automatically as part of the job hash. --- # Job Timeout Slug: usage/job-timeout When you add a job to the queue, you can set a timeout for it. If the job doesn't finish before the timeout, it will be marked as `failed` and may be retried. See [Failed Jobs](/usage/failed-jobs) for more information. When the timeout is reached, DataQueue does not actually stop the handler from running. You need to handle this in your handler by checking the `AbortSignal` at one or more points in your code. For example: ```typescript title="@lib/job-handlers.ts" const handler = async (payload, signal) => { // Simulate work // Do something that may take a long time // Check if the job is aborted if (signal.aborted) { return; } // Do something else // Check again if the job is aborted if (signal.aborted) { return; } // ...rest of your logic }; ``` If the job times out, the signal will be aborted and your handler should exit early. If your handler does not check for `signal.aborted`, it will keep running in the background even after the job is marked as failed due to timeout. For best results, always make your handlers abortable if they might run for a long time. ## Extending the Timeout Sometimes a job takes longer than expected but is still making progress. Instead of letting it time out and fail, you can extend the timeout from inside the handler using two mechanisms: **prolong** (proactive) and **onTimeout** (reactive). 
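The extension semantics can be modeled as a resettable deadline. This is an illustrative sketch of the behavior, not the library's implementation — `makeDeadline` and the injected clock are hypothetical:

```typescript
// Illustrative model of the timeout-extension semantics; not dataqueue's
// implementation. `now` is injected so the behavior is easy to reason about.
const makeDeadline = (timeoutMs: number, now: () => number) => {
  let deadline = now() + timeoutMs;
  return {
    // prolong(ms): deadline becomes now + ms.
    // prolong():   deadline resets to now + the original timeoutMs.
    prolong(ms?: number) {
      deadline = now() + (ms ?? timeoutMs);
    },
    expired: () => now() >= deadline,
  };
};
```

Each call to `prolong` pushes the deadline forward relative to the current time, which is why a periodic `prolong()` acts as a heartbeat.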
### Prolong (proactive) Call `ctx.prolong()` at any point in your handler to reset the timeout deadline: ```typescript title="@lib/job-handlers.ts" const handler = async (payload, signal, { prolong }) => { await doStep1(payload); // "I know the next step is heavy, give me 60 more seconds" prolong(60_000); await doHeavyStep2(payload); // Reset to the original timeout duration (heartbeat-style) prolong(); await doStep3(payload); }; ``` - `prolong(ms)` — sets the timeout deadline to `ms` milliseconds from now. - `prolong()` — resets the timeout deadline to the original `timeoutMs` from now. ### onTimeout (reactive) Register a callback that fires when the timeout is about to hit, **before** the `AbortSignal` is triggered. The callback can decide whether to extend or let the timeout proceed: ```typescript title="@lib/job-handlers.ts" const handler = async (payload, signal, { onTimeout }) => { let progress = 0; onTimeout(() => { if (progress < 100) { return 30_000; // still working, give me 30 more seconds } // return nothing to let the timeout proceed }); for (const chunk of payload.chunks) { await processChunk(chunk); progress += 10; } }; ``` - If the callback returns a number > 0, the timeout is reset to that many milliseconds from now. - If the callback returns `undefined`, `null`, `0`, or a negative number, the timeout proceeds normally (signal is aborted, job fails). - The callback fires again each time a new deadline is reached, so the job can keep extending or finally let go. ### Using Both Together `prolong` and `onTimeout` work together. Use `prolong` when you know upfront that a step will be heavy. Use `onTimeout` for a last-second decision when the deadline arrives. 
```typescript title="@lib/job-handlers.ts" const handler = async (payload, signal, { prolong, onTimeout }) => { // Reactive fallback: extend if still making progress let step = 0; onTimeout(() => { if (step < 3) return 30_000; }); step = 1; await doStep1(payload); // Proactive: we know step 2 is heavy step = 2; prolong(120_000); await doHeavyStep2(payload); step = 3; await doStep3(payload); }; ``` ### Side Effects When either mechanism extends the timeout, DataQueue also updates `locked_at` in the database. This prevents [`reclaimStuckJobs`](/usage/reclaim-jobs) from accidentally reclaiming the job while it's still actively working. A `prolonged` event is also recorded in the job's event history. Note that `reclaimStuckJobs` is already aware of each job's `timeoutMs` — a job will not be reclaimed until the greater of `maxProcessingTimeMinutes` and the job's own `timeoutMs` has elapsed. `prolong` is still useful when you want to extend _beyond_ the original timeout, or as a heartbeat for jobs without a `timeoutMs`. ### Limitations - Both `prolong` and `onTimeout` are no-ops if the job has no `timeoutMs` set. - Neither is supported with `forceKillOnTimeout: true` (worker thread mode). See [Force Kill on Timeout](/usage/force-kill-timeout) for details. ## Force Kill on Timeout If you need to forcefully terminate jobs that don't respond to the abort signal, you can use `forceKillOnTimeout: true`. This will run the handler in a Worker Thread and forcefully terminate it when the timeout is reached. **Warning**: `forceKillOnTimeout` requires **Node.js** and will **not work** in Bun or other runtimes without worker thread support. See [Force Kill on Timeout](/usage/force-kill-timeout) for details. **Important**: When using `forceKillOnTimeout`, your handler must be serializable. See [Force Kill on Timeout](/usage/force-kill-timeout) for details. ```typescript await queue.addJob({ jobType: 'longRunningTask', payload: { data: '...' 
}, timeoutMs: 5000, forceKillOnTimeout: true, // Forcefully terminate if timeout is reached }); ``` --- # Long-Running Server Slug: usage/long-running-server The [Process Jobs](/usage/process-jobs) page covers processing jobs in a serverless environment using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, plain Node.js, etc.), you can instead run the processor continuously in the background and handle lifecycle management yourself. ## Starting the Processor in the Background Use `startInBackground()` to run the processor as a continuous polling loop. It will check for new jobs every `pollInterval` milliseconds (default: 5 seconds) and process them automatically. ```typescript import { initJobQueue } from '@nicnocquee/dataqueue'; import { jobHandlers } from './job-handlers'; const jobQueue = initJobQueue({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE, }, }); const processor = jobQueue.createProcessor(jobHandlers, { workerId: `server-${process.pid}`, batchSize: 10, concurrency: 3, pollInterval: 5000, // check for new jobs every 5 seconds onError: (error) => { // Called when an unexpected error occurs during batch processing. // Use this to send errors to your monitoring service. console.error('Processor error:', error); }, }); processor.startInBackground(); ``` When a full batch is returned (i.e., the number of processed jobs equals `batchSize`), the processor fetches the next batch as soon as the current one finishes, so it can drain a large backlog quickly. Once a batch returns fewer jobs than `batchSize`, it waits `pollInterval` before fetching the next batch. ### Configuration Tips - **`pollInterval`** -- Lower values (e.g., `1000`) reduce latency for new jobs but increase database load. Higher values (e.g., `10000`) are gentler on the database but introduce more delay. 5 seconds is a good default. - **`concurrency`** -- Keep this proportional to your server's resources.
If jobs call external APIs with rate limits, keep it low. - **`batchSize`** -- Larger batches reduce polling overhead but hold a database lock longer during claim. 10-20 is typical. - **`onError`** -- Always set this in production. Without it, errors default to `console.error` which is easy to miss. ## Graceful Shutdown When your server receives a termination signal (e.g., `SIGTERM` from a container orchestrator), you should stop the processor and wait for in-flight jobs to finish before exiting. Use `stopAndDrain()` for this. ```typescript async function shutdown() { console.log('Shutting down...'); // Stop polling and wait for the current batch to finish (up to 30 seconds) await processor.stopAndDrain(30000); // Close the database connection pool // PostgreSQL: jobQueue.getPool().end(); // Redis: // jobQueue.getRedisClient().quit(); console.log('Shutdown complete'); process.exit(0); } process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); ``` `stopAndDrain()` accepts an optional timeout in milliseconds (default: 30000). If the current batch does not finish within that time, the promise resolves anyway so your process is not stuck indefinitely. > **Note:** Use `stopAndDrain()` instead of `stop()` for graceful shutdown. `stop()` halts the polling loop immediately without waiting for in-flight jobs, which can leave jobs stuck in the `processing` state until they are reclaimed. ## Background Supervisor In production, you need periodic maintenance: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens. The **supervisor** automates all of this. 
```typescript const supervisor = jobQueue.createSupervisor({ intervalMs: 60_000, // run maintenance every 60 seconds stuckJobsTimeoutMinutes: 10, // reclaim jobs stuck > 10 minutes cleanupJobsDaysToKeep: 30, // delete completed jobs older than 30 days cleanupEventsDaysToKeep: 30, // delete job events older than 30 days onError: (error) => { console.error('Supervisor error:', error); }, }); supervisor.startInBackground(); ``` The supervisor runs independently from the processor. Each maintenance task is isolated -- if one fails, the others still run. All operations are idempotent, so it's safe to run multiple supervisor instances across a cluster. ### Supervisor Options | Option | Default | Description | | ------------------------- | --------------- | ------------------------------------------------------------ | | `intervalMs` | `60000` | How often the maintenance loop runs (ms) | | `stuckJobsTimeoutMinutes` | `10` | Reclaim jobs stuck in `processing` longer than this | | `cleanupJobsDaysToKeep` | `30` | Delete completed jobs older than this (days). `0` to disable | | `cleanupEventsDaysToKeep` | `30` | Delete job events older than this (days). 
`0` to disable | | `cleanupBatchSize` | `1000` | Batch size for cleanup deletions | | `reclaimStuckJobs` | `true` | Enable/disable stuck job reclaiming | | `expireTimedOutTokens` | `true` | Enable/disable waitpoint token expiry | | `onError` | `console.error` | Called when a maintenance task throws | | `verbose` | `false` | Enable verbose logging | ### Graceful Shutdown with the Supervisor When shutting down, drain both the processor and the supervisor: ```typescript async function shutdown() { console.log('Shutting down...'); await Promise.all([ processor.stopAndDrain(30000), supervisor.stopAndDrain(30000), ]); jobQueue.getPool().end(); console.log('Shutdown complete'); process.exit(0); } process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); ``` ### One-Shot Mode (Serverless) If you prefer to run maintenance from a cron-triggered API route or a serverless function, use `start()` instead of `startInBackground()`. It runs all tasks once and returns the results: ```typescript const supervisor = jobQueue.createSupervisor(); const result = await supervisor.start(); // { reclaimedJobs: 3, cleanedUpJobs: 120, cleanedUpEvents: 45, expiredTokens: 0 } ``` ## Manual Maintenance > **Note:** The supervisor above is the recommended approach. Manual maintenance is still supported if you need fine-grained control over individual tasks. In a serverless setup, you use cron-triggered API routes for [cleanup](/usage/cleanup-jobs) and [reclaim](/usage/reclaim-jobs). In a long-running server, you can use `setInterval` instead. 
```typescript // Reclaim stuck jobs every 10 minutes const reclaimInterval = setInterval( async () => { try { const reclaimed = await jobQueue.reclaimStuckJobs(10); if (reclaimed > 0) console.log(`Reclaimed ${reclaimed} stuck jobs`); } catch (error) { console.error('Reclaim error:', error); } }, 10 * 60 * 1000, ); // Clean up completed jobs older than 30 days, once per day const cleanupInterval = setInterval( async () => { try { const deleted = await jobQueue.cleanupOldJobs(30); if (deleted > 0) console.log(`Cleaned up ${deleted} old jobs`); const deletedEvents = await jobQueue.cleanupOldJobEvents(30); if (deletedEvents > 0) console.log(`Cleaned up ${deletedEvents} old job events`); } catch (error) { console.error('Cleanup error:', error); } }, 24 * 60 * 60 * 1000, ); ``` Make sure to clear these intervals during shutdown: ```typescript async function shutdown() { clearInterval(reclaimInterval); clearInterval(cleanupInterval); await processor.stopAndDrain(30000); jobQueue.getPool().end(); process.exit(0); } ``` ## Full Example Here is a complete Express server that ties everything together: ```typescript title="server.ts" import express from 'express'; import { initJobQueue } from '@nicnocquee/dataqueue'; import { jobHandlers } from './job-handlers'; // --- Initialize the queue --- const jobQueue = initJobQueue({ databaseConfig: { connectionString: process.env.PG_DATAQUEUE_DATABASE!, }, }); // --- Create and start the processor --- const processor = jobQueue.createProcessor(jobHandlers, { workerId: `server-${process.pid}`, batchSize: 10, concurrency: 3, pollInterval: 5000, onError: (error) => { console.error('Processor error:', error); }, }); processor.startInBackground(); // --- Start the background supervisor --- const supervisor = jobQueue.createSupervisor({ intervalMs: 60_000, stuckJobsTimeoutMinutes: 10, cleanupJobsDaysToKeep: 30, cleanupEventsDaysToKeep: 30, onError: (error) => { console.error('Supervisor error:', error); }, }); 
supervisor.startInBackground(); // --- Express app --- const app = express(); app.use(express.json()); app.post('/jobs', async (req, res) => { const { jobType, payload } = req.body; const jobId = await jobQueue.addJob({ jobType, payload }); res.json({ jobId }); }); app.get('/jobs/:id', async (req, res) => { const job = await jobQueue.getJob(Number(req.params.id)); if (!job) return res.status(404).json({ error: 'Not found' }); res.json(job); }); const server = app.listen(3000, () => { console.log('Server running on port 3000'); }); // --- Graceful shutdown --- async function shutdown() { console.log('Shutting down gracefully...'); // Stop accepting new HTTP connections server.close(); // Wait for in-flight work to finish await Promise.all([ processor.stopAndDrain(30000), supervisor.stopAndDrain(30000), ]); // Close the database pool jobQueue.getPool().end(); console.log('Shutdown complete'); process.exit(0); } process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); ``` --- # Process Jobs Slug: usage/process-jobs So far, we haven't actually performed any jobs—we've only added them to the queue. Now, let's process those jobs. > **Note:** This page covers processing jobs in a **serverless environment** using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, etc.), see [Long-Running Server](/usage/long-running-server). In a serverless environment, we can't have a long-running process that constantly monitors and processes the queue. Instead, we create an API endpoint that checks the queue and processes jobs in batches. This endpoint is then triggered by a cron job. 
For example, you can create an API endpoint at `app/api/cron/process` to process jobs in batches: ```typescript title="@/app/api/cron/process.ts" import { jobHandlers } from '@/lib/job-handlers'; import { getJobQueue } from '@/lib/queue'; import { NextResponse } from 'next/server'; export async function GET(request: Request) { // Secure the cron route: https://vercel.com/docs/cron-jobs/manage-cron-jobs#securing-cron-jobs const authHeader = request.headers.get('authorization'); if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { return NextResponse.json({ message: 'Unauthorized' }, { status: 401 }); } try { const jobQueue = getJobQueue(); // Control how many jobs are processed in parallel per batch using the `concurrency` option. // For example, to process up to 3 jobs in parallel per batch: const processor = jobQueue.createProcessor(jobHandlers, { workerId: `cron-${Date.now()}`, batchSize: 10, // up to 10 jobs per batch concurrency: 3, // up to 3 jobs processed in parallel verbose: true, }); const processed = await processor.start(); return NextResponse.json({ message: 'Job processing completed', processed, }); } catch (error) { console.error('Error processing jobs:', error); return NextResponse.json( { message: 'Failed to process jobs' }, { status: 500 }, ); } } ``` In the example above, we use the `createProcessor` method to create a processor. When you call the processor's `start` function, it processes jobs in the queue up to the `batchSize` limit. ### Batch Size Serverless platforms like Vercel limit how long a function can run. If you set `batchSize` too high, the function might run too long and get killed. Choose a `batchSize` that fits your use case. You can also process only certain job types by setting the `jobType` option. If a job type is more resource-intensive, use a lower `batchSize` for that type.
For example, you can define two endpoints: one for low-resource jobs and another for high-resource jobs, each with different `batchSize` and `concurrency` values. ### Concurrency Some jobs are resource-intensive, like image processing, LLM calls, or calling a rate-limited external service. In these cases, set the `concurrency` option to control how many jobs run in parallel per batch. The default is `3`. Set it to `1` to process jobs one at a time. Use a lower value to avoid exhausting resources in constrained environments. ### Triggering the Processor via Cron Defining an endpoint isn't enough—you need to trigger it regularly. For example, use Vercel cron to trigger the endpoint every minute by adding this to your `vercel.json`: ```json title="vercel.json" { "$schema": "https://openapi.vercel.sh/vercel.json", "crons": [ { "path": "/api/cron/process", "schedule": "* * * * *" } ] } ``` For Vercel cron, set the `CRON_SECRET` environment variable, as it's sent in the `authorization` header. If you use a different cron service, set the `authorization` header to the value of `CRON_SECRET`: ``` Authorization: Bearer <CRON_SECRET> ``` During development, you can create a small script to run the cron job continuously in the background. For example, you can create a `cron.sh` file like [this one](https://github.com/nicnocquee/dataqueue/blob/main/apps/demo/cron.sh), then add it to your `package.json` scripts: ```json title="package.json" { "scripts": { "cron": "bash cron.sh" } } ``` Then, you can run the cron job by running `pnpm cron` from the `apps/demo` directory. --- # Progress Tracking > Report and track job progress from handlers Slug: usage/progress-tracking Jobs can report their progress as a percentage (0–100) while they run. This is useful for long-running tasks like file processing, data imports, or image generation where you want to show a progress bar or percentage to the user.
## Reporting Progress from a Handler Use `ctx.setProgress(percent)` inside your job handler to report progress: ```typescript title="@/lib/job-handlers.ts" import { JobHandlers } from '@nicnocquee/dataqueue'; export const jobHandlers: JobHandlers = { generate_report: async (payload, signal, ctx) => { const chunks = await loadData(payload.reportId); for (let i = 0; i < chunks.length; i++) { if (signal.aborted) return; await processChunk(chunks[i]); // Report progress (0-100) await ctx.setProgress(Math.round(((i + 1) / chunks.length) * 100)); } }, }; ``` ### setProgress Rules - **Range**: The value must be between 0 and 100 (inclusive). Values outside this range throw an error. - **Rounding**: Fractional values are rounded to the nearest integer (`33.7` becomes `34`). - **Best-effort persistence**: Progress is written to the database but errors during the write do not kill the handler — processing continues. ## Reading Progress Progress is stored in the `progress` field of the [JobRecord](/api/job-record): ```typescript const job = await jobQueue.getJob(jobId); console.log(job?.progress); // null | 0–100 ``` - Before the handler calls `setProgress`, the value is `null`. - After the job completes, the last progress value is preserved (typically `100`). ## Tracking Progress in React If you're using the [React SDK](/usage/react-sdk), the `useJob` hook exposes `progress` directly: ```tsx import { useJob } from '@nicnocquee/dataqueue-react'; function JobProgress({ jobId }: { jobId: number }) { const { status, progress } = useJob(jobId, { fetcher: (id) => fetch(`/api/jobs/${id}`) .then((r) => r.json()) .then((d) => d.job), }); return (

<div> <p>Status: {status}</p> <p>{progress ?? 0}%</p> </div>
); } ``` ## Database Migration > **Note:** If you're using the **PostgreSQL** backend, make sure to run the latest migrations to add the `progress` column. See [Database Migration](/usage/database-migration). The Redis backend requires no migration — the `progress` field is stored automatically as part of the job hash. ## Related - [Job Output](/usage/job-output) — Store and retrieve results from job handlers using `ctx.setOutput()` or handler return values. --- # Quick Start > Get started with DataQueue Slug: usage/quick-start In these docs, we'll use a Next.js App Router project deployed to Vercel as an example. ## Next.js Shortcut If you're using Next.js, you can scaffold everything — API routes, a job queue singleton, a cron script, and all dependencies — with a single command: ```bash npx dataqueue-cli init ``` The command auto-detects your project structure (App Router vs Pages Router, `src/` directory vs root) and creates all the files you need. See the [`init` CLI reference](/cli/init) for full details. If you prefer to set things up manually, follow the steps below. ## PostgreSQL Backend 1. [Run migrations before deploying your app](/usage/database-migration) 2. [Define job handlers](/usage/job-handlers) 3. [Initialize the job queue](/usage/init-queue) 4. [Add a job](/usage/add-job) 5. Create three API routes to [process jobs](/usage/process-jobs), [reclaim stuck jobs](/usage/reclaim-jobs), and [clean up old jobs](/usage/cleanup-jobs) 6. [Call those API routes periodically](/usage/process-jobs#triggering-the-processor-via-cron) via a cron service (like Vercel cron) or a small script like [this one](https://github.com/nicnocquee/dataqueue/blob/main/apps/demo/cron.sh) during development. ## Redis Backend 1. [Install `ioredis`](/intro/install) 2. [Define job handlers](/usage/job-handlers) 3. [Initialize the job queue with Redis config](/usage/init-queue#redis) 4. [Add a job](/usage/add-job) 5.
Create three API routes to [process jobs](/usage/process-jobs), [reclaim stuck jobs](/usage/reclaim-jobs), and [clean up old jobs](/usage/cleanup-jobs) 6. [Call those API routes periodically](/usage/process-jobs#triggering-the-processor-via-cron) via a cron service (like Vercel cron) or a small script like [this one](https://github.com/nicnocquee/dataqueue/blob/main/apps/demo/cron.sh) during development. > **Note:** The Redis backend requires **no database migrations**. Just install `ioredis`, configure the connection, and you're ready to go. ## Long-Running Server If you're running a persistent server (Express, Fastify, plain Node.js, etc.) instead of a serverless environment, the setup is slightly different: 1. [Run migrations](/usage/database-migration) (PostgreSQL) or [install `ioredis`](/intro/install) (Redis) 2. [Define job handlers](/usage/job-handlers) 3. [Initialize the job queue](/usage/init-queue) 4. Start the processor in the background with `startInBackground()` 5. Start the [background supervisor](/usage/long-running-server#background-supervisor) with `createSupervisor()` to automate maintenance (reclaim stuck jobs, clean up old data) 6. Handle `SIGTERM`/`SIGINT` for graceful shutdown with `stopAndDrain()` See [Long-Running Server](/usage/long-running-server) for a complete walkthrough and full example. --- # React SDK > Subscribe to job status and progress from React Slug: usage/react-sdk The `@nicnocquee/dataqueue-react` package provides React hooks for subscribing to job updates. It uses polling to track a job's status and progress in real time. ## Installation ```bash npm install @nicnocquee/dataqueue-react ``` > **Note:** The React SDK requires React 18 or later.
## Quick Start The simplest way to use the SDK is with the `useJob` hook: ```tsx title="components/JobTracker.tsx" 'use client'; import { useJob } from '@nicnocquee/dataqueue-react'; function JobTracker({ jobId }: { jobId: number }) { const { status, progress, data, isLoading, error } = useJob(jobId, { fetcher: (id) => fetch(`/api/jobs/${id}`) .then((r) => r.json()) .then((d) => d.job), pollingInterval: 1000, }); if (isLoading) return <p>Loading...</p>; if (error) return <p>Error: {error.message}</p>; return ( <div> <p>Status: {status}</p> <p>{progress ?? 0}%</p> </div>
); } ``` ### API Route The `fetcher` function should call an API route that returns the job data. Here's an example Next.js API route: ```typescript title="app/api/jobs/[id]/route.ts" import { getJobQueue } from '@/lib/queue'; import { NextResponse } from 'next/server'; export async function GET( _request: Request, { params }: { params: Promise<{ id: string }> }, ) { const { id } = await params; const jobQueue = getJobQueue(); const job = await jobQueue.getJob(Number(id)); if (!job) { return NextResponse.json({ error: 'Job not found' }, { status: 404 }); } return NextResponse.json({ job }); } ``` ## DataqueueProvider To avoid passing the `fetcher` and `pollingInterval` to every `useJob` call, wrap your app (or a subtree) in a `DataqueueProvider`: ```tsx title="app/providers.tsx" 'use client'; import { DataqueueProvider } from '@nicnocquee/dataqueue-react'; const fetcher = (id: number) => fetch(`/api/jobs/${id}`) .then((r) => r.json()) .then((d) => d.job); export function Providers({ children }: { children: React.ReactNode }) { return ( <DataqueueProvider fetcher={fetcher}>{children}</DataqueueProvider> ); } ``` Then use `useJob` without repeating the config: ```tsx const { status, progress } = useJob(jobId); ``` Options passed directly to `useJob` override the provider values. ## useJob API ```typescript const result = useJob(jobId, options?); ``` ### Parameters - `jobId`: _number | null | undefined_ — The job ID to subscribe to. Pass `null` or `undefined` to skip polling.
- `options` _(optional)_: | Option | Type | Default | Description | | ----------------- | ---------------------------------- | ------------- | --------------------------------- | | `fetcher` | `(id: number) => Promise<JobData>` | from provider | Function that fetches a job by ID | | `pollingInterval` | `number` | `1000` | Milliseconds between polls | | `enabled` | `boolean` | `true` | Set to `false` to pause polling | | `onStatusChange` | `(newStatus, oldStatus) => void` | — | Called when status changes | | `onComplete` | `(job) => void` | — | Called when job completes | | `onFailed` | `(job) => void` | — | Called when job fails | ### Return Value | Field | Type | Description | | ----------- | ------------------- | --------------------------------------------- | | `data` | `JobData \| null` | Latest job data, or `null` before first fetch | | `status` | `JobStatus \| null` | Current job status | | `progress` | `number \| null` | Progress percentage (0–100) | | `isLoading` | `boolean` | `true` until the first fetch resolves | | `error` | `Error \| null` | Last fetch error, if any | ### Smart Polling The hook automatically stops polling when the job reaches a **terminal status**: `completed`, `failed`, or `cancelled`. This avoids unnecessary network requests once the job is done. ## Callbacks Use callbacks to react to job lifecycle events: ```tsx useJob(jobId, { fetcher, onStatusChange: (newStatus, oldStatus) => { console.log(`Job went from ${oldStatus} to ${newStatus}`); }, onComplete: (job) => { toast.success('Job completed!'); }, onFailed: (job) => { toast.error('Job failed.'); }, }); ``` ## JobData Type The `fetcher` should return an object matching the `JobData` interface: ```typescript interface JobData { id: number; status: | 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled' | 'waiting'; progress?: number | null; [key: string]: unknown; } ``` The `id` and `status` fields are required; `progress` is optional.
Any additional fields from your API response are preserved in `data`. --- # Reclaim Jobs Slug: usage/reclaim-jobs > **Note:** Running a long-lived server? Use [`createSupervisor()`](/usage/long-running-server#background-supervisor) to automate stuck-job reclaiming instead of calling `reclaimStuckJobs` manually. Sometimes, a job can get stuck in the `processing` state. This usually happens if the process is killed or an unhandled error occurs after the job status is updated, but before it is marked as `completed` or `failed`. To recover stuck jobs, use the `reclaimStuckJobs` method. The example below shows how to create an API route (`/api/cron/reclaim`) that can be triggered by a cron job: ```typescript title="@/app/api/cron/reclaim/route.ts" import { getJobQueue } from '@/lib/queue'; import { NextResponse } from 'next/server'; export async function GET(request: Request) { // Secure the cron route: https://vercel.com/docs/cron-jobs/manage-cron-jobs#securing-cron-jobs const authHeader = request.headers.get('authorization'); if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { return NextResponse.json({ message: 'Unauthorized' }, { status: 401 }); } try { const jobQueue = getJobQueue(); // Reclaim jobs stuck for more than 10 minutes const reclaimed = await jobQueue.reclaimStuckJobs(10); console.log(`Reclaimed ${reclaimed} stuck jobs`); return NextResponse.json({ message: 'Stuck jobs reclaimed', reclaimed, }); } catch (error) { console.error('Error reclaiming jobs:', error); return NextResponse.json( { message: 'Failed to reclaim jobs' }, { status: 500 }, ); } } ``` #### Per-Job Timeout Awareness `reclaimStuckJobs` respects each job's individual `timeoutMs`. If a job has a `timeoutMs` that is longer than `maxProcessingTimeMinutes`, it will not be reclaimed until its own timeout has elapsed. For example, if you call `reclaimStuckJobs(10)` and a job has `timeoutMs: 1800000` (30 minutes), that job will only be reclaimed after 30 minutes — not 10.
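The rule can be sketched as a small function (an illustration of the behavior described above, not library code). The case of a `timeoutMs` shorter than the global threshold isn't specified here, so the sketch conservatively takes the larger of the two values; that part is an assumption.

```typescript
// Sketch: how long a 'processing' job must sit before reclaimStuckJobs
// may pick it up, per the behavior described above. Not library code.
function reclaimableAfterMs(
  maxProcessingTimeMinutes: number,
  timeoutMs?: number,
): number {
  const globalMs = maxProcessingTimeMinutes * 60 * 1000;
  // A job-level timeoutMs longer than the global threshold wins.
  // Treating a shorter timeoutMs as capped by the global threshold
  // is an assumption of this sketch.
  return timeoutMs === undefined ? globalMs : Math.max(globalMs, timeoutMs);
}

// reclaimStuckJobs(10) with timeoutMs: 1800000 -> reclaimable after 30 minutes
// reclaimStuckJobs(10) with no timeoutMs     -> reclaimable after 10 minutes
```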
Jobs without a `timeoutMs` continue to use the global `maxProcessingTimeMinutes` threshold as before. #### Scheduling the Reclaim Job with Cron Add the following to your `vercel.json` to call the cron route every 10 minutes: ```json title="vercel.json" { "crons": [ { "path": "/api/cron/reclaim", "schedule": "*/10 * * * *" } ] } ``` --- # Scaling to Thousands of Jobs Slug: usage/scaling DataQueue is designed to handle high-volume workloads out of the box. This page covers how to tune your setup for thousands (or more) of concurrent jobs, whether you're using PostgreSQL or Redis. ## How Throughput Works When a processor runs, it follows this cycle: 1. **Claim** a batch of ready jobs from the database (atomically, using `FOR UPDATE SKIP LOCKED` in PostgreSQL or Lua scripts in Redis). 2. **Process** up to `concurrency` jobs in parallel from that batch. 3. **Repeat** immediately if the batch was full, or wait `pollInterval` before checking again. The theoretical maximum throughput of a single processor instance is: ``` throughput = batchSize / (avgJobDuration + pollInterval) ``` For example, with `batchSize: 20`, `concurrency: 10`, `pollInterval: 2000ms`, and jobs averaging 500ms each, a single processor can handle roughly **10 jobs/second** (600/minute). > **Note:** When a full batch is returned, the processor immediately polls again without waiting. This means it can drain backlogs much faster than the formula suggests during peak load. ## Tuning Processor Settings ### Batch Size `batchSize` controls how many jobs are claimed per polling cycle. 
| Environment | Recommended | Why | | --------------------------- | ----------- | -------------------------------------- | | Serverless (Vercel, Lambda) | 1-10 | Functions have execution time limits | | Long-running server | 10-50 | Larger batches reduce polling overhead | | High-throughput worker | 50-100 | Maximizes throughput per poll cycle | ### Concurrency `concurrency` controls how many jobs from the batch run in parallel. - **CPU-bound jobs** (image processing, compression): keep concurrency low (1-4) to avoid CPU saturation. - **IO-bound jobs** (API calls, email sending): higher concurrency (5-20) works well since jobs spend most time waiting. - **Rate-limited APIs**: match concurrency to the API's rate limit to avoid throttling. ### Poll Interval `pollInterval` controls how often the processor checks for new jobs when idle. - **1000ms**: Low latency, higher database load. Good for real-time workloads. - **5000ms** (default): Balanced. Good for most use cases. - **10000-30000ms**: Gentle on the database. Use when latency tolerance is high. ```typescript const processor = jobQueue.createProcessor(jobHandlers, { batchSize: 30, concurrency: 10, pollInterval: 2000, }); ``` ## Horizontal Scaling with Multiple Workers DataQueue supports running **multiple processor instances** simultaneously -- on the same server, across multiple servers, or in separate containers. No coordination is needed between workers. ### How It Works - Each processor gets a unique `workerId` (auto-generated, or set manually). - PostgreSQL uses `FOR UPDATE SKIP LOCKED` to ensure no two workers claim the same job. - Redis uses atomic Lua scripts for the same guarantee. - Workers can safely run on different machines pointing at the same database. 
### Example: Multiple Workers ```typescript // Worker 1 (e.g., on server A) const processor1 = jobQueue.createProcessor(jobHandlers, { workerId: 'worker-a', batchSize: 20, concurrency: 5, }); processor1.startInBackground(); // Worker 2 (e.g., on server B) const processor2 = jobQueue.createProcessor(jobHandlers, { workerId: 'worker-b', batchSize: 20, concurrency: 5, }); processor2.startInBackground(); ``` ### Specialized Workers Use the `jobType` filter to create workers dedicated to specific job types. This lets you scale different workloads independently: ```typescript // Fast worker for lightweight jobs const emailWorker = jobQueue.createProcessor(jobHandlers, { jobType: 'email', batchSize: 50, concurrency: 20, pollInterval: 1000, }); // Slow worker for heavy jobs const reportWorker = jobQueue.createProcessor(jobHandlers, { jobType: 'report', batchSize: 5, concurrency: 1, pollInterval: 5000, }); ``` ## PostgreSQL Scaling ### Connection Pool Each processor instance uses database connections from the pool. The default `max` is 10, which works for a single processor. If you run multiple processors in the same process, increase the pool size: ```typescript const jobQueue = initJobQueue({ databaseConfig: { connectionString: process.env.DATABASE_URL, max: 20, // increase for multiple processors }, }); ``` > **Note:** If you run processors on separate servers, each has its own pool. The total connections across all servers should stay within your database's `max_connections` setting (typically 100 for managed databases like Neon or Supabase). ### Table Maintenance As completed jobs accumulate, the `job_queue` table grows. This doesn't affect processing speed (claim queries use partial indexes that only cover active jobs), but it does increase storage and slow down full-table queries like `getJobs()`. 
**Run cleanup regularly:** ```typescript // Delete completed jobs older than 30 days await jobQueue.cleanupOldJobs(30); // Delete old job events await jobQueue.cleanupOldJobEvents(30); ``` Cleanup operations are batched internally (1000 rows at a time) so they won't lock the table or time out even with hundreds of thousands of old jobs. **Run reclaim regularly:** ```typescript // Reclaim jobs stuck in 'processing' for more than 10 minutes await jobQueue.reclaimStuckJobs(10); ``` This recovers jobs from workers that crashed or timed out. ### Monitoring Table Size You can monitor the `job_queue` table size with this query: ```sql SELECT pg_size_pretty(pg_total_relation_size('job_queue')) AS total_size, (SELECT count(*) FROM job_queue WHERE status = 'pending') AS pending, (SELECT count(*) FROM job_queue WHERE status = 'processing') AS processing, (SELECT count(*) FROM job_queue WHERE status = 'completed') AS completed, (SELECT count(*) FROM job_queue WHERE status = 'failed') AS failed; ``` ### Performance Indexes DataQueue includes optimized partial indexes out of the box: - `idx_job_queue_claimable` -- speeds up job claiming (pending jobs by priority). - `idx_job_queue_failed_retry` -- speeds up retry scheduling. - `idx_job_queue_stuck` -- speeds up stuck job reclamation. - `idx_job_queue_cleanup` -- speeds up cleanup of old completed jobs. These are created automatically when you run migrations. ## Redis Scaling ### Memory Usage Redis stores everything in memory. Estimate memory usage as: | Jobs | Approximate Memory | | ------- | ------------------ | | 1,000 | 1-2 MB | | 10,000 | 10-20 MB | | 100,000 | 100-200 MB | This assumes payloads under 1 KB each. Larger payloads increase memory proportionally. ### Key Best Practices - **Enable persistence**: Use AOF (Append Only File) for durability. Without it, a Redis restart loses all jobs.
- **Use `keyPrefix`** to isolate multiple queues in the same Redis instance: ```typescript const jobQueue = initJobQueue({ backend: 'redis', redisConfig: { url: process.env.REDIS_URL, keyPrefix: 'myapp:jobs:', }, }); ``` - **Run cleanup regularly** to free memory from completed jobs. Like PostgreSQL, Redis cleanup is batched internally using cursor-based scanning, so it's safe to run even with a large number of completed jobs. ## Payload Best Practices Keep payloads small. Store references (IDs, URLs) rather than full data: ```typescript // Good: small payload with reference await jobQueue.addJob({ jobType: 'processImage', payload: { imageId: 'img_abc123', bucket: 's3://uploads' }, }); // Avoid: large payload with embedded data await jobQueue.addJob({ jobType: 'processImage', payload: { imageData: '<huge base64 string>' }, // too large }); ``` A good target is **under 10 KB per payload**. This keeps database queries fast and Redis memory predictable. ## Example: Processing 10,000 Jobs per Hour Here's a configuration that can comfortably process ~10,000 jobs per hour, assuming each job takes about 1 second: ```typescript title="worker.ts" import { initJobQueue } from '@nicnocquee/dataqueue'; import { jobHandlers } from './job-handlers'; const jobQueue = initJobQueue({ databaseConfig: { connectionString: process.env.DATABASE_URL, max: 15, // room for processor + maintenance queries }, }); const processor = jobQueue.createProcessor(jobHandlers, { batchSize: 30, concurrency: 10, // 10 jobs in parallel pollInterval: 2000, onError: (error) => { console.error('Processor error:', error); }, }); processor.startInBackground(); // Maintenance setInterval( async () => { try { await jobQueue.reclaimStuckJobs(10); } catch (e) { console.error(e); } }, 10 * 60 * 1000, ); setInterval( async () => { try { await jobQueue.cleanupOldJobs(30); await jobQueue.cleanupOldJobEvents(30); } catch (e) { console.error(e); } }, 24 * 60 * 60 * 1000, ); // Graceful shutdown process.on('SIGTERM', async () => { await
processor.stopAndDrain(30000); jobQueue.getPool().end(); process.exit(0); }); ``` With 10 concurrent jobs and ~1s each, a single worker handles roughly **10 jobs/second** = **36,000 jobs/hour**. For higher throughput, add more worker instances. ## Quick Reference | Scale | Workers | batchSize | concurrency | pollInterval | Notes | | ------------------- | ------- | --------- | ----------- | ------------ | -------------------------------- | | < 100 jobs/hour | 1 | 10 | 3 | 5000ms | Default settings work fine | | 100-1,000/hour | 1 | 20 | 5 | 3000ms | Single worker is sufficient | | 1,000-10,000/hour | 1-2 | 30 | 10 | 2000ms | Add a second worker if needed | | 10,000-100,000/hour | 2-5 | 50 | 15 | 1000ms | Multiple workers recommended | | 100,000+/hour | 5+ | 50-100 | 20 | 1000ms | Specialized workers per job type | --- # Wait Slug: usage/wait During a job handler you can wait for a period of time, until a specific date, or for an external signal before continuing execution. This is useful for building multi-step workflows like onboarding sequences, approval flows, or delayed notifications—all as a single handler. ## How It Works Unlike traditional job queues where you would schedule separate jobs for each step, DataQueue's wait feature lets you write linear, async code. Under the hood: 1. When a wait is triggered, the handler throws a `WaitSignal` internally 2. The job moves to `'waiting'` status, the handler stops, and **the worker lock is released** 3. After the wait condition is met, the job is re-picked by the processor 4. The handler re-runs from the top, but **completed steps are replayed from cache** > **Note:** Waiting jobs are completely idle — they don't hold a worker lock, don't occupy a concurrency slot, and don't consume any processing resources. You can safely have thousands of jobs waiting in parallel with zero impact on queue throughput. 
This means your handlers need to use `ctx.run()` to wrap side-effectful work (like sending emails) so it doesn't re-execute on re-invocation. ## Step Tracking with `ctx.run()` `ctx.run()` wraps a step with memoization. Each step is identified by a unique name. If the step was already completed in a previous invocation, the cached result is returned without re-executing the function. ```typescript title="@/lib/job-handlers.ts" const handler = async (payload, signal, ctx) => { // This will only execute once, even if the handler is re-invoked const data = await ctx.run('fetch-data', async () => { return await fetchExternalData(payload.url); }); // This will also only execute once await ctx.run('send-email', async () => { await sendEmail(payload.email, data.subject, data.body); }); }; ``` Step results are persisted to the database after each `ctx.run()` call, ensuring durability even if the handler crashes. **Important**: Step names must be unique within a handler and stable across re-invocations. ## Time-Based Waits ### `ctx.waitFor(duration)` Wait for a specific duration before continuing. 
```typescript title="@/lib/job-handlers.ts" const onboardingHandler = async (payload, signal, ctx) => { // Step 1: Send welcome email await ctx.run('send-welcome', async () => { await sendEmail(payload.email, 'Welcome!'); }); // Wait 24 hours await ctx.waitFor({ hours: 24 }); // Step 2: Send follow-up (runs after the wait) await ctx.run('send-followup', async () => { await sendEmail(payload.email, 'How are you finding things?'); }); // Wait 7 days await ctx.waitFor({ days: 7 }); // Step 3: Send survey await ctx.run('send-survey', async () => { await sendEmail(payload.email, 'We would love your feedback!'); }); }; ``` Supported duration fields (additive): | Field | Description | | :-------- | :------------ | | `seconds` | Seconds | | `minutes` | Minutes | | `hours` | Hours | | `days` | Days | | `weeks` | Weeks | | `months` | Months (~30d) | | `years` | Years (~365d) | ### `ctx.waitUntil(date)` Wait until a specific date/time. ```typescript title="@/lib/job-handlers.ts" const handler = async (payload, signal, ctx) => { await ctx.run('prepare', async () => { await prepareReport(); }); // Wait until next Monday at 9am const nextMonday = getNextMonday9AM(); await ctx.waitUntil(nextMonday); await ctx.run('deliver', async () => { await deliverReport(); }); }; ``` ## Token-Based Waits Token waits allow you to pause a job until an external signal—like a human approval, a webhook callback, or another service's response. ### Creating and Waiting for Tokens ```typescript title="@/lib/job-handlers.ts" const approvalHandler = async (payload, signal, ctx) => { // Step 1: Submit for review const token = await ctx.run('create-token', async () => { return await ctx.createToken({ timeout: '48h' }); }); // Notify the reviewer (use ctx.run to avoid re-sending on resume) await ctx.run('notify-reviewer', async () => { await sendSlackMessage( `Please review request ${payload.id}. 
Token: ${token.id}`, ); }); // Wait for the token to be completed const result = await ctx.waitForToken<{ action: 'approve' | 'reject' }>( token.id, ); if (result.ok) { if (result.output.action === 'approve') { await ctx.run('approve', async () => { await approveRequest(payload.id); }); } else { await ctx.run('reject', async () => { await rejectRequest(payload.id); }); } } else { // Token timed out await ctx.run('timeout', async () => { await escalateRequest(payload.id); }); } }; ``` ### Completing Tokens Externally Tokens can be completed from anywhere—API routes, webhooks, or external services: ```typescript title="@/app/api/approve/route.ts" import { getJobQueue } from '@/lib/queue'; export async function POST(request: Request) { const { tokenId, action } = await request.json(); const jobQueue = getJobQueue(); await jobQueue.completeToken(tokenId, { action }); return Response.json({ success: true }); } ``` ### Token Options ```typescript const token = await ctx.createToken({ timeout: '10m', // Optional: '10s', '5m', '1h', '24h', '7d' tags: ['approval', 'user:123'], // Optional: tags for filtering }); ``` If a timeout is set and the token isn't completed in time, call `jobQueue.expireTimedOutTokens()` periodically (e.g., alongside `reclaimStuckJobs`) to expire tokens and resume waiting jobs: ```typescript title="@/app/api/cron/maintenance/route.ts" export async function GET() { const jobQueue = getJobQueue(); await jobQueue.reclaimStuckJobs(); await jobQueue.expireTimedOutTokens(); return Response.json({ ok: true }); } ``` ### Retrieving Tokens ```typescript const token = await jobQueue.getToken(tokenId); // { id, jobId, status, output, timeoutAt, createdAt, completedAt, tags } ``` ## Backward Compatibility The wait feature is fully backward compatible. Existing handlers that don't use `ctx.run()` or any wait methods will continue to work exactly as before. The new methods are purely additive to the existing `JobContext`. 
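The `getNextMonday9AM` helper used in the `waitUntil` example above is not part of DataQueue; a minimal sketch of such a helper might look like this:

```typescript
// Sketch of the getNextMonday9AM helper from the waitUntil example.
// Returns the next Monday at 09:00 local time, always at least one
// day in the future. Not part of the DataQueue API.
function getNextMonday9AM(): Date {
  const d = new Date();
  d.setHours(9, 0, 0, 0);
  // getDay(): 0 = Sunday, 1 = Monday, ...; `|| 7` skips to next week
  // when today is already Monday.
  const daysUntilMonday = (1 - d.getDay() + 7) % 7 || 7;
  d.setDate(d.getDate() + daysUntilMonday);
  return d;
}
```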
## Important Notes - **Step names must be stable**: Don't change step names between deployments while jobs are waiting. The handler uses step names to replay cached results. - **Wait counter is position-based**: If you add or remove `waitFor`/`waitUntil`/`waitForToken` calls between deployments while jobs are mid-wait, the counter may mismatch. Either deploy changes when no jobs are in `'waiting'` status, or create a new job type with the updated handler instead of editing the existing one. Existing waiting jobs will continue with the old logic safely. - **Waiting is free**: A waiting job releases its worker lock and concurrency slot. It sits passively in the database until resumed, consuming no processing resources. - **Waiting does not consume attempts**: When a job resumes from a wait, the attempt counter is not incremented. Only real failures count. - **Cancel waiting jobs**: You can cancel a job in `'waiting'` status just like a pending job using `jobQueue.cancelJob(jobId)`. - **`forceKillOnTimeout` limitation**: Wait features (`ctx.run`, `ctx.waitFor`, etc.) are not available when `forceKillOnTimeout` is enabled, since that mode runs handlers in isolated worker threads without database access.