DataQueueDataQueue
Usage

Cron Jobs (Recurring Schedules)

Define recurring jobs that automatically enqueue on a cron schedule.

DataQueue supports recurring cron schedules. Define a schedule with a cron expression, and the processor will automatically enqueue job instances before each batch — no extra code required.

Add a Cron Schedule

@/app/api/cron-schedules/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue } from '@/lib/queue';

export async function POST(request: NextRequest) {
  const jobQueue = getJobQueue();
  const id = await jobQueue.addCronJob({
    scheduleName: 'daily-report', // must be unique!
    cronExpression: '0 9 * * *', // every day at 9:00 AM
    jobType: 'generate_report',
    payload: { reportId: 'daily', userId: 'system' },
    timezone: 'America/New_York', // default: 'UTC'
  });
  return NextResponse.json({ id });
}

Options

OptionTypeDefaultDescription
scheduleNamestringrequiredUnique name for the schedule
cronExpressionstringrequiredStandard 5-field cron expression
jobTypestringrequiredJob type from your PayloadMap
payloadobjectrequiredPayload for each job instance
timezonestring'UTC'IANA timezone for cron evaluation
allowOverlapbooleanfalseAllow new instance while previous is still running
maxAttemptsnumber3Max retry attempts per job instance
prioritynumber0Priority for each job instance
timeoutMsnumberTimeout per job instance
tagsstring[]Tags for each job instance

Automatic Enqueueing

When you call processor.start() or processor.startInBackground(), DataQueue automatically checks all active cron schedules and enqueues jobs whose next run time has passed — before processing the batch.

@/app/api/cron/process/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getJobQueue, jobHandlers } from '@/lib/queue';

export async function GET(request: NextRequest) {
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  const jobQueue = getJobQueue();
  // Cron jobs are automatically enqueued before each batch
  const processor = jobQueue.createProcessor(jobHandlers, {
    batchSize: 10,
    concurrency: 3,
  });
  const processed = await processor.start();

  return NextResponse.json({ processed });
}

Vercel Cron Example

vercel.json
{
  "crons": [
    {
      "path": "/api/cron/process",
      "schedule": "* * * * *"
    }
  ]
}

Manual Trigger

If you need to enqueue due cron jobs outside the processor (e.g., in tests or one-off scripts), you can still call enqueueDueCronJobs() directly:

const enqueued = await jobQueue.enqueueDueCronJobs();

Overlap Protection

By default, allowOverlap is false. This means if a previous job instance from the same schedule is still pending, processing, or waiting, a new instance will not be enqueued — even if the cron expression says it's time.

// Allow overlapping instances (e.g., for idempotent jobs)
await jobQueue.addCronJob({
  scheduleName: 'heartbeat',
  cronExpression: '* * * * *',
  jobType: 'send_email',
  payload: { to: 'admin@example.com', subject: 'heartbeat', body: 'ping' },
  allowOverlap: true,
});

Manage Schedules

Pause and Resume

// Pause — skipped during automatic enqueueing
await jobQueue.pauseCronJob(scheduleId);

// Resume
await jobQueue.resumeCronJob(scheduleId);

Edit a Schedule

await jobQueue.editCronJob(scheduleId, {
  cronExpression: '0 */2 * * *', // change to every 2 hours
  payload: { reportId: 'bi-hourly', userId: 'system' },
});

When cronExpression or timezone changes, nextRunAt is automatically recalculated.

Remove a Schedule

// Deletes the schedule definition. Already-enqueued jobs are not cancelled.
await jobQueue.removeCronJob(scheduleId);

List and Query

// List all schedules
const all = await jobQueue.listCronJobs();

// List only active / paused
const active = await jobQueue.listCronJobs('active');
const paused = await jobQueue.listCronJobs('paused');

// Get by ID or name
const byId = await jobQueue.getCronJob(id);
const byName = await jobQueue.getCronJobByName('daily-report');

Database Migration

The cron feature requires the cron_schedules table. Run the DataQueue migrations to create it:

npx dataqueue-cli migrate up

If you're already using DataQueue, just run migrations again — the new table will be added alongside existing ones.