DataQueue
Usage

Initialize Queue

After defining your job types, payloads, and handlers, initialize the job queue. Initialization sets up the connection to your database backend.

PostgreSQL

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

The value of connectionString must be a valid Postgres connection string. For example:

PG_DATAQUEUE_DATABASE=postgresql://postgres:password@localhost:5432/my_database?search_path=my_schema
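
A missing or malformed connection string often surfaces only when the first query runs. As a minimal sketch, a small guard can check the value before it is handed to initJobQueue. The helper name requirePostgresUrl is hypothetical and not part of DataQueue; it relies only on the standard URL class and accepts URL-style connection strings only.

```typescript
// Hypothetical helper, not part of @nicnocquee/dataqueue.
// Fails fast when the environment variable is missing or is not a Postgres URL.
export const requirePostgresUrl = (
  value: string | undefined,
  name = 'PG_DATAQUEUE_DATABASE',
): string => {
  if (!value) {
    throw new Error(`${name} is not set`);
  }
  // new URL() throws a TypeError when the string cannot be parsed at all.
  const url = new URL(value);
  if (url.protocol !== 'postgres:' && url.protocol !== 'postgresql:') {
    throw new Error(`${name} must start with postgres:// or postgresql://`);
  }
  return value;
};
```

It could then wrap the environment lookup, for example connectionString: requirePostgresUrl(process.env.PG_DATAQUEUE_DATABASE).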

Redis

To use Redis as the backend, set backend: 'redis' and provide redisConfig instead of databaseConfig:

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      backend: 'redis',
      redisConfig: {
        url: process.env.REDIS_URL, // e.g. redis://localhost:6379
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

You can also connect using individual connection options instead of a URL:

@lib/queue.ts
jobQueue = initJobQueue<JobPayloadMap>({
  backend: 'redis',
  redisConfig: {
    host: 'localhost',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    db: 0,
    keyPrefix: 'myapp:', // Optional, defaults to 'dq:'
  },
  verbose: process.env.NODE_ENV === 'development',
});

The keyPrefix option lets you namespace all Redis keys. This is useful when sharing a Redis instance between multiple applications or multiple queues. The default prefix is dq:.
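
When several applications or environments share one Redis instance, it can help to derive the prefix from a fixed set of parts rather than typing it by hand. The sketch below is one possible convention, not something DataQueue requires; buildKeyPrefix is a hypothetical helper, and the only documented facts it relies on are that keyPrefix is a string and that the default is dq:.

```typescript
// Hypothetical helper: joins name parts into a Redis key prefix that ends with ':'.
// Falls back to the documented default 'dq:' when no usable part is given.
export const buildKeyPrefix = (...parts: string[]): string => {
  const cleaned = parts
    .map((part) => part.trim().replace(/:+$/, '')) // drop trailing colons
    .filter((part) => part.length > 0);
  if (cleaned.length === 0) {
    return 'dq:';
  }
  return `${cleaned.join(':')}:`;
};
```

For example, keyPrefix: buildKeyPrefix('myapp', 'staging') would keep staging keys apart from production keys on the same server.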


Bring Your Own Pool / Client

Instead of providing connection configuration, you can pass an existing connection instance. This is useful when your application already manages its own connection pool and you want to share it with dataqueue.

PostgreSQL — External Pool

@lib/queue.ts
import { Pool } from 'pg';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

const jobQueue = initJobQueue<JobPayloadMap>({
  pool, // Reuse the application's existing pg Pool
  verbose: process.env.NODE_ENV === 'development',
});

Redis — External Client

@lib/queue.ts
import IORedis from 'ioredis';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

const redis = new IORedis(process.env.REDIS_URL);

const jobQueue = initJobQueue<JobPayloadMap>({
  backend: 'redis',
  client: redis, // Reuse the application's existing ioredis client
  keyPrefix: 'myapp:',
  verbose: process.env.NODE_ENV === 'development',
});

Connection ownership: When you provide your own pool or client, the library will not close it on shutdown. You are responsible for calling pool.end() or client.quit() when your application exits.
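
The ownership rule above can be handled with a small shutdown routine. This is a minimal sketch under stated assumptions: closeOwnedConnections is a hypothetical helper, and it only requires objects exposing end() (as a pg Pool does) or quit() (as an ioredis client does), so it does not import either library.

```typescript
// Structural types: anything with a matching end() or quit() method fits,
// including a pg Pool and an ioredis client.
interface OwnedConnections {
  pool?: { end: () => Promise<void> };
  redis?: { quit: () => Promise<unknown> };
}

// Hypothetical helper: closes the connections the application created itself.
export const closeOwnedConnections = async ({
  pool,
  redis,
}: OwnedConnections): Promise<void> => {
  if (pool) {
    await pool.end(); // waits for checked-out clients to be returned
  }
  if (redis) {
    await redis.quit(); // lets pending replies arrive before closing
  }
};

// Possible wiring (assumes `pool` and `redis` from the examples above):
// process.once('SIGTERM', () => {
//   closeOwnedConnections({ pool, redis }).finally(() => process.exit(0));
// });
```

In a real application, stop any running job processors before closing the connections so that no job loses its connection mid-run.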


Using the Queue

Once initialized, the queue instance is used the same way regardless of backend: the API is identical for PostgreSQL and Redis.

@/app/actions/send-email.ts
import { getJobQueue } from '@/lib/queue';

const sendEmail = async () => {
  const jobQueue = getJobQueue();
  await jobQueue.addJob({
    jobType: 'send_email',
    payload: {
      to: 'test@example.com',
      subject: 'Hello',
      body: 'Hello, world!',
    },
  });
};

SSL Configuration (PostgreSQL)

Most managed Postgres providers (such as DigitalOcean and Supabase) require SSL connections and sign the server's certificate with their own CA certificate (.crt file). To verify the server's identity securely, you must configure your client to trust this CA certificate.

You can configure SSL for your database connection in several ways, depending on your environment and security requirements.

Using PEM Strings from Environment Variables

This is ideal for serverless environments where you cannot mount files. Store your CA certificate, and optionally the client certificate and key, as environment variables, then pass them to the ssl property of the databaseConfig object.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
        ssl: {
          ca: process.env.PGSSLROOTCERT, // PEM string: the content of your .crt file
          cert: process.env.PGSSLCERT, // PEM string (optional, for client authentication)
          key: process.env.PGSSLKEY, // PEM string (optional, for client authentication)
          rejectUnauthorized: true, // Always true for CA-signed certs
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

When using a custom CA certificate and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.
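
Provider dashboards often hand out connection strings that already include sslmode. As a minimal sketch, the parameter can be stripped in code rather than by hand. stripSslMode is a hypothetical helper, not part of DataQueue, and it assumes a URL-style connection string with any special characters in the password already percent-encoded.

```typescript
// Hypothetical helper: returns the connection string without its sslmode parameter.
// Other query parameters (for example search_path) are kept.
export const stripSslMode = (connectionString: string): string => {
  const url = new URL(connectionString);
  url.searchParams.delete('sslmode');
  return url.toString();
};
```

It could be applied where the configuration is built, for example connectionString: stripSslMode(process.env.PG_DATAQUEUE_DATABASE ?? ''), keeping in mind that an empty or malformed value makes new URL() throw.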

Using File Paths

If you have the CA certificate, client certificate, or key on disk, provide their absolute paths using the file:// prefix. Only values starting with file:// will be loaded from the file system; all others are treated as PEM strings.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE,
        ssl: {
          ca: 'file:///absolute/path/to/ca.crt', // Path to your provider's CA cert
          cert: 'file:///absolute/path/to/client.crt', // optional, for client authentication
          key: 'file:///absolute/path/to/client.key', // optional, for client authentication
          rejectUnauthorized: true,
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};
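
The file:// rule can be pictured with a short sketch. This is an illustration of the documented behavior, not DataQueue's actual implementation: resolvePem is a hypothetical name, and the injectable readFile parameter exists only to keep the example easy to test.

```typescript
import { readFileSync } from 'node:fs';
import { fileURLToPath } from 'node:url';

// Illustration only: values starting with file:// are read from disk,
// every other value is returned unchanged and treated as a PEM string.
export const resolvePem = (
  value: string | undefined,
  readFile: (path: string) => string = (path) => readFileSync(path, 'utf8'),
): string | undefined => {
  if (value === undefined) {
    return undefined; // optional fields such as cert and key may be absent
  }
  if (value.startsWith('file://')) {
    // fileURLToPath turns file:///absolute/path into /absolute/path
    return readFile(fileURLToPath(value));
  }
  return value;
};
```

One consequence of this rule is that a bare path such as /etc/ssl/ca.crt, without the file:// prefix, would be treated as certificate content rather than as a file to load.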

When using a custom CA certificate and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.

Skipping Certificate Validation

For convenience, you can skip certificate validation by setting rejectUnauthorized to false and omitting the custom CA certificate. This is not recommended for production, because the client can no longer verify the server's identity.

@lib/queue.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';

let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;

export const getJobQueue = () => {
  if (!jobQueue) {
    jobQueue = initJobQueue<JobPayloadMap>({
      databaseConfig: {
        connectionString: process.env.PG_DATAQUEUE_DATABASE,
        ssl: {
          rejectUnauthorized: false,
        },
      },
      verbose: process.env.NODE_ENV === 'development',
    });
  }
  return jobQueue;
};

When using rejectUnauthorized: false and connectionString, you must remove the sslmode parameter from the connection string. Otherwise, the connection will fail.


TLS Configuration (Redis)

If your Redis server requires TLS (common with managed services such as AWS ElastiCache and Redis Cloud), provide TLS options in redisConfig:

@lib/queue.ts
jobQueue = initJobQueue<JobPayloadMap>({
  backend: 'redis',
  redisConfig: {
    url: process.env.REDIS_URL,
    tls: {
      ca: process.env.REDIS_CA_CERT, // PEM string
      rejectUnauthorized: true,
    },
  },
});