Event Hooks
DataQueue emits real-time events for job lifecycle transitions, progress updates, and internal errors. Use event hooks to integrate with logging, metrics, alerting, or any custom logic without polling.
Event hooks work identically with both the PostgreSQL and Redis backends.
Listening for Events
Register listeners with on(), once(), or remove them with off() and removeAllListeners().
const queue = initJobQueue<MyPayloadMap>(config);
queue.on('job:completed', (event) => {
console.log(`Job ${event.jobId} (${event.jobType}) completed`);
});
queue.on('job:failed', (event) => {
console.error(`Job ${event.jobId} failed: ${event.error.message}`);
if (!event.willRetry) {
alertOps(`Permanent failure for job ${event.jobId}`);
}
});
queue.on('error', (error) => {
logger.error('Queue internal error:', error);
});Available Events
| Event | Payload | When |
|---|---|---|
job:added | { jobId, jobType } | After addJob() or addJobs() |
job:processing | { jobId, jobType } | When a processor claims and starts a job |
job:completed | { jobId, jobType } | When a handler completes successfully |
job:failed | { jobId, jobType, error, willRetry } | When a handler throws or times out |
job:cancelled | { jobId } | After cancelJob() |
job:retried | { jobId } | After retryJob() |
job:waiting | { jobId, jobType } | When a handler enters a wait (ctx.waitFor, ctx.waitUntil, ctx.waitForToken) |
job:progress | { jobId, progress } | When a handler calls ctx.setProgress() |
error | Error | Internal errors from the processor or supervisor |
One-Time Listeners
Use once() when you only need to react to the first occurrence of an event.
queue.once('job:added', (event) => {
console.log('First job added:', event.jobId);
});Removing Listeners
const listener = (event) => console.log(event);
queue.on('job:completed', listener);
// Remove a specific listener
queue.off('job:completed', listener);
// Remove all listeners for one event
queue.removeAllListeners('job:completed');
// Remove all listeners for all events
queue.removeAllListeners();Error Monitoring
The error event fires for internal errors in the processor and supervisor. It works alongside the existing onError callback in ProcessorOptions and SupervisorOptions -- both fire independently.
queue.on('error', (error) => {
Sentry.captureException(error);
});
// onError still works as before
const processor = queue.createProcessor(handlers, {
onError: (error) => console.error('Processor error:', error),
});Failure Retry Detection
The job:failed event includes a willRetry boolean that tells you whether the job will be retried automatically.
queue.on('job:failed', (event) => {
if (event.willRetry) {
metrics.increment('job.retry', { jobType: event.jobType });
} else {
metrics.increment('job.permanent_failure', { jobType: event.jobType });
pagerDuty.alert(`Job ${event.jobId} permanently failed`);
}
});Progress Tracking
The job:progress event fires whenever a handler calls ctx.setProgress(), giving you real-time progress updates.
queue.on('job:progress', (event) => {
websocket.broadcast(`job:${event.jobId}`, { progress: event.progress });
});Events are emitted synchronously after the corresponding database operation
completes. Slow event listeners will delay the return of methods like
addJob() or the processing of the next job. Use async patterns in listeners
if they perform I/O.