DataQueueDataQueue
Usage

Wait

During a job handler you can wait for a period of time, until a specific date, or for an external signal before continuing execution. This is useful for building multi-step workflows like onboarding sequences, approval flows, or delayed notifications—all as a single handler.

How It Works

Unlike traditional job queues where you would schedule separate jobs for each step, DataQueue's wait feature lets you write linear, async code. Under the hood:

  1. When a wait is triggered, the handler throws a WaitSignal internally
  2. The job moves to 'waiting' status and the handler stops
  3. After the wait condition is met, the job is re-picked by the processor
  4. The handler re-runs from the top, but completed steps are replayed from cache

This means your handlers need to use ctx.run() to wrap side-effectful work (like sending emails) so it doesn't re-execute on re-invocation.

Step Tracking with ctx.run()

ctx.run() wraps a step with memoization. Each step is identified by a unique name. If the step was already completed in a previous invocation, the cached result is returned without re-executing the function.

@/lib/job-handlers.ts
const handler = async (payload, signal, ctx) => {
  // This will only execute once, even if the handler is re-invoked
  const data = await ctx.run('fetch-data', async () => {
    return await fetchExternalData(payload.url);
  });

  // This will also only execute once
  await ctx.run('send-email', async () => {
    await sendEmail(payload.email, data.subject, data.body);
  });
};

Step results are persisted to the database after each ctx.run() call, ensuring durability even if the handler crashes.

Important: Step names must be unique within a handler and stable across re-invocations.

Time-Based Waits

ctx.waitFor(duration)

Wait for a specific duration before continuing.

@/lib/job-handlers.ts
const onboardingHandler = async (payload, signal, ctx) => {
  // Step 1: Send welcome email
  await ctx.run('send-welcome', async () => {
    await sendEmail(payload.email, 'Welcome!');
  });

  // Wait 24 hours
  await ctx.waitFor({ hours: 24 });

  // Step 2: Send follow-up (runs after the wait)
  await ctx.run('send-followup', async () => {
    await sendEmail(payload.email, 'How are you finding things?');
  });

  // Wait 7 days
  await ctx.waitFor({ days: 7 });

  // Step 3: Send survey
  await ctx.run('send-survey', async () => {
    await sendEmail(payload.email, 'We would love your feedback!');
  });
};

Supported duration fields (additive):

FieldDescription
secondsSeconds
minutesMinutes
hoursHours
daysDays
weeksWeeks
monthsMonths (~30d)
yearsYears (~365d)

ctx.waitUntil(date)

Wait until a specific date/time.

@/lib/job-handlers.ts
const handler = async (payload, signal, ctx) => {
  await ctx.run('prepare', async () => {
    await prepareReport();
  });

  // Wait until next Monday at 9am
  const nextMonday = getNextMonday9AM();
  await ctx.waitUntil(nextMonday);

  await ctx.run('deliver', async () => {
    await deliverReport();
  });
};

Token-Based Waits

Token waits allow you to pause a job until an external signal—like a human approval, a webhook callback, or another service's response.

Creating and Waiting for Tokens

@/lib/job-handlers.ts
const approvalHandler = async (payload, signal, ctx) => {
  // Step 1: Submit for review
  const token = await ctx.run('create-token', async () => {
    return await ctx.createToken({ timeout: '48h' });
  });

  // Notify the reviewer (use ctx.run to avoid re-sending on resume)
  await ctx.run('notify-reviewer', async () => {
    await sendSlackMessage(
      `Please review request ${payload.id}. Token: ${token.id}`,
    );
  });

  // Wait for the token to be completed
  const result = await ctx.waitForToken<{ action: 'approve' | 'reject' }>(
    token.id,
  );

  if (result.ok) {
    if (result.output.action === 'approve') {
      await ctx.run('approve', async () => {
        await approveRequest(payload.id);
      });
    } else {
      await ctx.run('reject', async () => {
        await rejectRequest(payload.id);
      });
    }
  } else {
    // Token timed out
    await ctx.run('timeout', async () => {
      await escalateRequest(payload.id);
    });
  }
};

Completing Tokens Externally

Tokens can be completed from anywhere—API routes, webhooks, or external services:

@/app/api/approve/route.ts
import { getJobQueue } from '@/lib/queue';

export async function POST(request: Request) {
  const { tokenId, action } = await request.json();
  const jobQueue = getJobQueue();

  await jobQueue.completeToken(tokenId, { action });

  return Response.json({ success: true });
}

Token Options

const token = await ctx.createToken({
  timeout: '10m', // Optional: '10s', '5m', '1h', '24h', '7d'
  tags: ['approval', 'user:123'], // Optional: tags for filtering
});

If a timeout is set and the token isn't completed in time, call jobQueue.expireTimedOutTokens() periodically (e.g., alongside reclaimStuckJobs) to expire tokens and resume waiting jobs:

@/app/api/cron/maintenance/route.ts
export async function GET() {
  const jobQueue = getJobQueue();
  await jobQueue.reclaimStuckJobs();
  await jobQueue.expireTimedOutTokens();
  return Response.json({ ok: true });
}

Retrieving Tokens

const token = await jobQueue.getToken(tokenId);
// { id, jobId, status, output, timeoutAt, createdAt, completedAt, tags }

Backward Compatibility

The wait feature is fully backward compatible. Existing handlers that don't use ctx.run() or any wait methods will continue to work exactly as before. The new methods are purely additive to the existing JobContext.

Important Notes

  • Step names must be stable: Don't change step names between deployments while jobs are waiting. The handler uses step names to replay cached results.
  • Wait counter is position-based: If you add or remove waitFor/waitUntil/waitForToken calls between deployments while jobs are mid-wait, the counter may mismatch. Either deploy changes when no jobs are in 'waiting' status, or create a new job type with the updated handler instead of editing the existing one. Existing waiting jobs will continue with the old logic safely.
  • Waiting does not consume attempts: When a job resumes from a wait, the attempt counter is not incremented. Only real failures count.
  • Cancel waiting jobs: You can cancel a job in 'waiting' status just like a pending job using jobQueue.cancelJob(jobId).
  • forceKillOnTimeout limitation: Wait features (ctx.run, ctx.waitFor, etc.) are not available when forceKillOnTimeout is enabled, since that mode runs handlers in isolated worker threads without database access.