Process Jobs
So far, we haven't actually performed any jobs—we've only added them to the queue. Now, let's process those jobs.
In a serverless environment, we can't have a long-running process that constantly monitors and processes the queue.
Instead, we create an API endpoint that checks the queue and processes jobs in batches. This endpoint is then triggered by a cron job. For example, you can create an API endpoint at app/api/cron/process
to process jobs in batches:
import { jobHandlers } from '@/lib/job-handler';
import { getJobQueue } from '@/lib/queue';
import { NextResponse } from 'next/server';
export async function GET(request: Request) {
// Secure the cron route: https://vercel.com/docs/cron-jobs/manage-cron-jobs#securing-cron-jobs
const authHeader = request.headers.get('authorization');
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return NextResponse.json({ message: 'Unauthorized' }, { status: 401 });
}
try {
const jobQueue = getJobQueue();
// Control how many jobs are processed in parallel per batch using the `concurrency` option.
// For example, to process up to 3 jobs in parallel per batch:
const processor = jobQueue.createProcessor(jobHandlers, {
workerId: `cron-${Date.now()}`,
batchSize: 10, // up to 10 jobs per batch
concurrency: 3, // up to 3 jobs processed in parallel
verbose: true,
});
const processed = await processor.start();
return NextResponse.json({
message: 'Job processing completed',
processed,
});
} catch (error) {
console.error('Error processing jobs:', error);
return NextResponse.json(
{ message: 'Failed to process jobs' },
{ status: 500 },
);
}
}
In the example above, we use the createProcessor
method to create a processor. When you call the processor's start
function, it processes jobs in the queue up to the batchSize
limit.
Batch Size
Serverless platforms like Vercel limit how long a function can run. If you set batchSize
too high, the function might run too long and get killed. Choose a batchSize
that fits your use case.
You can also process only certain job types by setting the jobType
option. If a job type is more resource-intensive, use a lower batchSize
for that type.
For example, you can define two endpoints: one for low-resource jobs and another for high-resource jobs, each with different batchSize
and concurrency
values.
Concurrency
Some jobs are resource-intensive, like image processing, LLM calls, or calling a rate-limited external service. In these cases, set the concurrency
option to control how many jobs run in parallel per batch.
The default is 3
. Set it to 1
to process jobs one at a time. Use a lower value to avoid exhausting resources in constrained environments.
Triggering the Processor via Cron
Defining an endpoint isn't enough—you need to trigger it regularly. For example, use Vercel cron to trigger the endpoint every minute by adding this to your vercel.json
:
{
"$schema": "https://openapi.vercel.sh/vercel.json",
"crons": [
{
"path": "/api/cron/process",
"schedule": "* * * * *"
}
]
}
For Vercel cron, set the CRON_SECRET
environment variable, as it's sent in the authorization
header. If you use a different cron service, set the authorization
header to the value of CRON_SECRET
:
Authorization: Bearer <VALUE_OF_CRON_SECRET>