Initialize Queue

After defining your job types, payloads, and handlers, initialize the job queue. Initialization sets up the connection pool to the Postgres database.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

The value of connectionString must be a valid Postgres connection string. For example:

PG_DATAQUEUE_DATABASE=postgresql://postgres:password@localhost:5432/my_database?search_path=my_schema
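
Because a missing or mistyped variable is easy to overlook, it can help to check the value before handing it to initJobQueue. The following is a minimal sketch, not part of DataQueue: requireConnectionString is a hypothetical helper, and it assumes the connection string is in URL form (postgres:// or postgresql://) so that the standard URL class can parse it.

```typescript
// Hypothetical helper (not a DataQueue API): fail fast on a missing or
// malformed connection string instead of discovering it at query time.
export function requireConnectionString(
  env: Record<string, string | undefined>,
  name = 'PG_DATAQUEUE_DATABASE',
): string {
  const value = env[name];
  if (!value) {
    throw new Error(`${name} is not set`);
  }
  // new URL() throws a TypeError if the value is not a parseable URL.
  const protocol = new URL(value).protocol;
  // Accept the two URL schemes commonly used for Postgres connection strings.
  if (protocol !== 'postgres:' && protocol !== 'postgresql:') {
    throw new Error(`${name} must start with postgres:// or postgresql://`);
  }
  return value;
}
```

In getJobQueue, you could then pass requireConnectionString(process.env) as the connectionString value.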

You can now use this queue instance throughout your app to add jobs, process jobs, and more.

@/app/actions/send-email.ts
import { getJobQueue } from '@/lib/queue';

const sendEmail = async () => {
  const jobQueue = getJobQueue();
  await jobQueue.addJob({
    jobType: 'send_email',
    payload: {
      to: 'test@example.com',
      subject: 'Hello',
      body: 'Hello, world!',
    },
  });
};

SSL Configuration

Most managed Postgres providers (such as DigitalOcean and Supabase) require SSL connections and sign the server’s certificate with their own CA certificate (a .crt file). To verify the server’s identity securely, you must configure your client to trust this CA certificate.

You can configure SSL for your database connection in several ways, depending on your environment and security requirements.

Using PEM Strings from Environment Variables

This is ideal for serverless environments where you cannot mount files. Store your CA certificate, and optionally the client certificate and key, as environment variables, then pass them to the ssl property of the databaseConfig object.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
        ssl: {
          ca: process.env.PGSSLROOTCERT, // PEM string: the content of your .crt file
          cert: process.env.PGSSLCERT, // PEM string (optional, for client authentication)
          key: process.env.PGSSLKEY, // PEM string (optional, for client authentication)
          rejectUnauthorized: true, // Always true for CA-signed certs
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

When using a custom CA certificate and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.
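
If you cannot edit the connection string yourself, for example because your provider injects it, one option is to strip sslmode in code before passing the string to initJobQueue. This is a minimal sketch under stated assumptions: stripSslMode is a hypothetical helper, not part of DataQueue, and it assumes the connection string is in URL form with any special characters in the credentials already percent-encoded.

```typescript
// Hypothetical helper (not a DataQueue API): remove the sslmode query
// parameter from a URL-style Postgres connection string.
export function stripSslMode(connectionString: string): string {
  const url = new URL(connectionString);
  // Deletes every occurrence of sslmode; other parameters are kept.
  url.searchParams.delete('sslmode');
  return url.toString();
}

// Example: sslmode is dropped, search_path is preserved.
const cleaned = stripSslMode(
  'postgresql://postgres:password@localhost:5432/my_database?sslmode=require&search_path=my_schema',
);
```

The result can then be used as the connectionString value alongside the ssl object shown above.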

Using File Paths

If you have the CA certificate, client certificate, or key on disk, provide their absolute paths using the file:// prefix. Only values starting with file:// will be loaded from the file system; all others are treated as PEM strings.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE,
        ssl: {
          ca: 'file:///absolute/path/to/ca.crt', // Path to your provider's CA cert
          cert: 'file:///absolute/path/to/client.crt', // optional, for client authentication
          key: 'file:///absolute/path/to/client.key', // optional, for client authentication
          rejectUnauthorized: true,
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};
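
To make the file:// rule concrete, the sketch below shows one way such a value could be resolved. It only illustrates the behavior described above and is not DataQueue's actual implementation; resolvePemValue is a hypothetical name.

```typescript
import { readFileSync } from 'node:fs';
import { fileURLToPath } from 'node:url';

// Hypothetical illustration (not DataQueue's code): values that start with
// file:// are read from disk, everything else is treated as a PEM string.
export function resolvePemValue(value: string | undefined): string | undefined {
  if (value === undefined) {
    return undefined; // optional fields such as cert and key may be absent
  }
  if (value.startsWith('file://')) {
    // Convert the file URL to a local path and read the PEM content.
    return readFileSync(fileURLToPath(value), 'utf8');
  }
  return value; // already a PEM string
}
```

Under this rule, a bare path such as /absolute/path/to/ca.crt without the file:// prefix would be treated as PEM content rather than read from disk, so the prefix is required.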

When using a custom CA certificate and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.

Skipping Certificate Validation

For convenience, you can skip certificate validation by setting rejectUnauthorized to false and omitting the custom CA certificate. This is not recommended for production, because the client no longer verifies the server’s identity.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE,
        ssl: {
          rejectUnauthorized: false,
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

When using rejectUnauthorized: false and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.