DataQueue
Usage

Add Job

You can add jobs to the queue from your application logic, such as in a server function:

@/app/actions/send-email.ts
'use server';

import { getJobQueue } from '@/lib/queue';
import { revalidatePath } from 'next/cache';

export const sendEmail = async ({
  name,
  email,
}: {
  name: string;
  email: string;
}) => {
  // Add a welcome email job
  const jobQueue = getJobQueue(); 
  try {
    const runAt = new Date(Date.now() + 5 * 1000); // Run 5 seconds from now
    const job = await jobQueue.addJob({
      jobType: 'send_email',
      payload: {
        to: email,
        subject: 'Welcome to our platform!',
        body: `Hi ${name}, welcome to our platform!`,
      },
      priority: 10, // Higher number = higher priority
      runAt: runAt,
      tags: ['welcome', 'user'], // Add tags for grouping/searching
    });

    revalidatePath('/');
    return { job };
  } catch (error) {
    console.error('Error adding job:', error);
    throw error;
  }
};

In the example above, a job is added to the queue to send an email. The job type is send_email, and the payload includes the recipient's email, subject, and body.

When adding a job, you can set its priority, schedule when it should run using runAt, and specify a timeout in milliseconds with timeoutMs.
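To make those fields concrete, here is a minimal sketch of the option shape implied by the examples in this document. The type below is an illustration inferred from this page, not the library's actual definition; in particular, timeoutMs is assumed to be a per-job execution timeout.

```typescript
// Illustrative shape of the job options described above. Inferred from this
// document's examples; the library's real types may differ.
type AddJobOptions = {
  jobType: string;
  payload: Record<string, unknown>;
  priority?: number; // higher number = higher priority
  runAt?: Date; // when the job becomes eligible to run
  timeoutMs?: number; // assumed: per-job execution timeout in milliseconds
  tags?: string[];
  idempotencyKey?: string;
};

// Example: a report job scheduled one minute out with a 30-second timeout.
const reportJob: AddJobOptions = {
  jobType: 'generate_report',
  payload: { reportId: 'r-123' },
  priority: 5,
  runAt: new Date(Date.now() + 60 * 1000),
  timeoutMs: 30_000,
};
```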

You can also add tags (an array of strings) to group, search, or batch jobs by category. See Tags for more details.

Batch Insert

When you need to enqueue many jobs at once, use addJobs instead of calling addJob in a loop. It batches the inserts into a single database round-trip (PostgreSQL) or a single atomic Lua script (Redis), which is significantly faster.

@/app/actions/send-bulk.ts
'use server';

import { getJobQueue } from '@/lib/queue';

export const sendBulkEmails = async (
  recipients: { email: string; name: string }[],
) => {
  const jobQueue = getJobQueue();
  const jobIds = await jobQueue.addJobs(
    recipients.map((r) => ({
      jobType: 'send_email' as const,
      payload: {
        to: r.email,
        subject: 'Newsletter',
        body: `Hi ${r.name}, here's your update!`,
      },
      tags: ['newsletter'],
    })),
  );
  // jobIds[i] corresponds to recipients[i]
  return { jobIds };
};

addJobs returns an array of job IDs in the same order as the input array. Each job can carry its own priority, runAt, tags, idempotencyKey, and other options.

  • Empty array: addJobs([]) returns [] immediately without touching the database.
  • Idempotency: Each job's idempotencyKey is handled independently. Duplicate keys resolve to the existing job's ID.
  • Transactional: The { db } option works with addJobs the same way as addJob (PostgreSQL only).
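Because every entry carries its own options, a single batch can mix priorities and idempotency keys. The sketch below shows one way to build such an input array; the recipient shape, the vip flag, and the key format are illustrative, not part of the library.

```typescript
// Sketch: an addJobs input where each entry sets its own options.
// The recipient shape and idempotency key format here are made up
// for illustration.
const recipients = [
  { email: 'a@example.com', vip: true },
  { email: 'b@example.com', vip: false },
];

const jobSpecs = recipients.map((r) => ({
  jobType: 'send_email' as const,
  payload: { to: r.email, subject: 'Digest', body: 'Your weekly digest' },
  priority: r.vip ? 10 : 0, // VIP recipients are picked up first
  idempotencyKey: `digest-${r.email}`, // dedupe per recipient
  tags: ['digest'],
}));
// Pass jobSpecs to jobQueue.addJobs(jobSpecs); the returned IDs line up
// with recipients by index.
```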

Idempotency

You can provide an idempotencyKey when adding a job to prevent duplicate jobs. If a job with the same key already exists in the queue, addJob returns the existing job's ID instead of creating a new one.

This is useful for preventing duplicates caused by retries, double-clicks, webhook replays, or serverless function re-invocations.

@/app/actions/send-welcome.ts
'use server';

import { getJobQueue } from '@/lib/queue';

export const sendWelcomeEmail = async (userId: string, email: string) => {
  const jobQueue = getJobQueue();
  const jobId = await jobQueue.addJob({
    jobType: 'send_email',
    payload: {
      to: email,
      subject: 'Welcome!',
      body: `Welcome to our platform!`,
    },
    idempotencyKey: `welcome-email-${userId}`, // prevents duplicate welcome emails
  });

  return { jobId };
};

In the example above, calling sendWelcomeEmail multiple times for the same userId creates only one job; subsequent calls return the existing job's ID.

Behavior

  • No key provided: Behaves exactly as before; no uniqueness check is performed.
  • Key provided, no conflict: The job is inserted and its new ID is returned.
  • Key provided, conflict: The existing job's ID is returned. The existing job is not updated.
  • Scope: The key is unique across the entire job_queue table regardless of job status. Once a key exists, it cannot be reused until the job is cleaned up via cleanupOldJobs.
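A practical consequence of the table-wide scope is that keys should encode the logical operation, not a random value, so that retries and replays map to the same key. A sketch of deriving a stable key from a webhook event follows; the event shape is hypothetical.

```typescript
// Sketch: derive the idempotency key from the triggering event so that a
// replayed webhook maps to the same key. The event shape is hypothetical.
const event = { id: 'evt_12345', type: 'user.created' };
const idempotencyKey = `${event.type}:${event.id}`;
// Every replay of this webhook produces the same key, so addJob resolves
// to the one existing job instead of creating duplicates.
```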

Transactional Job Creation

Transactional job creation is only available with the PostgreSQL backend.

You can insert a job within an existing database transaction by passing an external database client via the db option. This guarantees that the job is enqueued atomically with your other database writes — if the transaction rolls back, the job is never enqueued.

This is useful when you need to ensure that a job is only created when a related database operation succeeds (e.g., creating a user and enqueuing a welcome email in the same transaction).

@/app/actions/register.ts
'use server';

import { Pool } from 'pg';
import { getJobQueue } from '@/lib/queue';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

export const registerUser = async (email: string, name: string) => {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    // Insert the user
    await client.query('INSERT INTO users (email, name) VALUES ($1, $2)', [
      email,
      name,
    ]);

    // Enqueue the welcome email in the same transaction
    const jobQueue = getJobQueue();
    await jobQueue.addJob(
      {
        jobType: 'send_email',
        payload: { to: email, subject: 'Welcome!', body: `Hi ${name}!` },
      },
      { db: client }, // Use the transaction client
    );

    await client.query('COMMIT');
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
};

How it works

  • When db is provided, the INSERT into the job_queue table and the associated job event are both executed on the supplied client.
  • The library does not call client.release() — you are responsible for managing the client lifecycle.
  • If the transaction is rolled back, both the job and its event are discarded.
  • When db is not provided, addJob behaves exactly as before (gets a connection from the internal pool).
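The BEGIN/COMMIT/ROLLBACK pattern from the registerUser example can be factored into a small helper so every transactional enqueue site stays consistent. This is a sketch, not part of the library; TxClient is a minimal stand-in for pg's PoolClient.

```typescript
// Minimal stand-in for the parts of pg's PoolClient this helper uses.
type TxClient = {
  query: (sql: string, params?: unknown[]) => Promise<unknown>;
  release: () => void;
};

// Runs fn inside a transaction on an already-acquired client. The client
// is always released, and the transaction rolls back if fn throws, so a
// job enqueued via { db: client } inside fn is discarded along with the
// other writes.
async function withTransaction<T>(
  client: TxClient,
  fn: (client: TxClient) => Promise<T>,
): Promise<T> {
  try {
    await client.query('BEGIN');
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release();
  }
}
```

With a helper like this, registerUser reduces to acquiring a client from the pool and calling withTransaction(client, async (c) => { /* insert user, then addJob(..., { db: c }) */ }).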