DataQueue

Process Jobs

So far, we have only added jobs to the queue; none of them has actually run. Now, let's process those jobs.

In a serverless environment, we can't have a long-running process that constantly monitors and processes the queue.

Instead, we create an API endpoint that checks the queue and processes jobs in batches, and we trigger that endpoint with a cron job. For example, in a Next.js App Router project you can serve the endpoint at /api/cron/process with a route handler:

@/app/api/cron/process/route.ts
import { jobHandlers } from '@/lib/job-handler';
import { getJobQueue } from '@/lib/queue';
import { NextResponse } from 'next/server';

export async function GET(request: Request) {
  // Secure the cron route: https://vercel.com/docs/cron-jobs/manage-cron-jobs#securing-cron-jobs
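  const secret = process.env.CRON_SECRET;
  const authHeader = request.headers.get('authorization');
  // Reject when CRON_SECRET is unset; otherwise "Bearer undefined" would be accepted.
  if (!secret || authHeader !== `Bearer ${secret}`) {
    return NextResponse.json({ message: 'Unauthorized' }, { status: 401 });
  }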

  try {
    const jobQueue = getJobQueue();

    // Control how many jobs are processed in parallel per batch using the `concurrency` option.
    // For example, to process up to 3 jobs in parallel per batch:
    const processor = jobQueue.createProcessor(jobHandlers, {
      workerId: `cron-${Date.now()}`,
      batchSize: 10, // up to 10 jobs per batch
      concurrency: 3, // up to 3 jobs processed in parallel
      verbose: true,
    });

    const processed = await processor.start();

    return NextResponse.json({
      message: 'Job processing completed',
      processed,
    });
  } catch (error) {
    console.error('Error processing jobs:', error);
    return NextResponse.json(
      { message: 'Failed to process jobs' },
      { status: 500 },
    );
  }
}

In the example above, createProcessor builds a processor from the job handlers and the given options. Calling the processor's start function processes jobs from the queue up to the batchSize limit, running at most concurrency of them in parallel.

Batch Size

Serverless platforms like Vercel limit how long a function can run. If you set batchSize too high, the function might exceed that limit and get killed before the batch finishes. Choose a batchSize small enough that a full batch completes comfortably within your platform's time limit.
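As a rough way to size a batch, you can bound its worst-case duration from the slowest job you expect and the concurrency option shown in the example above. The sketch below is back-of-the-envelope arithmetic, not part of DataQueue: the function names are hypothetical, it assumes every job finishes within maxJobSeconds, and it ignores queue and database overhead.

```typescript
// Upper bound on how long one batch can take, assuming no job runs longer
// than `maxJobSeconds` and at most `concurrency` jobs run at the same time.
function worstCaseBatchSeconds(
  batchSize: number,
  concurrency: number,
  maxJobSeconds: number,
): number {
  return Math.ceil(batchSize / concurrency) * maxJobSeconds;
}

// Largest batchSize whose worst case still fits within `limitSeconds`.
function maxBatchSizeFor(
  limitSeconds: number,
  concurrency: number,
  maxJobSeconds: number,
): number {
  return Math.floor(limitSeconds / maxJobSeconds) * concurrency;
}

// With batchSize 10 and concurrency 3, jobs of up to 5 seconds each
// need at most 4 rounds of 5 seconds.
const estimate = worstCaseBatchSeconds(10, 3, 5); // 20

// Under a hypothetical 60-second function limit, the same jobs allow
// a batchSize of up to 36.
const largest = maxBatchSizeFor(60, 3, 5); // 36
```

In practice, leave headroom below the computed value, since the real limit depends on your plan and function configuration.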

You can also process only certain job types by setting the jobType option. If a job type is more resource-intensive, use a lower batchSize for that type.

For example, you can define two endpoints: one for low-resource jobs and another for high-resource jobs, each with different batchSize and concurrency values.
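One way to keep two such endpoints consistent is to derive their processor options from a small shared table. This is only a sketch: the option names (workerId, batchSize, concurrency, jobType) follow the example above, while the profile names, job type strings, and numbers are made up for illustration, and jobType is assumed to accept a single job type string.

```typescript
// Shape of the options passed to createProcessor in the example above.
type ProcessorOptions = {
  workerId: string;
  batchSize: number;
  concurrency: number;
  jobType?: string;
};

// Hypothetical profiles: one per cron endpoint.
type Profile = 'light' | 'heavy';

const PROFILES: Record<Profile, Omit<ProcessorOptions, 'workerId'>> = {
  // Cheap jobs: larger batches, more parallelism.
  light: { jobType: 'send_email', batchSize: 20, concurrency: 5 },
  // Expensive jobs: small batches, processed one at a time.
  heavy: { jobType: 'generate_report', batchSize: 3, concurrency: 1 },
};

// Builds the options for one endpoint; `now` is injectable to keep the
// function deterministic in tests.
function processorOptionsFor(
  profile: Profile,
  now: number = Date.now(),
): ProcessorOptions {
  return { workerId: `cron-${profile}-${now}`, ...PROFILES[profile] };
}
```

Each route handler would then pass processorOptionsFor('light') or processorOptionsFor('heavy') to jobQueue.createProcessor in place of the inline options, and each route would get its own cron entry.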

Concurrency

Some jobs are resource-intensive, such as image processing, LLM calls, or requests to a rate-limited external service. In these cases, set the concurrency option to control how many jobs run in parallel per batch.

The default is 3. Set it to 1 to process jobs one at a time. Use a lower value to avoid exhausting resources in constrained environments.
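Lowering concurrency trades resource usage for time: the same batch takes longer, so it may need a smaller batchSize to stay within the function's time limit. The sketch below illustrates that trade-off with a simplified model in which each free slot picks up the next job. It is not DataQueue's actual scheduler, and the function name and job durations are hypothetical.

```typescript
// Simplified model: `concurrency` slots, each taking the next job as soon
// as it becomes free. Returns the time at which the last job finishes.
function simulateBatchSeconds(jobSeconds: number[], concurrency: number): number {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error('concurrency must be a positive integer');
  }
  if (jobSeconds.length === 0) return 0;

  // Time at which each slot becomes free again.
  const slotFreeAt: number[] = new Array(
    Math.min(concurrency, jobSeconds.length),
  ).fill(0);

  for (const seconds of jobSeconds) {
    // Assign the job to the slot that frees up first.
    let earliest = 0;
    for (let i = 1; i < slotFreeAt.length; i++) {
      if (slotFreeAt[i] < slotFreeAt[earliest]) earliest = i;
    }
    slotFreeAt[earliest] += seconds;
  }
  return Math.max(...slotFreeAt);
}

// Six 4-second jobs: one at a time versus three in parallel.
const sequential = simulateBatchSeconds([4, 4, 4, 4, 4, 4], 1); // 24
const parallel = simulateBatchSeconds([4, 4, 4, 4, 4, 4], 3); // 8
```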

Triggering the Processor via Cron

Defining an endpoint isn't enough; something has to call it on a schedule. For example, use Vercel cron to trigger the endpoint every minute by adding this to your vercel.json:

vercel.json
{
  "$schema": "https://openapi.vercel.sh/vercel.json",
  "crons": [
    {
      "path": "/api/cron/process",
      "schedule": "* * * * *"
    }
  ]
}

For Vercel cron, set the CRON_SECRET environment variable in your project; Vercel sends its value as a bearer token in the authorization header of each cron request. If you use a different cron service, configure it to send the same header with the value of CRON_SECRET:

Authorization: Bearer <VALUE_OF_CRON_SECRET>
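If your scheduler runs code rather than a plain HTTP call, the request could be built along these lines. This is a minimal sketch using the standard Request and URL APIs (available in Node.js 18+); the function name and the base URL are placeholders, and only the /api/cron/process path and the header format come from this page.

```typescript
// Builds the GET request a non-Vercel scheduler would send to the cron
// endpoint. `baseUrl` is a placeholder for your deployment's URL.
function buildCronRequest(baseUrl: string, cronSecret: string): Request {
  const url = new URL('/api/cron/process', baseUrl);
  return new Request(url.toString(), {
    method: 'GET',
    headers: { Authorization: `Bearer ${cronSecret}` },
  });
}
```

The scheduler would then call fetch(buildCronRequest(baseUrl, secret)) and treat any non-2xx response, such as the 401 returned for a wrong secret, as a failed run.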