
Kafka-DB Data Flow Tool


šŸš€ A powerful Node.js CLI tool for seamless bidirectional data synchronization between Apache Kafka and PostgreSQL databases. Perfect for real-time data pipelines, database replication, and event-driven architectures, with built-in support for Confluent Cloud.

Features

  • Bidirectional Sync: Transfer data seamlessly between Kafka and PostgreSQL
  • Batch Processing: Optimized performance with configurable batch sizes and intervals
  • Robust Error Handling: Comprehensive error recovery with detailed logging
  • Enterprise Security: Built-in support for SASL authentication and SSL encryption
  • Cloud-Ready: Native integration with Confluent Cloud
  • Production Monitoring: Detailed logging with Winston for better observability
  • Graceful Recovery: Automatic handling of connection issues and process signals

Prerequisites

Before you begin, ensure you have the following installed:

  • Node.js 18.x or later
  • PostgreSQL 12.x or later
  • Access to a Kafka cluster (local or Confluent Cloud)
  • npm 8.x or later
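
A quick way to confirm the local prerequisites are in place (the Kafka cluster itself has to be checked separately):

# Check local tool versions against the requirements above
node --version   # expect v18.x or later
npm --version    # expect 8.x or later
psql --version   # expect 12.x or later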

šŸš€ Quick Start

Installation

# Install globally (recommended for CLI usage)
npm install -g kafka-db-tool

# Or install locally in your project
npm install kafka-db-tool
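
If you install locally instead of globally, the binary is not on your PATH; running it through npx (or an npm script) is one way around that:

# Run the locally installed CLI without a global install
npx kafka-db-tool --help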

Environment Setup

Create a .env file with your credentials:

# Kafka Configuration
KAFKA_BROKERS=your-broker.confluent.cloud:9092
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
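
The .env file holds real credentials, so it is worth keeping it out of version control, for example:

# Keep credentials out of git
echo ".env" >> .gitignore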

Quick Example

# Start producing data from PostgreSQL to Kafka
kafka-db-tool produce users-topic "SELECT id, name, email FROM users LIMIT 10"

# Consume data from Kafka to PostgreSQL
kafka-db-tool consume users-topic users_backup
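
To spot-check that the produce step actually wrote messages, any Kafka consumer works; for example, with kcat installed and a local unauthenticated broker (adjust the broker address and add auth flags for Confluent Cloud):

# Read the first 5 messages from the topic and exit
kcat -b localhost:9092 -t users-topic -C -c 5 -e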

Verify Installation

# Check if the tool is properly installed
kafka-db-tool --version

# View available commands
kafka-db-tool --help

Configuration

Kafka Configuration

The tool supports both Confluent Cloud and local Kafka deployments with comprehensive security options.

Confluent Cloud Setup

  1. Get your credentials from Confluent Cloud:

    • Go to your Confluent Cloud cluster
    • Navigate to "API keys" section
    • Create a new API key or use an existing one
    • Note down the Bootstrap server
  2. Configure environment variables:

# Required Confluent Cloud Configuration
KAFKA_BROKERS=<your-cluster>.confluent.cloud:9092
KAFKA_SASL_USERNAME=<your-api-key>
KAFKA_SASL_PASSWORD=<your-api-secret>

# Performance Tuning (Optional)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=25000
KAFKA_RETRY_COUNT=8
KAFKA_MAX_RETRY_TIME=30000

# Security Configuration (Recommended)
KAFKA_SSL_REJECT_UNAUTHORIZED=true  # Always true in production

Local Development Setup

  1. Start local Kafka using the provided Docker Compose file:

docker-compose up -d

  2. Configure environment for local development:

# Local Development Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CONNECTION_TIMEOUT=3000
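
To confirm the local broker came up, the usual Docker Compose commands are enough; the service name below ("kafka") is an assumption about what your docker-compose.yml calls the broker container:

# Check container status and recent broker logs
docker-compose ps
docker-compose logs --tail=50 kafka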

Security Best Practices

  • Always use SASL authentication in production
  • Keep SSL verification enabled (KAFKA_SSL_REJECT_UNAUTHORIZED=true)
  • Rotate API keys regularly
  • Use separate API keys for development and production

PostgreSQL Configuration

PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
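
Since psql reads the same PG* environment variables, it gives a quick way to verify these settings before running the tool:

# Should print a single row if the connection settings are valid
psql -c "SELECT 1;"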

Usage Guide

The tool provides two main operations: producing messages from database queries and consuming messages into database tables.

Producer Mode (Database → Kafka)

Send data from PostgreSQL queries to Kafka topics:

kafka-db-tool produce <topic> <query> [options]

Options:
  -b, --batch-size <size>   Batch size for queries (default: "100")
  -i, --interval <ms>       Interval between batches (default: "1000")

Producer Examples

  1. Real-time Updates:

# Stream recent user activities
kafka-db-tool produce user-activities \
  "SELECT user_id, action, created_at FROM activities WHERE created_at > NOW() - INTERVAL '1 hour'" \
  --batch-size 50 \
  --interval 5000

  2. Incremental Data Loading (see the index note after this list):

# Load orders with pagination
kafka-db-tool produce orders-stream \
  "SELECT * FROM orders WHERE processed = false ORDER BY created_at" \
  --batch-size 100

  3. Filtered Data Sync:

# Sync specific order statuses
kafka-db-tool produce pending-orders \
  "SELECT id, customer_id, total, status FROM orders WHERE status IN ('pending', 'processing')" \
  --batch-size 75 \
  --interval 3000
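
For the incremental-load example (2) above, the same filtered query runs repeatedly, so an index that matches the filter can help on large tables. A sketch using the table and column names from that example (the index name is arbitrary):

# Partial index covering the "unprocessed orders" query; run once against the source database
psql -c "CREATE INDEX IF NOT EXISTS idx_orders_unprocessed ON orders (created_at) WHERE processed = false;"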

Consumer Mode (Kafka → Database)

Write messages from Kafka topics to PostgreSQL tables:

kafka-db-tool consume <topic> <table> [options]

Options:
  -g, --group <id>         Consumer group ID (default: "kafka-db-tool")
  -b, --batch-size <size>  Batch size for writes (default: "100")

Consumer Examples

  1. Basic Consumption:

# Write messages to backup table
kafka-db-tool consume orders-topic orders_archive \
  --batch-size 200

  2. Multiple Consumers:

# Run parallel consumers with different group IDs
kafka-db-tool consume user-events user_logs \
  --group user-logger-1 \
  --batch-size 150

kafka-db-tool consume user-events user_logs \
  --group user-logger-2 \
  --batch-size 150

  3. Data Warehouse Loading:

# Load analytics data with larger batches
kafka-db-tool consume metrics-stream analytics_raw \
  --batch-size 500 \
  --group analytics-loader
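
After any of the consumer runs above, a quick row count on the target table confirms that messages are landing (psql with the PG* variables from the configuration section):

# Spot-check the target table from the data-warehouse example
psql -c "SELECT count(*) FROM analytics_raw;"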

Best Practices

  1. Batch Size Tuning:

    • Start with smaller batches (50-100)
    • Increase if throughput is needed and memory allows
    • Monitor database and Kafka broker load
  2. Consumer Groups:

    • Use meaningful group IDs for tracking
    • Separate groups for different purposes
    • Consider using hostname or environment in group ID
  3. Error Handling:

    • The tool automatically handles connection issues
    • Failed messages are logged in error.log
    • Use monitoring for production deployments

Development Setup

  1. Start the local Kafka cluster:

docker-compose up -d

  2. Create a test database:

createdb kafka_db_test

  3. Set up environment variables:

cp .env.example .env
# Edit .env with your configurations
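
Once these steps are done, a quick check that the test database answers (again via psql):

# Verify the test database is reachable
psql -d kafka_db_test -c "SELECT version();"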

šŸ”§ Troubleshooting

Common Issues

Kafka Connection Issues

Error: Could not reach Kafka cluster

āœ… Solutions:

  • Verify your KAFKA_BROKERS URL is correct
  • Check if your network allows the connection
  • For Confluent Cloud, ensure you're using the correct port (9092)
  • Verify SSL/TLS settings if using secure connections

Error: Invalid API key or secret

āœ… Solutions:

  • Double-check your KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD
  • Ensure API key has appropriate permissions
  • For Confluent Cloud, verify API key is active
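
If credentials and configuration look right but connections still fail, probing the broker endpoint by hand can narrow things down (assuming nc and openssl are available; replace the host with your own bootstrap server):

# Basic TCP reachability check
nc -vz your-broker.confluent.cloud 9092

# Confluent Cloud brokers speak TLS on 9092, so a TLS handshake should succeed
openssl s_client -connect your-broker.confluent.cloud:9092 </dev/null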

Database Connection Issues

Error: Connection terminated unexpectedly

āœ… Solutions:

  • Verify PostgreSQL is running and accessible
  • Check database credentials in your .env file
  • Ensure the database port is not blocked by a firewall

Error: Relation does not exist

āœ… Solutions:

  • Confirm table exists in the specified database
  • Check user permissions for the table
  • Verify correct database name in connection string

Getting Help

  • Run kafka-db-tool --help for command usage
  • Check logs in error.log and combined.log for detailed error messages
  • Submit issues on GitHub for bug reports

Environment Variables Reference

Kafka Configuration

  • KAFKA_BROKERS: Comma-separated list of Kafka brokers
  • KAFKA_SASL_USERNAME: SASL username (API key)
  • KAFKA_SASL_PASSWORD: SASL password (API secret)
  • KAFKA_SSL_REJECT_UNAUTHORIZED: SSL verification (default: true)
  • KAFKA_CONNECTION_TIMEOUT: Connection timeout in ms (default: 3000)
  • KAFKA_REQUEST_TIMEOUT: Request timeout in ms (default: 25000)

PostgreSQL Configuration

  • PGHOST: Database host
  • PGPORT: Database port
  • PGDATABASE: Database name
  • PGUSER: Database user
  • PGPASSWORD: Database password

Logging

Logs are written to:

  • error.log: Error-level logs
  • combined.log: All logs
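
A minimal way to keep an eye on these files in production (plain shell, nothing tool-specific):

# Follow all logs live
tail -f combined.log

# Show the most recent errors only
tail -n 50 error.log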

Contributing

  1. Fork the repository
  2. Create your feature branch
  3. Commit your changes
  4. Push to the branch
  5. Create a new Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.
