Initialize Queue
After defining your job types, payloads, and handlers, you need to initialize the job queue which basically sets up the connection pool to the Postgres database.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';
let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;
export const getJobQueue = () => {
if (!jobQueue) {
jobQueue = initJobQueue<JobPayloadMap>({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
},
verbose: process.env.NODE_ENV === 'development',
});
}
return jobQueue;
};
The value of connectionString
must be a valid Postgres connection
string.
For example:
PG_DATAQUEUE_DATABASE=postgresql://postgres:password@localhost:5432/my_database?search_path=my_schema
You can now use this queue instance throughout your app to add jobs, process jobs, and more.
import { getJobQueue } from '@/lib/queue';
const sendEmail = async () => {
const jobQueue = getJobQueue();
await jobQueue.addJob({
jobType: 'send_email',
payload: {
to: 'test@example.com',
subject: 'Hello',
body: 'Hello, world!',
},
});
};
SSL Configuration
Most managed Postgres providers (like DigitalOcean, Supabase, etc.) require SSL connections and use their own CA certificate (.crt file) to sign the server’s certificate. To securely verify the server’s identity, you must configure your client to trust this CA certificate.
You can configure SSL for your database connection in several ways, depending on your environment and security requirements.
Using PEM Strings from Environment Variables
This is ideal for serverless environments where you cannot mount files. Store your CA certificate, and optionally client certificate and key, as environment variables then pass them to the ssl
property of the databaseConfig
object.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';
let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;
export const getJobQueue = () => {
if (!jobQueue) {
jobQueue = initJobQueue<JobPayloadMap>({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE, // Set this in your environment
ssl: {
ca: process.env.PGSSLROOTCERT, // PEM string: the content of your .crt file
cert: process.env.PGSSLCERT, // PEM string (optional, for client authentication)
key: process.env.PGSSLKEY, // PEM string (optional, for client authentication)
rejectUnauthorized: true, // Always true for CA-signed certs
},
},
verbose: process.env.NODE_ENV === 'development',
});
}
return jobQueue;
};
When using a custom CA certificate and connectionString
, you must remove the
sslmode
parameter from the connection string. Otherwise, the connection will
fail.
Using File Paths
If you have the CA certificate, client certificate, or key on disk, provide their absolute paths using the file://
prefix. Only values starting with file://
will be loaded from the file system; all others are treated as PEM strings.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';
let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;
export const getJobQueue = () => {
if (!jobQueue) {
jobQueue = initJobQueue<JobPayloadMap>({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE,
ssl: {
ca: 'file:///absolute/path/to/ca.crt', // Path to your provider's CA cert
cert: 'file:///absolute/path/to/client.crt', // optional, for client authentication
key: 'file:///absolute/path/to/client.key', // optional, for client authentication
rejectUnauthorized: true,
},
},
verbose: process.env.NODE_ENV === 'development',
});
}
return jobQueue;
};
When using a custom CA certificate and connectionString
, you must remove the
sslmode
parameter from the connection string. Otherwise, the connection will
fail.
Skipping Certificate Validation
For convenience, you can skip certificate validation (not recommended for production) by setting rejectUnauthorized
to false
and without providing a custom CA certificate.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { type JobPayloadMap } from './types/job-payload-map';
let jobQueue: ReturnType<typeof initJobQueue<JobPayloadMap>> | null = null;
export const getJobQueue = () => {
if (!jobQueue) {
jobQueue = initJobQueue<JobPayloadMap>({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE,
ssl: {
rejectUnauthorized: false,
},
},
verbose: process.env.NODE_ENV === 'development',
});
}
return jobQueue;
};
When using rejectUnauthorized: false
and connectionString
, you must remove
the sslmode
parameter from the connection string. Otherwise, the connection
will fail.