Wait
During a job handler you can wait for a period of time, until a specific date, or for an external signal before continuing execution. This is useful for building multi-step workflows like onboarding sequences, approval flows, or delayed notifications—all as a single handler.
How It Works
Unlike traditional job queues where you would schedule separate jobs for each step, DataQueue's wait feature lets you write linear, async code. Under the hood:
- When a wait is triggered, the handler throws a
WaitSignalinternally - The job moves to
'waiting'status and the handler stops - After the wait condition is met, the job is re-picked by the processor
- The handler re-runs from the top, but completed steps are replayed from cache
This means your handlers need to use ctx.run() to wrap side-effectful work (like sending emails) so it doesn't re-execute on re-invocation.
Step Tracking with ctx.run()
ctx.run() wraps a step with memoization. Each step is identified by a unique name. If the step was already completed in a previous invocation, the cached result is returned without re-executing the function.
const handler = async (payload, signal, ctx) => {
// This will only execute once, even if the handler is re-invoked
const data = await ctx.run('fetch-data', async () => {
return await fetchExternalData(payload.url);
});
// This will also only execute once
await ctx.run('send-email', async () => {
await sendEmail(payload.email, data.subject, data.body);
});
};Step results are persisted to the database after each ctx.run() call, ensuring durability even if the handler crashes.
Important: Step names must be unique within a handler and stable across re-invocations.
Time-Based Waits
ctx.waitFor(duration)
Wait for a specific duration before continuing.
const onboardingHandler = async (payload, signal, ctx) => {
// Step 1: Send welcome email
await ctx.run('send-welcome', async () => {
await sendEmail(payload.email, 'Welcome!');
});
// Wait 24 hours
await ctx.waitFor({ hours: 24 });
// Step 2: Send follow-up (runs after the wait)
await ctx.run('send-followup', async () => {
await sendEmail(payload.email, 'How are you finding things?');
});
// Wait 7 days
await ctx.waitFor({ days: 7 });
// Step 3: Send survey
await ctx.run('send-survey', async () => {
await sendEmail(payload.email, 'We would love your feedback!');
});
};Supported duration fields (additive):
| Field | Description |
|---|---|
seconds | Seconds |
minutes | Minutes |
hours | Hours |
days | Days |
weeks | Weeks |
months | Months (~30d) |
years | Years (~365d) |
ctx.waitUntil(date)
Wait until a specific date/time.
const handler = async (payload, signal, ctx) => {
await ctx.run('prepare', async () => {
await prepareReport();
});
// Wait until next Monday at 9am
const nextMonday = getNextMonday9AM();
await ctx.waitUntil(nextMonday);
await ctx.run('deliver', async () => {
await deliverReport();
});
};Token-Based Waits
Token waits allow you to pause a job until an external signal—like a human approval, a webhook callback, or another service's response.
Creating and Waiting for Tokens
const approvalHandler = async (payload, signal, ctx) => {
// Step 1: Submit for review
const token = await ctx.run('create-token', async () => {
return await ctx.createToken({ timeout: '48h' });
});
// Notify the reviewer (use ctx.run to avoid re-sending on resume)
await ctx.run('notify-reviewer', async () => {
await sendSlackMessage(
`Please review request ${payload.id}. Token: ${token.id}`,
);
});
// Wait for the token to be completed
const result = await ctx.waitForToken<{ action: 'approve' | 'reject' }>(
token.id,
);
if (result.ok) {
if (result.output.action === 'approve') {
await ctx.run('approve', async () => {
await approveRequest(payload.id);
});
} else {
await ctx.run('reject', async () => {
await rejectRequest(payload.id);
});
}
} else {
// Token timed out
await ctx.run('timeout', async () => {
await escalateRequest(payload.id);
});
}
};Completing Tokens Externally
Tokens can be completed from anywhere—API routes, webhooks, or external services:
import { getJobQueue } from '@/lib/queue';
export async function POST(request: Request) {
const { tokenId, action } = await request.json();
const jobQueue = getJobQueue();
await jobQueue.completeToken(tokenId, { action });
return Response.json({ success: true });
}Token Options
const token = await ctx.createToken({
timeout: '10m', // Optional: '10s', '5m', '1h', '24h', '7d'
tags: ['approval', 'user:123'], // Optional: tags for filtering
});If a timeout is set and the token isn't completed in time, call jobQueue.expireTimedOutTokens() periodically (e.g., alongside reclaimStuckJobs) to expire tokens and resume waiting jobs:
export async function GET() {
const jobQueue = getJobQueue();
await jobQueue.reclaimStuckJobs();
await jobQueue.expireTimedOutTokens();
return Response.json({ ok: true });
}Retrieving Tokens
const token = await jobQueue.getToken(tokenId);
// { id, jobId, status, output, timeoutAt, createdAt, completedAt, tags }Backward Compatibility
The wait feature is fully backward compatible. Existing handlers that don't use ctx.run() or any wait methods will continue to work exactly as before. The new methods are purely additive to the existing JobContext.
Important Notes
- Step names must be stable: Don't change step names between deployments while jobs are waiting. The handler uses step names to replay cached results.
- Wait counter is position-based: If you add or remove
waitFor/waitUntil/waitForTokencalls between deployments while jobs are mid-wait, the counter may mismatch. Either deploy changes when no jobs are in'waiting'status, or create a new job type with the updated handler instead of editing the existing one. Existing waiting jobs will continue with the old logic safely. - Waiting does not consume attempts: When a job resumes from a wait, the attempt counter is not incremented. Only real failures count.
- Cancel waiting jobs: You can cancel a job in
'waiting'status just like a pending job usingjobQueue.cancelJob(jobId). forceKillOnTimeoutlimitation: Wait features (ctx.run,ctx.waitFor, etc.) are not available whenforceKillOnTimeoutis enabled, since that mode runs handlers in isolated worker threads without database access.