@fazpi-ai/qtask v1.0.0
QTask
A Redis-based queue processing library for Node.js, designed for efficient background job processing.
Overview
QTask is a lightweight, flexible job queue system built on Redis. It provides a simple yet powerful way to handle asynchronous tasks in your Node.js applications. With QTask, you can easily distribute workloads across multiple processes or servers, prioritize jobs, delay execution, and set time-to-live for tasks.
Key Features
- Redis-backed persistence: Reliable storage of jobs even if your application crashes
- Publisher/Subscriber model: Clear separation between job producers and consumers
- Job grouping: Organize jobs by queues and groups for better management
- Priority queuing: Process important jobs first
- Delayed execution: Schedule jobs to run at a specific time
- TTL support: Automatically expire jobs after a certain period
- TypeScript support: Full type definitions for a better development experience
- Connection pooling: Efficient Redis connection management
- Atomic operations: Uses Redis Lua scripts for reliable job processing
- Graceful shutdown: Clean handling of in-progress jobs during shutdown
- Customizable logging: Configure log levels to suit your needs
Installation
npm install @fazpi-ai/qtaskRequirements
- Node.js >= 14.0.0
- Redis >= 6.0.0
Basic Usage
Publisher
import { Queue } from '@fazpi-ai/qtask';
// Create a publisher instance
const publisher = new Queue({
credentials: {
host: 'localhost',
port: 6379
},
type: 'publisher',
logLevel: 'info'
});
// Initialize
await publisher.init();
// Add a job to the queue
const jobId = await publisher.add('emails', 'notifications', {
to: 'user@example.com',
subject: 'Welcome',
body: 'Welcome to our platform'
});
console.log(`Job added with ID: ${jobId}`);Subscriber
import { Queue } from '@fazpi-ai/qtask';
// Create a subscriber instance
const subscriber = new Queue({
credentials: {
host: 'localhost',
port: 6379
},
type: 'subscriber',
logLevel: 'info',
consumerLimits: {
emails: 5 // Maximum 5 consumers for the 'emails' queue
}
});
// Initialize
await subscriber.init();
// Register a processor for the 'emails' queue
subscriber.process('emails', (job, done) => {
console.log(`Processing job ${job.id}`);
console.log(`Data: ${JSON.stringify(job.data)}`);
// Simulate sending an email
setTimeout(() => {
console.log(`Email sent to ${job.data.to}`);
done(); // Mark the job as completed
}, 1000);
});
// Handle application shutdown
process.on('SIGINT', async () => {
console.log('Closing the queue...');
await subscriber.close();
process.exit(0);
});Advanced Usage
Job Options
When adding jobs to the queue, you can specify various options:
// Add a job with priority, delay, and TTL
const jobId = await publisher.add('emails', 'notifications', {
to: 'user@example.com',
subject: 'Welcome',
body: 'Welcome to our platform'
}, {
priority: 1, // Lower number = higher priority (default: 0)
delay: 60000, // Delay execution by 60 seconds
ttl: 3600000 // Job expires after 1 hour if not processed
});Job Progress Tracking
You can track the progress of a job during processing:
subscriber.process('fileProcessing', (job, done) => {
// The job object includes a progress function
const totalSteps = 10;
for (let step = 1; step <= totalSteps; step++) {
// Update progress (0-100%)
job.progress && job.progress(step * 10);
// Do some work...
console.log(`Step ${step}/${totalSteps} completed`);
}
done(); // Mark job as completed
});Error Handling
Handle job processing errors:
subscriber.process('emails', (job, done) => {
try {
// Process the job...
done(); // Success
} catch (error) {
console.error(`Error processing job ${job.id}:`, error);
done(error); // Mark job as failed with error
}
});Redis Cluster Support
QTask supports Redis Cluster configuration:
const queue = new Queue({
credentials: {
clusters: [
{ host: 'redis-1', port: 6379 },
{ host: 'redis-2', port: 6379 },
{ host: 'redis-3', port: 6379 }
],
options: {
redisOptions: {
password: 'your-password'
}
}
},
type: 'publisher',
logLevel: 'info'
});Architecture
QTask uses a combination of Redis data structures to manage job queues efficiently:
- Sorted Sets: For priority queues and delayed jobs
- Hashes: For storing job data and metadata
- Sets: For tracking queue groups
- Pub/Sub: For real-time job notifications
The library uses Lua scripts to ensure atomic operations when enqueueing, dequeueing, and updating job status.
Examples
Check the examples directory for complete usage examples:
publisher.js: Example of how to create a publisher and add jobs to the queuesubscriber.js: Example of how to create a subscriber and process jobs from the queuebasic-example.ts: Complete TypeScript example showing both publisher and subscriber in action
API Reference
Queue Class
The main class for interacting with the queue system.
Constructor Options
interface IQueueOptions {
// Redis connection options
credentials: RedisOptions;
// Consumer limits per queue
consumerLimits?: Record<string, number>;
// Log level
logLevel?: 'silent' | 'debug' | 'info' | 'warn' | 'error';
// Instance type: publisher or subscriber
type: 'publisher' | 'subscriber';
}Methods
init(): Initialize the queueadd(queueName, groupName, data, options?): Add a job to the queueprocess(queueName, callback): Register a processor for a queuegetStatus(jobId): Get the status of a jobclose(): Close the queue and clean up resources
Development
Installing dependencies
npm installBuilding
npm run buildTesting
# Run all tests
npm test
# Run tests with coverage
npm run test:coverage
# Run tests in watch mode
npm run test:watchContributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Troubleshooting
Common Issues
- Redis Connection Errors: Ensure Redis is running and accessible with the provided credentials.
- Job Not Being Processed: Check that you have a subscriber running and processing the correct queue.
- Memory Issues: If processing large volumes of jobs, consider adjusting the Redis configuration for optimal performance.
Debugging
Enable debug logging for more detailed information:
const queue = new Queue({
// ...other options
logLevel: 'debug'
});License
This project is licensed under the GNU General Public License v3.0 - see the LICENSE file for details.
11 months ago