kafka-db-tool v1.0.2
Kafka-DB Data Flow Tool
A powerful Node.js CLI tool for seamless bidirectional data synchronization between Apache Kafka and PostgreSQL databases. Perfect for real-time data pipelines, database replication, and event-driven architectures, with built-in support for Confluent Cloud.
Features
- Bidirectional Sync: Transfer data seamlessly between Kafka and PostgreSQL
- Batch Processing: Optimized performance with configurable batch sizes and intervals
- Robust Error Handling: Comprehensive error recovery with detailed logging
- Enterprise Security: Built-in support for SASL authentication and SSL encryption
- Cloud-Ready: Native integration with Confluent Cloud
- Production Monitoring: Detailed logging with Winston for better observability
- Graceful Recovery: Automatic handling of connection issues and process signals
Prerequisites
Before you begin, ensure you have the following installed:
- Node.js 18.x or later
- PostgreSQL 12.x or later
- Access to a Kafka cluster (local or Confluent Cloud)
- npm 8.x or later
Quick Start
Installation
# Install globally (recommended for CLI usage)
npm install -g kafka-db-tool
# Or install locally in your project
npm install kafka-db-tool
Environment Setup
Create a .env file with your credentials:
# Kafka Configuration
KAFKA_BROKERS=your-broker.confluent.cloud:9092
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret
# PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
Quick Example
# Start producing data from PostgreSQL to Kafka
kafka-db-tool produce users-topic "SELECT id, name, email FROM users LIMIT 10"
# Consume data from Kafka to PostgreSQL
kafka-db-tool consume users-topic users_backup
Verify Installation
# Check if the tool is properly installed
kafka-db-tool --version
# View available commands
kafka-db-tool --help
Configuration
Kafka Configuration
The tool supports both Confluent Cloud and local Kafka deployments with comprehensive security options.
Confluent Cloud Setup
Get your credentials from Confluent Cloud:
- Go to your Confluent Cloud cluster
- Navigate to "API keys" section
- Create a new API key or use an existing one
- Note the bootstrap server address
Configure environment variables:
# Required Confluent Cloud Configuration
KAFKA_BROKERS=<your-cluster>.confluent.cloud:9092
KAFKA_SASL_USERNAME=<your-api-key>
KAFKA_SASL_PASSWORD=<your-api-secret>
# Performance Tuning (Optional)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=25000
KAFKA_RETRY_COUNT=8
KAFKA_MAX_RETRY_TIME=30000
# Security Configuration (Recommended)
KAFKA_SSL_REJECT_UNAUTHORIZED=true # Always true in production
Local Development Setup
- Start local Kafka using the provided Docker Compose file:
docker-compose up -d
- Configure environment for local development:
# Local Development Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CONNECTION_TIMEOUT=3000
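Before pointing the tool at the local broker, you can check that the containers from the provided docker-compose.yml are up (service names depend on that file):
# List the Compose services and their status
docker-compose ps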
Security Best Practices
- Always use SASL authentication in production
- Keep SSL verification enabled (KAFKA_SSL_REJECT_UNAUTHORIZED=true)
- Rotate API keys regularly
- Use separate API keys for development and production
PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
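To confirm these settings before starting a transfer, you can open a test connection with the standard psql client, which reads the same PG* environment variables (assuming psql is installed and the variables are exported in your shell):
# Open a connection using the PG* variables and run a trivial query
psql -c 'SELECT 1'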
Usage Guide
The tool provides two main operations: producing messages from database queries and consuming messages into database tables.
Producer Mode (Database → Kafka)
Send data from PostgreSQL queries to Kafka topics:
kafka-db-tool produce <topic> <query> [options]
Options:
-b, --batch-size <size> Batch size for queries (default: "100")
-i, --interval <ms> Interval between batches (default: "1000")
Producer Examples
- Real-time Updates:
# Stream recent user activities
kafka-db-tool produce user-activities \
"SELECT user_id, action, created_at FROM activities WHERE created_at > NOW() - INTERVAL '1 hour'" \
--batch-size 50 \
--interval 5000
- Incremental Data Loading:
# Load orders with pagination
kafka-db-tool produce orders-stream \
"SELECT * FROM orders WHERE processed = false ORDER BY created_at" \
--batch-size 100
- Filtered Data Sync:
# Sync specific order statuses
kafka-db-tool produce pending-orders \
"SELECT id, customer_id, total, status FROM orders WHERE status IN ('pending', 'processing')" \
--batch-size 75 \
--interval 3000
Consumer Mode (Kafka → Database)
Write messages from Kafka topics to PostgreSQL tables:
kafka-db-tool consume <topic> <table> [options]
Options:
-g, --group <id> Consumer group ID (default: "kafka-db-tool")
-b, --batch-size <size> Batch size for writes (default: "100")
Consumer Examples
- Basic Consumption:
# Write messages to backup table
kafka-db-tool consume orders-topic orders_archive \
--batch-size 200
- Multiple Consumers:
# Run parallel consumers with different group IDs
kafka-db-tool consume user-events user_logs \
--group user-logger-1 \
--batch-size 150
kafka-db-tool consume user-events user_logs \
--group user-logger-2 \
--batch-size 150
- Data Warehouse Loading:
# Load analytics data with larger batches
kafka-db-tool consume metrics-stream analytics_raw \
--batch-size 500 \
--group analytics-loader
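Once a consumer is running, you can spot-check that rows are landing in the target table; the table name below is taken from the first example above and assumes psql access to the same database:
# Count rows written by the consumer
psql -c 'SELECT count(*) FROM orders_archive'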
Best Practices
Batch Size Tuning:
- Start with smaller batches (50-100)
- Increase if throughput is needed and memory allows
- Monitor database and Kafka broker load
Consumer Groups:
- Use meaningful group IDs for tracking
- Separate groups for different purposes
- Consider using hostname or environment in group ID
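For example, the last point can be covered by deriving the group ID from the hostname; a minimal sketch using only the documented --group option:
# Include the machine name in the consumer group ID
kafka-db-tool consume user-events user_logs --group "user-logger-$(hostname)"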
Error Handling:
- The tool automatically handles connection issues
- Failed messages are logged in error.log
- Use monitoring for production deployments
Development Setup
- Start local Kafka cluster:
docker-compose up -d
- Create test database:
createdb kafka_db_test
- Set up environment variables:
cp .env.example .env
# Edit .env with your configurations
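To try the Quick Example against the test database, you can seed a small users table; the schema below is hypothetical and simply matches the example query, so adjust it to your data:
# Hypothetical table matching the Quick Example query
psql kafka_db_test -c 'CREATE TABLE users (id serial PRIMARY KEY, name text, email text)'
psql kafka_db_test -c "INSERT INTO users (name, email) VALUES ('alice', 'alice@example.com'), ('bob', 'bob@example.com')"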
Troubleshooting
Common Issues
Kafka Connection Issues
Error: Could not reach Kafka cluster
Solutions:
- Verify that your KAFKA_BROKERS URL is correct
- Check that your network allows the connection
- For Confluent Cloud, ensure you're using the correct port (9092)
- Verify SSL/TLS settings if using secure connections
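To rule out basic network problems, you can test TCP reachability to the broker from the same machine (assuming nc is available):
# Test TCP connectivity to the broker host and port
nc -vz your-broker.confluent.cloud 9092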
Error: Invalid API key or secret
Solutions:
- Double-check your KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD
- Ensure API key has appropriate permissions
- For Confluent Cloud, verify API key is active
Database Connection Issues
Error: Connection terminated unexpectedly
Solutions:
- Verify PostgreSQL is running and accessible
- Check the database credentials in your .env file
- Ensure the database port is not blocked by a firewall
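A quick way to check the first two points is pg_isready, which ships with the PostgreSQL client tools and honors the same PG* environment variables:
# Check that the server accepts connections with the configured settings
pg_isready -h "$PGHOST" -p "$PGPORT"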
Error: Relation does not exist
Solutions:
- Confirm table exists in the specified database
- Check user permissions for the table
- Verify correct database name in connection string
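You can confirm the first two points from psql (the table name here is only an example):
# Describe the target table; an error here usually means a missing table or schema mismatch
psql -c '\d users_backup'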
Getting Help
- Run kafka-db-tool --help for command usage
- Check the logs in error.log and combined.log for detailed error messages
- Submit issues on GitHub for bug reports
Environment Variables Reference
Kafka Configuration
- KAFKA_BROKERS: Comma-separated list of Kafka brokers
- KAFKA_SASL_USERNAME: SASL username (API key)
- KAFKA_SASL_PASSWORD: SASL password (API secret)
- KAFKA_SSL_REJECT_UNAUTHORIZED: SSL verification (default: true)
- KAFKA_CONNECTION_TIMEOUT: Connection timeout in ms (default: 3000)
- KAFKA_REQUEST_TIMEOUT: Request timeout in ms (default: 25000)
PostgreSQL Configuration
- PGHOST: Database host
- PGPORT: Database port
- PGDATABASE: Database name
- PGUSER: Database user
- PGPASSWORD: Database password
Logging
Logs are written to:
- error.log: Error-level logs
- combined.log: All logs
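To follow activity while a transfer is running, you can tail the log files:
# Watch both logs as they are written
tail -f error.log combined.log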
Contributing
- Fork the repository
- Create your feature branch
- Commit your changes
- Push to the branch
- Create a new Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.