Skip to content

andeerc/pg-multiverse

Repository files navigation

🌌 PG Multiverse

Multi-cluster PostgreSQL manager for Node.js with TypeScript support

npm version TypeScript License: MIT Downloads GitHub Stars

PG Multiverse is a powerful, enterprise-grade PostgreSQL multi-cluster manager that brings advanced database scaling capabilities to Node.js applications. With full TypeScript support, intelligent load balancing, distributed caching, and seamless multi-schema operations.

✨ Features

🏒 Multi-Cluster Management

  • Read/Write Splitting - Automatic query routing to primary/replica clusters
  • Intelligent Load Balancing - 5 strategies: round-robin, weighted, least-connections, response-time, health-aware
  • Health Monitoring - Continuous health checks with automatic failover
  • Multi-Schema Support - Route queries based on schema with zero configuration

⚑ Performance & Reliability

  • Distributed Caching - Built-in Redis-like caching with TTL and invalidation
  • Connection Pooling - Optimized connection management with warming and metrics
  • Distributed Transactions - Cross-cluster ACID transactions with 2PC
  • Automatic Failover - Zero-downtime cluster switching

πŸ› οΈ Developer Experience

  • Full TypeScript Support - Type-safe queries with generics and interfaces
  • Database Migrations - Multi-cluster schema management with TypeScript/JavaScript support
  • Zero Configuration - Works out of the box with sensible defaults
  • Rich Metrics - Detailed performance monitoring and health statistics
  • Event-Driven - Comprehensive event system for monitoring and debugging

πŸš€ Quick Start

Installation

# Core library
npm install pg-multiverse

# Optional: For TypeScript migration support
npm install ts-node

# Optional: CLI tools
npm install -g pg-multiverse
# or use: npx pgm <command>

Basic Usage

import { PgMultiverse } from 'pg-multiverse';

// Define your data types
interface User {
  id: number;
  email: string;
  name: string;
  active: boolean;
}

// Initialize with cluster configuration
const postgres = new PgMultiverse({
  enableCache: true,
  enableMetrics: true,
  enableTransactions: true
});

await postgres.initialize({
  users_cluster: {
    schemas: ['users', 'auth'],
    primary: {
      host: 'primary.db.com',
      port: 5432,
      database: 'app_users',
      user: 'postgres',
      password: 'password'
    },
    replicas: [{
      host: 'replica.db.com',
      port: 5432,
      database: 'app_users',
      user: 'postgres',
      password: 'password'
    }]
  }
});

// Type-safe queries with automatic cluster routing
const users = await postgres.query<User>(
  'SELECT * FROM users WHERE active = $1',
  [true],
  {
    schema: 'users',     // Auto-routes to users_cluster
    cache: true,         // Cache results
    cacheTtl: 300000     // 5 minute cache
  }
);

// users.rows is automatically typed as User[]
users.rows.forEach(user => {
  console.log(`${user.name} <${user.email}>`);
});

Database Migrations

// Enable migrations
const postgres = new PgMultiverse({
  enableMigrations: true,
  migrations: {
    migrationsPath: './migrations',
    autoCreateMigrationsTable: true,
  },
});

await postgres.initialize(config);

// Run pending migrations
await postgres.migrate();

// Check migration status
const status = await postgres.getMigrationStatus();
console.log(`Applied: ${status.appliedMigrations}, Pending: ${status.pendingMigrations}`);

// Rollback last migration
await postgres.rollback({ steps: 1 });

Distributed Transactions

// Cross-cluster transactions
await postgres.withTransaction(['users', 'orders'], async (tx) => {
  // Update user in users_cluster
  await tx.query(
    'UPDATE users SET last_order = NOW() WHERE id = $1',
    [userId],
    { schema: 'users' }
  );

  // Create order in orders_cluster
  const order = await tx.query<Order>(
    'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *',
    [userId, total],
    { schema: 'orders' }
  );

  return order.rows[0];
});

πŸ“– Documentation

Core Concepts

Multi-Cluster Architecture

PG Multiverse manages multiple PostgreSQL clusters, each handling specific schemas:

const clusterConfig = {
  // User data cluster
  users_cluster: {
    schemas: ['users', 'profiles', 'auth'],
    primary: { /* primary connection */ },
    replicas: [{ /* replica connections */ }],
    readPreference: 'replica',
    consistencyLevel: 'eventual'
  },

  // Commerce cluster
  commerce_cluster: {
    schemas: ['products', 'orders', 'payments'],
    primary: { /* primary connection */ },
    readPreference: 'primary',
    consistencyLevel: 'strong'
  }
};

Load Balancing Strategies

  1. Round Robin (default) - Distributes requests evenly
  2. Weighted - Routes based on server weights
  3. Least Connections - Prefers servers with fewer active connections
  4. Response Time - Routes to fastest responding servers
  5. Health Aware - Combines multiple metrics for intelligent routing
{
  loadBalancing: {
    strategy: 'health_aware',
    weights: {
      'replica_1': 2,
      'replica_2': 1
    },
    healthThreshold: 80
  }
}

Caching System

Built-in distributed caching with multiple invalidation strategies:

// Cache with automatic invalidation
await postgres.query('SELECT * FROM products', [], {
  schema: 'products',
  cache: true,
  cacheTtl: 1800000,           // 30 minutes
  cacheKey: 'all_products',
  tags: ['products', 'catalog'] // For bulk invalidation
});

// Invalidate cache
await postgres.invalidateCache({
  schema: 'products'
});

await postgres.invalidateCache({
  tags: ['products', 'inventory']
});

API Reference

Core Methods

class PgMultiverse {
  // Initialize with cluster configuration
  async initialize(config: ClusterConfiguration): Promise<void>

  // Execute type-safe queries
  async query<T>(sql: string, params?: any[], options?: QueryOptions): Promise<QueryResult<T>>

  // Distributed transactions
  async withTransaction<T>(schemas: string[], callback: TransactionCallback<T>): Promise<T>

  // Database migrations
  async migrate(options?: MigrationExecutionOptions): Promise<MigrationStatus>
  async rollback(options?: MigrationRollbackOptions): Promise<MigrationStatus>
  async getMigrationStatus(): Promise<MigrationStatus>
  async createMigration(name: string, options: MigrationCreateOptions): Promise<string>
  getMigrations(): Migration[]
  addMigration(migration: Migration): void
  getMigrationManager(): MigrationManager

  // Health and metrics
  async healthCheck(): Promise<HealthReport>
  getMetrics(): SystemMetrics

  // Cache management
  async invalidateCache(criteria: InvalidationCriteria): Promise<number>

  // Lifecycle
  async close(): Promise<void>
}

Configuration Options

interface MultiClusterConfig {
  enableCache?: boolean;        // Enable distributed caching
  enableMetrics?: boolean;      // Collect performance metrics
  enableTransactions?: boolean; // Enable distributed transactions
  enableMigrations?: boolean;   // Enable database migrations

  cache?: {
    maxSize?: number;          // Max cache entries (1000)
    ttl?: number;              // Default TTL in ms (300000)
    enableCompression?: boolean; // Compress large entries
  };

  migrations?: {
    migrationsPath?: string;           // Path to migration files (./migrations)
    migrationsTable?: string;          // Control table name
    lockTable?: string;                // Locking table name
    autoCreateMigrationsTable?: boolean; // Auto-create control tables
    validateChecksums?: boolean;       // Validate migration integrity
    allowOutOfOrder?: boolean;         // Require sequential execution
    lockTimeout?: number;              // Lock timeout in ms (60000)
    batchSize?: number;                // Batch size for parallel execution
    logger?: MigrationLogger;          // Custom logger instance
  };

  cluster?: {
    healthCheckInterval?: number;     // Health check frequency (30000ms)
    retryAttempts?: number;          // Connection retry attempts (3)
    maxFailuresBeforeMarkDown?: number; // Failures before marking down (3)
  };
}

πŸ—„οΈ Database Migrations

PG Multiverse includes a powerful migration system that supports multi-cluster and multi-schema databases with both TypeScript and JavaScript.

Migration Configuration

const postgres = new PgMultiverse({
  enableMigrations: true,
  migrations: {
    migrationsPath: './migrations',           // Path to migration files
    migrationsTable: 'pg_multiverse_migrations', // Control table name
    lockTable: 'pg_multiverse_migration_locks',  // Locking table name
    autoCreateMigrationsTable: true,         // Auto-create control tables
    validateChecksums: true,                 // Validate migration integrity
    allowOutOfOrder: false,                  // Require sequential execution
    lockTimeout: 60000,                      // Lock timeout in ms
    batchSize: 100,                          // Batch size for parallel execution
    logger: customLogger                     // Custom logger instance
  }
});

Creating Migrations

TypeScript Migrations

// migrations/20241230120000_create_users.ts
import { Migration, MigrationContext } from 'pg-multiverse';

const migration: Migration = {
  version: '20241230120000_create_users',
  name: 'create_users',
  description: 'Create users table with profiles',
  targetSchemas: ['users', 'auth'],
  targetClusters: ['users_cluster'], // Optional: specific clusters
  dependencies: ['20241230110000_create_base'], // Optional: dependencies
  tags: ['users', 'initial'],        // Optional: tags for organization
  createdAt: new Date(),

  async up(context: MigrationContext): Promise<void> {
    context.logger.info(`Creating users table in ${context.schema}`);
    
    // Type-safe query execution
    await context.query(`
      CREATE TABLE users (
        id SERIAL PRIMARY KEY,
        email VARCHAR(255) UNIQUE NOT NULL,
        name VARCHAR(255) NOT NULL,
        active BOOLEAN DEFAULT true,
        created_at TIMESTAMP DEFAULT NOW()
      )
    `);

    // Insert seed data
    await context.query(`
      INSERT INTO users (email, name) VALUES 
      ($1, $2), ($3, $4)
    `, ['admin@example.com', 'Admin', 'user@example.com', 'User']);

    context.logger.info('Users table created successfully');
  },

  async down(context: MigrationContext): Promise<void> {
    context.logger.info(`Dropping users table from ${context.schema}`);
    await context.query(`DROP TABLE IF EXISTS users`);
    context.logger.info('Users table dropped successfully');
  }
};

export default migration;

JavaScript Migrations

// migrations/20241230130000_add_profiles.js
const migration = {
  version: '20241230130000_add_profiles',
  name: 'add_profiles',
  description: 'Add user profiles functionality',
  targetSchemas: ['users'],
  createdAt: new Date(),

  async up(context) {
    await context.query(`
      CREATE TABLE profiles (
        user_id INTEGER PRIMARY KEY REFERENCES users(id),
        bio TEXT,
        avatar_url VARCHAR(500),
        settings JSONB DEFAULT '{}'
      )
    `);
  },

  async down(context) {
    await context.query(`DROP TABLE IF EXISTS profiles`);
  }
};

module.exports = migration;

Migration Operations

Running Migrations

// Run all pending migrations
await postgres.migrate();

// Run migrations with options
await postgres.migrate({
  targetVersion: '20241230120000_create_users', // Run up to specific version
  targetSchemas: ['users', 'auth'],             // Specific schemas only
  targetClusters: ['users_cluster'],            // Specific clusters only
  dryRun: true,                                 // Show what would run without executing
  parallel: true,                               // Run in parallel where possible
  maxParallel: 4,                               // Max parallel executions
  continueOnError: false,                       // Stop on first error
});

// Run with detailed progress monitoring
postgres.getMigrationManager().on('migrationStarted', (data) => {
  console.log(`⚑ Starting: ${data.name} on ${data.schema}@${data.cluster}`);
});

postgres.getMigrationManager().on('migrationCompleted', (data) => {
  console.log(`βœ… Completed: ${data.name} (${data.duration}ms)`);
});

Rollback Operations

// Rollback last migration
await postgres.rollback({ steps: 1 });

// Rollback to specific version
await postgres.rollback({
  targetVersion: '20241230110000_create_base',
  targetSchemas: ['users'],
  dryRun: true // Preview rollback without executing
});

// Rollback with error handling
try {
  await postgres.rollback({ steps: 2, force: true });
} catch (error) {
  console.error('Rollback failed:', error.message);
}

Migration Status and Management

// Get comprehensive status
const status = await postgres.getMigrationStatus();
console.log(`Total: ${status.totalMigrations}`);
console.log(`Applied: ${status.appliedMigrations}`);
console.log(`Pending: ${status.pendingMigrations}`);

// Status by schema
Object.entries(status.bySchema).forEach(([schema, stats]) => {
  console.log(`${schema}: ${stats.applied} applied, ${stats.pending} pending`);
  if (stats.lastApplied) {
    console.log(`  Last applied: ${stats.lastApplied}`);
  }
});

// Status by cluster
Object.entries(status.byCluster).forEach(([cluster, stats]) => {
  console.log(`${cluster}: ${stats.applied} applied, ${stats.pending} pending`);
});

// List all available migrations
const migrations = postgres.getMigrations();
migrations.forEach(migration => {
  console.log(`${migration.version}: ${migration.name}`);
  console.log(`  Schemas: ${migration.targetSchemas.join(', ')}`);
  console.log(`  Description: ${migration.description}`);
});

Creating Migration Files

// Create new migration file
const filePath = await postgres.createMigration('add_user_permissions', {
  targetSchemas: ['users', 'auth'],
  targetClusters: ['users_cluster'],
  description: 'Add user permissions and roles system',
  tags: ['permissions', 'security']
});

console.log(`Migration created: ${filePath}`);

CLI Migration Management

Install the CLI globally or use npx:

npm install -g pg-multiverse
# or use npx pg-multiverse

CLI Commands

# Create a new migration
pgm create add_user_roles --schemas users,auth --description "Add role-based permissions"

# Run migrations
pgm migrate                              # Run all pending
pgm migrate --target 20241230120000      # Run up to specific version
pgm migrate --schemas users,products     # Specific schemas only
pgm migrate --parallel                   # Run in parallel
pgm migrate --dry-run                    # Preview without executing

# Check status
pgm status                               # Overall status
pgm status --schemas users              # Schema-specific status
pgm status --verbose                     # Detailed information

# Rollback migrations
pgm rollback --steps 1                   # Rollback last migration
pgm rollback --target 20241230110000     # Rollback to specific version
pgm rollback --dry-run                   # Preview rollback

# List migrations
pgm list                                 # Show all migrations
pgm list --verbose                       # Show detailed information

# Global options
pgm --config config.json                 # Use custom config file
pgm --migrations ./db/migrations         # Custom migrations path
pgm --verbose                            # Verbose output

Advanced Migration Features

Cross-Cluster Dependencies

const migration: Migration = {
  version: '20241230150000_create_orders',
  name: 'create_orders',
  targetSchemas: ['orders'],
  dependencies: ['20241230120000_create_users'], // Users must exist first
  
  async up(context) {
    // Reference data from other clusters is handled automatically
    await context.query(`
      CREATE TABLE orders (
        id SERIAL PRIMARY KEY,
        user_email VARCHAR(255) NOT NULL, -- Reference to users cluster
        total DECIMAL(10,2) NOT NULL,
        status VARCHAR(50) DEFAULT 'pending'
      )
    `);
  }
};

Conditional Migrations

const migration: Migration = {
  version: '20241230160000_conditional_update',
  name: 'conditional_update',
  targetSchemas: ['users'],
  
  async up(context) {
    // Check if column exists before adding
    const result = await context.query(`
      SELECT column_name 
      FROM information_schema.columns 
      WHERE table_name = 'users' AND column_name = 'last_login'
    `);
    
    if (result.rows.length === 0) {
      await context.query(`
        ALTER TABLE users ADD COLUMN last_login TIMESTAMP
      `);
      context.logger.info('Added last_login column');
    } else {
      context.logger.info('last_login column already exists, skipping');
    }
  }
};

Data Migrations with Batching

const migration: Migration = {
  version: '20241230170000_migrate_user_data',
  name: 'migrate_user_data',
  targetSchemas: ['users'],
  
  async up(context) {
    // Process large datasets in batches
    let offset = 0;
    const batchSize = 1000;
    let hasMore = true;
    
    while (hasMore) {
      const users = await context.query(`
        SELECT id, old_field 
        FROM users 
        WHERE new_field IS NULL 
        LIMIT $1 OFFSET $2
      `, [batchSize, offset]);
      
      if (users.rows.length === 0) {
        hasMore = false;
        break;
      }
      
      // Process batch
      for (const user of users.rows) {
        await context.query(`
          UPDATE users 
          SET new_field = $1 
          WHERE id = $2
        `, [processOldField(user.old_field), user.id]);
      }
      
      offset += batchSize;
      context.logger.info(`Processed ${offset} users`);
    }
  }
};

Migration Best Practices

1. Naming Convention

YYYYMMDDHHMMSS_descriptive_name.ts
20241230120000_create_users_table.ts
20241230130000_add_user_permissions.ts
20241230140000_migrate_legacy_data.ts

2. Schema Organization

  • Group related tables in the same migration
  • Use descriptive names and comments
  • Always provide rollback functionality

3. Production Safety

// Always test with dry-run first
await postgres.migrate({ dryRun: true });

// Use transactions for complex migrations
const migration: Migration = {
  async up(context) {
    await context.query('BEGIN');
    try {
      await context.query('-- complex operations');
      await context.query('COMMIT');
    } catch (error) {
      await context.query('ROLLBACK');
      throw error;
    }
  }
};

4. Error Handling

// Comprehensive error handling
postgres.getMigrationManager().on('migrationFailed', (data) => {
  console.error(`❌ Migration ${data.name} failed:`, data.error.message);
  // Send alerts, log to monitoring system, etc.
});

πŸ—οΈ Advanced Usage

Sharding Support

{
  orders_cluster: {
    schemas: ['orders'],
    sharding: {
      strategy: 'hash',
      key: 'user_id',
      partitions: 4
    },
    primary: { /* config */ },
    replicas: [
      { /* shard 1 replica */ },
      { /* shard 2 replica */ },
      { /* shard 3 replica */ },
      { /* shard 4 replica */ }
    ]
  }
}

Dynamic Schema Registration

// Register schemas at runtime
postgres.registerSchema('analytics', 'analytics_cluster', {
  cacheStrategy: 'aggressive',
  priority: 3
});

// Query the new schema
const events = await postgres.query(
  'SELECT * FROM events WHERE date > $1',
  [startDate],
  { schema: 'analytics' }
);

Event Monitoring

// Monitor cluster health
postgres.on('clusterDown', ({ clusterId, reason }) => {
  console.error(`Cluster ${clusterId} is down: ${reason}`);
  // Trigger alerts, logging, etc.
});

postgres.on('clusterRecovered', ({ clusterId, downtime }) => {
  console.log(`Cluster ${clusterId} recovered after ${downtime}ms`);
});

// Monitor cache performance
postgres.on('cacheHit', ({ key, schema }) => {
  console.log(`Cache hit for ${key} in schema ${schema}`);
});

// Monitor query performance
postgres.on('queryExecuted', ({ duration, clusterId }) => {
  if (duration > 1000) {
    console.warn(`Slow query detected: ${duration}ms on ${clusterId}`);
  }
});

πŸ§ͺ Testing

Unit Tests

npm test

Integration Tests

# With real PostgreSQL instances
npm run test:integration

Coverage Report

npm run test:coverage

πŸ“Š Monitoring & Metrics

System Metrics

const metrics = postgres.getMetrics();

console.log(`Total Queries: ${metrics.totalQueries}`);
console.log(`Average Response Time: ${metrics.avgResponseTime}ms`);
console.log(`Error Rate: ${metrics.errorRate}%`);

// Per-cluster metrics
Object.entries(metrics.clusters).forEach(([clusterId, stats]) => {
  console.log(`${clusterId}: ${stats.queries.total} queries, ${stats.connections.active} active connections`);
});

// Cache performance
if (metrics.cache) {
  console.log(`Cache Hit Rate: ${(metrics.cache.hitRate * 100).toFixed(2)}%`);
  console.log(`Cache Size: ${metrics.cache.itemCount} items`);
}

Health Monitoring

const health = await postgres.healthCheck();

Object.entries(health).forEach(([clusterId, status]) => {
  console.log(`${clusterId}: ${status.healthy ? 'βœ…' : '❌'} (${status.responseTime}ms)`);

  if (!status.healthy) {
    console.error(`Error: ${status.error}`);
    console.log(`Failure Count: ${status.failureCount}`);
  }
});

πŸš€ Performance

Benchmarks

Feature Performance
Connection Pooling 10,000+ concurrent connections
Query Caching 95%+ cache hit rates
Load Balancing Sub-1ms routing overhead
Health Checks 30s intervals, <5ms latency
Failover Time <2s automatic recovery

Production Tips

  1. Connection Tuning

    {
      connectionPool: {
        min: 5,              // Always keep 5 connections warm
        max: 50,             // Scale up to 50 under load
        warmupConnections: true
      }
    }
  2. Cache Optimization

    {
      cache: {
        maxSize: 10000,      // Increase for high-traffic apps
        enableCompression: true, // Save memory on large results
        compressionThreshold: 1024
      }
    }
  3. Health Check Tuning

    {
      cluster: {
        healthCheckInterval: 15000,    // More frequent checks
        maxFailuresBeforeMarkDown: 5   // More resilient to transient issues
      }
    }

🌍 Use Cases

πŸ›’ E-commerce Applications

  • User Cluster: customers, profiles, authentication
  • Catalog Cluster: products, categories, inventory
  • Order Cluster: orders, payments, shipping
  • Analytics Cluster: events, metrics, reports
  • Schema Evolution: Coordinated migrations across all clusters

🏒 Multi-tenant SaaS

  • Tenant Isolation: Separate clusters per tier
  • Shared Services: Common clusters for billing, analytics
  • Geographic Distribution: Region-specific clusters
  • Rolling Updates: Zero-downtime schema migrations per tenant

πŸ”§ Microservices Architecture

  • Service Isolation: One cluster per microservice
  • Cross-service Transactions: Distributed transactions
  • Centralized Monitoring: Unified metrics and health
  • Independent Deployments: Service-specific migration management

πŸ“Š Data-intensive Applications

  • OLTP/OLAP Separation: Transactional vs analytical workloads
  • Read Scaling: Multiple read replicas
  • Cache Optimization: Aggressive caching for hot data
  • Data Pipeline Management: Automated migrations for ETL processes

πŸ“¦ Migration Guide

From pg to pg-multiverse

// Before: Direct pg usage
import { Pool } from 'pg';
const pool = new Pool({ /* config */ });
const result = await pool.query('SELECT * FROM users');

// After: PG Multiverse with type safety
import { PgMultiverse } from 'pg-multiverse';
const postgres = new PgMultiverse();
await postgres.initialize(config);
const result = await postgres.query<User>('SELECT * FROM users');

From Single Database

  1. Start with single cluster
  2. Add read replicas
  3. Introduce schema-based routing
  4. Scale to multiple clusters

🀝 Contributing

We welcome contributions! Please see our Contributing Guide.

Development Setup

# Clone the repository
git clone https://github.com/andeerc/pg-multiverse.git
cd pg-multiverse

# Install dependencies
npm install

# Run tests
npm test

# Build the project
npm run build

πŸ“ Changelog

See CHANGELOG.md for version history.

πŸ“œ License

MIT Β© PG Multiverse Team

πŸ™ Acknowledgments

  • PostgreSQL Community - For the amazing database
  • Node.js pg Driver - Reliable PostgreSQL connectivity
  • TypeScript Team - Type safety and developer experience
  • Contributors - Everyone who helped make this project better

🌟 Star us on GitHub β€’ πŸ“– Documentation β€’ πŸ’¬ Discord Community β€’ πŸ› Report Bug

Made with ❀️ by the PG Multiverse Team

About

No description, website, or topics provided.

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Contributors 2

  •  
  •