From 108cbcb76585a92f106904d0c20e96285a98984a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgardo=20Obreg=C3=B3n?= Date: Fri, 9 Jan 2026 10:02:13 -0500 Subject: [PATCH] feat: add natural key support for idempotent SQL generation Adds comprehensive natural key detection and CTE-based SQL generation for tables without unique constraints, enabling idempotent multi-run imports. Supports both manual specification and auto-detection with priority-based detection heuristics. --- README.md | 115 +++++ src/pgslice/cli.py | 85 ++++ src/pgslice/config.py | 1 + src/pgslice/dumper/dump_service.py | 4 +- src/pgslice/dumper/sql_generator.py | 470 +++++++++++++++++-- tests/unit/dumper/test_sql_generator.py | 592 ++++++++++++++++++++++-- 6 files changed, 1191 insertions(+), 76 deletions(-) diff --git a/README.md b/README.md index a7844a1..2b7ea48 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ Extract only what you need while maintaining referential integrity. - ✅ **Multiple records**: Extract multiple records in one operation - ✅ **Timeframe filtering**: Filter specific tables by date ranges - ✅ **PK remapping**: Auto-remaps auto-generated primary keys for clean imports +- ✅ **Natural key support**: Idempotent SQL generation for tables without unique constraints - ✅ **DDL generation**: Optionally include CREATE DATABASE/SCHEMA/TABLE statements for self-contained dumps - ✅ **Progress bar**: Visual progress indicator for dump operations - ✅ **Schema caching**: SQLite-based caching for improved performance @@ -181,6 +182,8 @@ pgslice --host localhost --database mydb --dump users --pks 42 \ --log-level DEBUG 2>debug.log ``` +**Transaction Safety**: All generated SQL dumps are wrapped in `BEGIN`/`COMMIT` transactions by default. If any part of the import fails, everything automatically rolls back, leaving your database unchanged. + ### Schema Exploration ```bash @@ -202,6 +205,118 @@ pgslice> tables pgslice> describe film ``` +## Idempotent Imports with Natural Keys + +### What Are Natural Keys? + +**Natural keys** are columns (or combinations of columns) that uniquely identify a record by its business meaning, even without explicit database constraints. They represent the "real-world" identifier for your data. + +Examples: +- `roles.name` - Role names like "Admin", "User", "Guest" are naturally unique +- `statuses.code` - Status codes like "ACTIVE", "INACTIVE", "PENDING" +- `(tenant_id, setting_key)` - Configuration settings in multi-tenant systems +- `countries.iso_code` - ISO country codes like "US", "CA", "UK" + +### Why Use Natural Keys? + +By default, pgslice remaps auto-generated primary keys (SERIAL, IDENTITY) to avoid conflicts when importing. However, this can create duplicate records if you reimport the same dump multiple times: + +```sql +-- First import: Creates record with new id=1 +INSERT INTO roles (name) VALUES ('Admin'); + +-- Second import: Creates duplicate with new id=2 (no UNIQUE constraint to prevent it!) +INSERT INTO roles (name) VALUES ('Admin'); +``` + +The `--natural-keys` flag solves this by generating **idempotent SQL** - scripts that check "does a record with this natural key already exist?" before inserting. Run the same dump multiple times safely with no duplicates. + +### When to Use `--natural-keys` + +Use this flag when: +- ✅ Tables have auto-generated PKs (SERIAL, IDENTITY columns) +- ✅ You need to reimport dumps multiple times (development, testing, CI/CD) +- ✅ Tables lack explicit UNIQUE constraints on natural key columns +- ✅ You need composite natural keys (multiple columns for uniqueness) +- ✅ Auto-detection fails or you want explicit control + +### Usage Examples + +```bash +# Single-column natural key (common for reference/lookup tables) +pgslice --host localhost --database mydb --dump users --pks 42 \ + --natural-keys "roles=name" + +# With schema prefix (explicit schema) +pgslice --host localhost --database mydb --dump users --pks 42 \ + --natural-keys "public.roles=name" + +# Composite natural key (multiple columns define uniqueness) +pgslice --host localhost --database mydb --dump customers --pks 1 \ + --natural-keys "tenant_settings=tenant_id,setting_key" + +# Multiple tables (semicolon-separated) +pgslice --host localhost --database mydb --dump orders --pks 123 \ + --natural-keys "roles=name;statuses=code;countries=iso_code" + +# Complex example with mixed single and composite keys +pgslice --host localhost --database mydb --dump products --pks 456 \ + --natural-keys "roles=name;tenant_configs=tenant_id,config_key;categories=slug" +``` + +**Format**: `--natural-keys "schema.table=col1,col2;other_table=col1;..."` +- Tables separated by `;` +- Columns separated by `,` +- Schema prefix optional (defaults to `public`) + +### Auto-Detection + +pgslice automatically detects natural keys in this priority order: + +1. **Manual specification** (highest priority) - Your `--natural-keys` flag +2. **Common column names** - Recognizes patterns like: + - Exact matches: `name`, `code`, `slug`, `email`, `username`, `key`, `identifier`, `handle` + - Suffix patterns: `*_code`, `*_key`, `*_identifier`, `*_slug` +3. **Reference table heuristic** - Small tables (2-3 columns) with one non-PK text column +4. **Error if none found** - Suggests using `--natural-keys` manually + +For most reference tables (roles, statuses, categories), auto-detection works automatically. Use manual specification for: +- Tables with unconventional column names +- Composite natural keys +- When you want explicit control + +### How It Works + +When natural keys are specified, pgslice generates sophisticated CTE-based SQL that: + +1. Checks if records with matching natural keys already exist +2. Only inserts records that don't exist yet +3. Maps old primary keys to new (or existing) primary keys for foreign key resolution +4. Ensures idempotency - running multiple times produces the same result + +Example generated SQL structure: +```sql +WITH to_insert AS ( + -- Values to potentially insert + SELECT * FROM (VALUES (...)) AS v(...) +), +existing AS ( + -- Find records that already exist by natural key + SELECT t.id, ti.old_id + FROM roles t + INNER JOIN to_insert ti ON t.name IS NOT DISTINCT FROM ti.name +), +inserted AS ( + -- Insert only new records (skip existing) + INSERT INTO roles (name, permissions) + SELECT name, permissions FROM to_insert + WHERE old_id NOT IN (SELECT old_id FROM existing) + RETURNING id, name +) +-- Map old IDs to new IDs for FK resolution +... +``` + ## Configuration Key environment variables (see `.env.example` for full reference): diff --git a/src/pgslice/cli.py b/src/pgslice/cli.py index 8a99841..740ad60 100644 --- a/src/pgslice/cli.py +++ b/src/pgslice/cli.py @@ -81,6 +81,74 @@ def parse_main_timeframe(spec: str) -> MainTableTimeframe: ) +def parse_natural_keys(spec: str) -> dict[str, list[str]]: + """ + Parse natural keys specification. + + Format: schema.table=col1,col2;other_table=col1 + Example: public.roles=name;public.statuses=code + + Args: + spec: Natural keys specification string + + Returns: + Dict mapping "schema.table" to list of column names + + Raises: + InvalidTimeframeError: If specification is invalid + + Examples: + >>> parse_natural_keys("public.roles=name") + {"public.roles": ["name"]} + + >>> parse_natural_keys("public.roles=name;statuses=code") + {"public.roles": ["name"], "statuses": ["code"]} + + >>> parse_natural_keys("roles=name,code") + {"roles": ["name", "code"]} + """ + result: dict[str, list[str]] = {} + + # Split by semicolon to get individual table specifications + table_specs = spec.split(";") + + for table_spec in table_specs: + table_spec = table_spec.strip() + if not table_spec: + continue + + # Split by = to get table and columns + if "=" not in table_spec: + raise InvalidTimeframeError( + f"Invalid natural key format: {table_spec}. " + "Expected: table=col1,col2 or schema.table=col1" + ) + + table_part, columns_part = table_spec.split("=", 1) + table_part = table_part.strip() + columns_part = columns_part.strip() + + if not table_part or not columns_part: + raise InvalidTimeframeError( + f"Invalid natural key format: {table_spec}. " + "Both table and columns must be specified" + ) + + # Split columns by comma + columns = [col.strip() for col in columns_part.split(",")] + columns = [col for col in columns if col] # Remove empty strings + + if not columns: + raise InvalidTimeframeError( + f"Invalid natural key format: {table_spec}. " + "At least one column must be specified" + ) + + result[table_part] = columns + + return result + + def fetch_pks_by_timeframe( conn_manager: ConnectionManager, table: str, @@ -407,6 +475,15 @@ def main() -> int: "-o", help="Output file path (default: stdout)", ) + dump_group.add_argument( + "--natural-keys", + help=( + "Manually specify natural keys for tables without unique constraints. " + "Format: 'schema.table=col1,col2;other_table=col1'. " + "Enables idempotent INSERTs for tables with auto-generated PKs. " + "Example: 'public.roles=name;public.statuses=code'" + ), + ) # Other arguments parser.add_argument( @@ -456,6 +533,14 @@ def main() -> int: if args.log_level: config.log_level = args.log_level + # Parse natural keys if provided + if args.natural_keys: + try: + config.natural_keys = parse_natural_keys(args.natural_keys) + except InvalidTimeframeError as e: + sys.stderr.write(f"Error: {e}\n") + return 1 + # Validate CLI dump mode arguments if args.dump and not args.pks and not args.timeframe: sys.stderr.write( diff --git a/src/pgslice/config.py b/src/pgslice/config.py index 40a6092..f32458b 100644 --- a/src/pgslice/config.py +++ b/src/pgslice/config.py @@ -46,6 +46,7 @@ class AppConfig: sql_batch_size: int = 100 output_dir: Path = Path.home() / ".pgslice" / "dumps" create_schema: bool = False + natural_keys: dict[str, list[str]] | None = None def load_config() -> AppConfig: diff --git a/src/pgslice/dumper/dump_service.py b/src/pgslice/dumper/dump_service.py index 33ad2da..339e9dd 100644 --- a/src/pgslice/dumper/dump_service.py +++ b/src/pgslice/dumper/dump_service.py @@ -128,7 +128,9 @@ def dump( # Step 3: Generate SQL (using animated spinner) with animated_spinner(spinner, pbar.set_description, "Generating SQL"): generator = SQLGenerator( - introspector, batch_size=self.config.sql_batch_size + introspector, + batch_size=self.config.sql_batch_size, + natural_keys=self.config.natural_keys, ) sql = generator.generate_batch( sorted_records, diff --git a/src/pgslice/dumper/sql_generator.py b/src/pgslice/dumper/sql_generator.py index 30a1942..7fff0ca 100644 --- a/src/pgslice/dumper/sql_generator.py +++ b/src/pgslice/dumper/sql_generator.py @@ -10,6 +10,7 @@ from ..db.schema import SchemaIntrospector from ..graph.models import RecordData, RecordIdentifier, Table +from ..utils.exceptions import SchemaError from ..utils.logging_config import get_logger from .ddl_generator import DDLGenerator @@ -20,7 +21,10 @@ class SQLGenerator: """Generates INSERT statements from record data.""" def __init__( - self, schema_introspector: SchemaIntrospector, batch_size: int = 100 + self, + schema_introspector: SchemaIntrospector, + batch_size: int = 100, + natural_keys: dict[str, list[str]] | None = None, ) -> None: """ Initialize SQL generator. @@ -28,12 +32,17 @@ def __init__( Args: schema_introspector: Schema introspection utility for table metadata batch_size: Number of rows per INSERT statement (0 or -1 = unlimited) + natural_keys: Manual natural key overrides (format: {"schema.table": ["col1", "col2"]}) """ self.introspector = schema_introspector # 0 or -1 means unlimited batch size self.batch_size = batch_size if batch_size > 0 else 999999 + # Manual natural key overrides from config/CLI + self.natural_keys = natural_keys or {} # Cache column type mappings per table to avoid repeated lookups self._column_type_cache: dict[tuple[str, str], dict[str, tuple[str, str]]] = {} + # Cache natural key detection results per table + self._natural_key_cache: dict[tuple[str, str], list[str]] = {} def generate_bulk_insert(self, records: list[RecordData]) -> str: """ @@ -651,6 +660,26 @@ def _generate_batch_with_plpgsql_remapping( ] ) + # Add sequence synchronization to prevent conflicts + if tables_with_remapped_ids: + sql_parts.append(" -- Synchronize sequences to prevent ID conflicts") + for schema, table in tables_with_remapped_ids: + auto_gen_pks = self._get_auto_generated_pk_columns(schema, table) + for pk_col in auto_gen_pks: + try: + seq_name = self._get_sequence_name(schema, table, pk_col) + full_table_name = f'"{schema}"."{table}"' + sql_parts.append( + f" PERFORM setval('{seq_name}', " + f'COALESCE((SELECT MAX("{pk_col}") FROM {full_table_name}), 1));' + ) + except SchemaError: + # Column might not have a sequence (e.g., UUID PKs) + logger.debug( + f"Skipping sequence sync for {schema}.{table}.{pk_col} (no sequence)" + ) + sql_parts.append("") # Blank line after sequence sync + # 5. Generate INSERT statements for each table for (schema, table), table_records in records_by_table.items(): full_table_name = f'"{schema}"."{table}"' @@ -737,6 +766,38 @@ def _has_auto_generated_pks(self, schema: str, table: str) -> bool: """Check if table has any auto-generated PK columns.""" return len(self._get_auto_generated_pk_columns(schema, table)) > 0 + def _get_sequence_name(self, schema: str, table: str, column: str) -> str: + """ + Get the sequence name for an auto-generated column. + + Uses pg_get_serial_sequence() to find the sequence associated with a column. + This works for SERIAL, BIGSERIAL, and columns with explicit DEFAULT nextval(). + + Args: + schema: Schema name + table: Table name + column: Column name + + Returns: + Fully qualified sequence name (e.g., 'public.users_id_seq') + + Raises: + SchemaError: If column has no associated sequence + """ + query = "SELECT pg_get_serial_sequence(%s, %s)" + full_table = f"{schema}.{table}" + + with self.introspector.conn.cursor() as cur: + cur.execute(query, (full_table, column)) + result = cur.fetchone() + + if result and result[0]: + return str(result[0]) + + raise SchemaError( + f"Column {schema}.{table}.{column} has no associated sequence" + ) + def _parse_table_name(self, qualified_name: str) -> tuple[str, str]: """ Parse a fully qualified table name into (schema, table). @@ -838,30 +899,185 @@ def _build_fk_remapping_value( f"WHERE table_name='{target_table_full}' AND old_id='{old_id_str}')" ) + def _build_natural_key_join_condition( + self, natural_keys: list[str], left_alias: str, right_alias: str + ) -> str: + """ + Build JOIN condition for natural key matching that handles NULLs correctly. + + Uses IS NOT DISTINCT FROM to treat NULL as a distinct value that can match. + + Args: + natural_keys: List of column names forming the natural key + left_alias: Alias for left table in JOIN + right_alias: Alias for right table in JOIN + + Returns: + SQL JOIN condition string + + Example: + _build_natural_key_join_condition(["name"], "t", "ti") + → 't."name" IS NOT DISTINCT FROM ti."name"' + + _build_natural_key_join_condition(["tenant_id", "code"], "t", "ti") + → 't."tenant_id" IS NOT DISTINCT FROM ti."tenant_id" AND t."code" IS NOT DISTINCT FROM ti."code"' + """ + conditions = [ + f'{left_alias}."{nk}" IS NOT DISTINCT FROM {right_alias}."{nk}"' + for nk in natural_keys + ] + return " AND ".join(conditions) + + def _detect_natural_keys( + self, + schema: str, + table: str, + ) -> list[str]: + """ + Detect columns that likely represent natural keys for idempotency. + + Natural keys are columns that logically should be unique even without + explicit unique constraints. Used for generating idempotent INSERTs + when ON CONFLICT cannot be used. + + Priority order: + 1. Manual overrides from --natural-keys CLI option (self.natural_keys) + 2. Common unique column names (name, code, slug, email, etc.) + 3. Reference table pattern (2-3 columns with single non-PK VARCHAR) + 4. Composite patterns (tenant_id + code, etc.) + + Args: + schema: Schema name + table: Table name + + Returns: + List of column names that form natural key, or empty list if none detected + """ + # Check cache first + cache_key = (schema, table) + if cache_key in self._natural_key_cache: + return self._natural_key_cache[cache_key] + + # PRIORITY 1: Manual overrides from CLI (self.natural_keys) + if self.natural_keys: + full_table_name = f"{schema}.{table}" + if full_table_name in self.natural_keys: + natural_keys = self.natural_keys[full_table_name] + self._natural_key_cache[cache_key] = natural_keys + logger.debug( + f"Using manual natural keys for {schema}.{table}: {natural_keys}" + ) + return natural_keys + + # Also try without schema prefix (for convenience) + if table in self.natural_keys: + natural_keys = self.natural_keys[table] + self._natural_key_cache[cache_key] = natural_keys + logger.debug( + f"Using manual natural keys for {schema}.{table}: {natural_keys}" + ) + return natural_keys + + # Get table metadata + table_meta = self.introspector.get_table_metadata(schema, table) + + # Filter to non-PK, non-nullable columns + candidate_columns = [ + col + for col in table_meta.columns + if not col.is_primary_key + and not col.nullable + and col.data_type in ("character varying", "text", "varchar") + ] + + if not candidate_columns: + self._natural_key_cache[cache_key] = [] + return [] + + # PRIORITY 2: Common unique column names (single column) + common_unique_names = { + "name", + "code", + "slug", + "email", + "username", + "key", + "identifier", + "handle", + } + common_unique_patterns = ["_code", "_key", "_identifier", "_slug"] + + for col in candidate_columns: + col_lower = col.name.lower() + # Exact match + if col_lower in common_unique_names: + natural_keys = [col.name] + self._natural_key_cache[cache_key] = natural_keys + logger.info( + f"Auto-detected natural key for {schema}.{table}: " + f"{natural_keys} (common name pattern)" + ) + return natural_keys + + # Pattern match + for pattern in common_unique_patterns: + if col_lower.endswith(pattern): + natural_keys = [col.name] + self._natural_key_cache[cache_key] = natural_keys + logger.info( + f"Auto-detected natural key for {schema}.{table}: " + f"{natural_keys} (pattern: *{pattern})" + ) + return natural_keys + + # PRIORITY 3: Reference table pattern + # Table has 2-3 total columns with exactly ONE non-PK non-nullable VARCHAR + total_columns = len(table_meta.columns) + if 2 <= total_columns <= 3 and len(candidate_columns) == 1: + natural_keys = [candidate_columns[0].name] + self._natural_key_cache[cache_key] = natural_keys + logger.info( + f"Auto-detected natural key for {schema}.{table}: " + f"{natural_keys} (reference table pattern)" + ) + return natural_keys + + # PRIORITY 4: No natural keys detected + self._natural_key_cache[cache_key] = [] + return [] + def _build_on_conflict_clause( self, table_meta: Table, insert_columns: list[str], auto_gen_pks: list[str], - ) -> str: + schema: str, + table: str, + ) -> tuple[str, list[str] | None]: """ - Build ON CONFLICT clause for primary keys or unique constraints. + Build ON CONFLICT clause OR detect natural keys for idempotency. Uses DO UPDATE with a no-op update to always get RETURNING values. This makes the SQL idempotent - reusing existing records instead of failing. Priority order: - 1. Non-auto-generated primary keys (string PKs, UUIDs, manual IDs) - 2. Unique constraints (existing behavior) - 3. Empty string if all PKs are auto-generated and no unique constraints + 1. Non-auto-generated primary keys (string PKs, UUIDs, manual IDs) → ON CONFLICT + 2. Unique constraints → ON CONFLICT + 3. Natural keys (auto-detected or manual from self.natural_keys) → CTE pattern + 4. No idempotency available → ERROR Args: table_meta: Table metadata with primary keys and unique constraints insert_columns: Columns being inserted auto_gen_pks: Auto-generated PK columns (excluded from ON CONFLICT) + schema: Schema name (for natural key detection) + table: Table name (for natural key detection) Returns: - ON CONFLICT clause string, or empty string if no conflict target available + Tuple of (on_conflict_sql, natural_keys): + - If on_conflict_sql != "": use traditional ON CONFLICT, natural_keys is None + - If natural_keys is not None: use CTE pattern, on_conflict_sql is "" + - Both empty: error case (should never happen, raises exception) """ # PRIORITY 1: Check for non-auto-generated primary keys # These are string PKs, UUIDs, or manually-set integer PKs @@ -882,7 +1098,7 @@ def _build_on_conflict_clause( f"ON CONFLICT ({conflict_cols}) " f'DO UPDATE SET "{update_col}" = EXCLUDED."{update_col}"' ) - return on_conflict + return (on_conflict, None) # PRIORITY 2: Check for unique constraints (existing logic) unique_constraints = table_meta.unique_constraints @@ -895,30 +1111,36 @@ def _build_on_conflict_clause( if not all(c in auto_gen_pks for c in cols) } - if not non_pk_unique: - return "" - - # Use the first unique constraint for ON CONFLICT - # (could be improved to choose best constraint, but any will work) - constraint_name, constraint_cols = next(iter(non_pk_unique.items())) - - # Check if all constraint columns are in insert_columns - # (if not, we can't use this constraint for ON CONFLICT) - if not all(col in insert_columns for col in constraint_cols): - return "" + if non_pk_unique: + # Use the first unique constraint for ON CONFLICT + # (could be improved to choose best constraint, but any will work) + constraint_name, constraint_cols = next(iter(non_pk_unique.items())) - conflict_cols = ", ".join(f'"{col}"' for col in constraint_cols) + # Check if all constraint columns are in insert_columns + if all(col in insert_columns for col in constraint_cols): + conflict_cols = ", ".join(f'"{col}"' for col in constraint_cols) - # Generate no-op UPDATE clause - # Pick the first column in the constraint for the update - update_col = constraint_cols[0] - on_conflict = ( - f"ON CONFLICT ({conflict_cols}) " - f'DO UPDATE SET "{update_col}" = EXCLUDED."{update_col}"' + # Generate no-op UPDATE clause + # Pick the first column in the constraint for the update + update_col = constraint_cols[0] + on_conflict = ( + f"ON CONFLICT ({conflict_cols}) " + f'DO UPDATE SET "{update_col}" = EXCLUDED."{update_col}"' + ) + return (on_conflict, None) + + # PRIORITY 3: Natural key detection + natural_keys = self._detect_natural_keys(schema, table) + if natural_keys and all(nk in insert_columns for nk in natural_keys): + return ("", natural_keys) + + # PRIORITY 4: No idempotency available - ERROR + raise SchemaError( + f'Cannot generate idempotent SQL for table "{schema}"."{table}". ' + f"Table has auto-generated primary keys with no unique constraints. " + f'Please specify natural keys using: --natural-keys "{schema}.{table}=col1,col2"' ) - return on_conflict - def _generate_insert_with_remapping( self, schema: str, table: str, records: list[RecordData] ) -> str: @@ -957,11 +1179,22 @@ def _generate_insert_with_remapping( values_clause = ",\n".join(values_rows) full_table_name = f'"{schema}"."{table}"' - # Build ON CONFLICT clause for unique constraints - on_conflict = self._build_on_conflict_clause( - table_meta, insert_columns, auto_gen_pks + # Build ON CONFLICT clause for unique constraints (or detect natural keys) + on_conflict, natural_keys = self._build_on_conflict_clause( + table_meta, insert_columns, auto_gen_pks, schema, table ) + # Route to natural key CTE pattern if natural keys detected + if natural_keys: + logger.debug( + f"Using natural key CTE pattern for {schema}.{table} " + f"with keys: {natural_keys}" + ) + return self._generate_insert_with_natural_key_check( + schema, table, records, natural_keys, auto_gen_pks + ) + + # Use traditional ON CONFLICT approach if len(records) == 1: # Single insert: use RETURNING INTO scalar variable sql_lines = [ @@ -1003,6 +1236,138 @@ def _generate_insert_with_remapping( return "\n".join(sql_lines) + def _generate_insert_with_natural_key_check( + self, + schema: str, + table: str, + records: list[RecordData], + natural_keys: list[str], + auto_gen_pks: list[str], + ) -> str: + """ + Generate INSERT with natural key checking using CTE pattern. + + Uses WHERE NOT EXISTS pattern to check for existing records by natural key. + This enables idempotent INSERTs when ON CONFLICT cannot be used + (no unique constraints on natural key columns). + + Algorithm: + 1. Create to_insert CTE with old_ids and data + 2. Find existing records by natural key match (existing CTE) + 3. INSERT only records not in existing (inserted CTE) + 4. Join inserted records back to old_ids via natural key (inserted_with_old_ids CTE) + 5. Combine existing and inserted IDs (all_ids CTE) + 6. Aggregate in old_id order to maintain FK mapping alignment + + Args: + schema: Schema name + table: Table name + records: List of records to insert + natural_keys: List of column names forming natural key + auto_gen_pks: List of auto-generated PK column names + + Returns: + PL/pgSQL code for CTE-based INSERT with natural key checking + """ + if not natural_keys: + raise ValueError("natural_keys must be non-empty") + + if not auto_gen_pks: + raise ValueError("auto_gen_pks must be non-empty for this method") + + # Get all columns EXCEPT auto-generated PKs + first_record = records[0] + all_columns = sorted(first_record.data.keys()) + insert_columns = [col for col in all_columns if col not in auto_gen_pks] + + # Verify natural keys are in insert columns + missing_nk = [nk for nk in natural_keys if nk not in insert_columns] + if missing_nk: + raise SchemaError( + f"Natural key columns {missing_nk} not found in " + f"insert columns for {schema}.{table}" + ) + + # Build column list + columns_sql = ", ".join(f'"{col}"' for col in insert_columns) + natural_keys_sql = ", ".join(f'"{nk}"' for nk in natural_keys) + + # Get column type mapping + column_type_map = self._get_column_types(schema, table) + + # Build VALUES rows and collect old PK values + values_rows = [] + old_pk_values = [] + for record in records: + values = [ + self._format_value(record.data.get(col), column_type_map.get(col)) + for col in insert_columns + ] + values_sql = ", ".join(values) + values_rows.append(f" ({values_sql})") + + old_pks = record.identifier.pk_values + old_pk_values.append(self._serialize_pk_value(old_pks)) + + values_clause = ",\n".join(values_rows) + full_table_name = f'"{schema}"."{table}"' + pk_col = auto_gen_pks[0] # Use first PK column + + # Build natural key JOIN conditions + nk_join_existing = self._build_natural_key_join_condition( + natural_keys, "t", "ti" + ) + nk_join_inserted = self._build_natural_key_join_condition( + natural_keys, "ins", "ti" + ) + + old_ids_array = ", ".join(f"'{val}'" for val in old_pk_values) + + # Generate CTE-based INSERT + sql_lines = [ + f" v_old_ids_{table} := ARRAY[{old_ids_array}];", + " WITH to_insert AS (", + " SELECT", + f" unnest(v_old_ids_{table}) AS old_id,", + " *", + " FROM (VALUES", + f"{values_clause}", + f" ) AS data({columns_sql})", + " ),", + " existing AS (", + f" SELECT t.{pk_col} AS new_id, ti.old_id", + f" FROM {full_table_name} t", + " INNER JOIN to_insert ti", + f" ON {nk_join_existing}", + " ),", + " inserted AS (", + f" INSERT INTO {full_table_name} ({columns_sql})", + f" SELECT {columns_sql}", + " FROM to_insert", + " WHERE old_id NOT IN (SELECT old_id FROM existing)", + f" RETURNING {pk_col}, {natural_keys_sql}", + " ),", + " inserted_with_old_ids AS (", + f" SELECT ins.{pk_col} AS new_id, ti.old_id", + " FROM inserted ins", + " INNER JOIN to_insert ti", + f" ON {nk_join_inserted}", + " ),", + " all_ids AS (", + " SELECT old_id, new_id FROM existing", + " UNION ALL", + " SELECT old_id, new_id FROM inserted_with_old_ids", + " )", + f" SELECT array_agg(new_id ORDER BY old_id) INTO v_new_ids_{table}", + " FROM all_ids;", + " ", + f" FOR i IN 1..array_length(v_new_ids_{table}, 1) LOOP", + f" INSERT INTO _pgslice_id_map VALUES ('{full_table_name}', v_old_ids_{table}[i], v_new_ids_{table}[i]::TEXT);", + " END LOOP;", + ] + + return "\n".join(sql_lines) + def _generate_insert_with_fk_remapping( self, schema: str, @@ -1069,12 +1434,21 @@ def _generate_insert_with_fk_remapping( values_rows.append(f" ({values_sql})") values_clause = ",\n".join(values_rows) - # Build ON CONFLICT clause for idempotency + # Build ON CONFLICT clause for idempotency (or detect natural keys) table_meta = self.introspector.get_table_metadata(schema, table) - on_conflict = self._build_on_conflict_clause( - table_meta, columns, auto_gen_pks + on_conflict, natural_keys = self._build_on_conflict_clause( + table_meta, columns, auto_gen_pks, schema, table ) + # Note: Natural key CTE pattern not implemented for FK remapping case yet + # Fall back to ON CONFLICT (which will error if natural_keys and no unique constraint) + if natural_keys: + logger.warning( + f"Natural keys detected for {schema}.{table} but FK remapping is needed. " + f"Natural key + FK remapping not yet supported. " + f"Attempting ON CONFLICT fallback (may fail)." + ) + sql_parts = [ f" INSERT INTO {full_table_name} ({columns_sql})", " VALUES", @@ -1206,11 +1580,19 @@ def _generate_insert_with_fk_remapping( if has_auto_gen_pks: table_meta = self.introspector.get_table_metadata(schema, table) - # Build ON CONFLICT clause for idempotency - on_conflict = self._build_on_conflict_clause( - table_meta, columns, auto_gen_pks + # Build ON CONFLICT clause for idempotency (or detect natural keys) + on_conflict, natural_keys = self._build_on_conflict_clause( + table_meta, columns, auto_gen_pks, schema, table ) + # Note: Natural key CTE pattern not implemented for FK remapping case yet + if natural_keys: + logger.warning( + f"Natural keys detected for {schema}.{table} but FK remapping is needed. " + f"Natural key + FK remapping not yet supported. " + f"Attempting ON CONFLICT fallback (may fail)." + ) + if len(records) == 1: # Single insert: use RETURNING INTO scalar variable sql_lines = base_sql_lines.copy() @@ -1255,11 +1637,19 @@ def _generate_insert_with_fk_remapping( # No auto-gen PKs: return simple INSERT-SELECT with ON CONFLICT table_meta = self.introspector.get_table_metadata(schema, table) - # Build ON CONFLICT clause for idempotency - on_conflict = self._build_on_conflict_clause( - table_meta, columns, auto_gen_pks + # Build ON CONFLICT clause for idempotency (or detect natural keys) + on_conflict, natural_keys = self._build_on_conflict_clause( + table_meta, columns, auto_gen_pks, schema, table ) + # Note: Natural key CTE pattern not implemented for FK remapping case yet + if natural_keys: + logger.warning( + f"Natural keys detected for {schema}.{table} but FK remapping is needed. " + f"Natural key + FK remapping not yet supported. " + f"Attempting ON CONFLICT fallback (may fail)." + ) + sql_lines = base_sql_lines.copy() # Remove trailing semicolon if present sql_lines[-1] = sql_lines[-1].rstrip(";") diff --git a/tests/unit/dumper/test_sql_generator.py b/tests/unit/dumper/test_sql_generator.py index 775f36c..825e975 100644 --- a/tests/unit/dumper/test_sql_generator.py +++ b/tests/unit/dumper/test_sql_generator.py @@ -764,8 +764,11 @@ def test_insert_with_returning_bulk_records( sql = generator.generate_batch(records, keep_pks=False) - # Verify WITH clause and array aggregation - assert "WITH inserted AS" in sql + # Verify natural key CTE pattern (new format with to_insert, existing, inserted, etc.) + assert "WITH to_insert AS" in sql # Natural key CTE pattern + assert "existing AS" in sql # Checks for existing records + assert "inserted AS" in sql # Actual INSERT statement + assert "IS NOT DISTINCT FROM" in sql # NULL-safe natural key comparison assert "array_agg" in sql assert "v_new_ids" in sql assert "FOR i IN 1..array_length" in sql @@ -976,10 +979,14 @@ def test_build_on_conflict_with_unique_constraints( generator = SQLGenerator(mock_introspector) # Test the helper method directly - result = generator._build_on_conflict_clause(table, ["id", "email"], []) + on_conflict, natural_keys = generator._build_on_conflict_clause( + table, ["id", "email"], [], "public", "users" + ) - if result: # ON CONFLICT clause generated - assert "ON CONFLICT" in result or result == "" + # Should use unique constraint + assert on_conflict != "" + assert natural_keys is None + assert "ON CONFLICT" in on_conflict def test_build_on_conflict_without_constraints( self, mock_introspector: MagicMock @@ -1007,13 +1014,16 @@ def test_build_on_conflict_without_constraints( mock_introspector.get_table_metadata.return_value = table generator = SQLGenerator(mock_introspector) - result = generator._build_on_conflict_clause(table, ["id"], []) + on_conflict, natural_keys = generator._build_on_conflict_clause( + table, ["id"], [], "public", "logs" + ) # Should build ON CONFLICT for non-auto-generated PK - assert "ON CONFLICT" in result - assert '("id")' in result - assert "DO UPDATE SET" in result - assert '"id" = EXCLUDED."id"' in result + assert natural_keys is None + assert "ON CONFLICT" in on_conflict + assert '("id")' in on_conflict + assert "DO UPDATE SET" in on_conflict + assert '"id" = EXCLUDED."id"' in on_conflict def test_build_on_conflict_with_string_pk( self, mock_introspector: MagicMock @@ -1048,17 +1058,20 @@ def test_build_on_conflict_with_string_pk( generator = SQLGenerator(mock_introspector) # Test the helper method directly - result = generator._build_on_conflict_clause( + on_conflict, natural_keys = generator._build_on_conflict_clause( table, ["id", "name"], [], # No auto-gen PKs + "public", + "shipments_shipmentstate", ) # Should generate ON CONFLICT for the string PK - assert "ON CONFLICT" in result - assert '("id")' in result - assert "DO UPDATE SET" in result - assert '"id" = EXCLUDED."id"' in result + assert natural_keys is None + assert "ON CONFLICT" in on_conflict + assert '("id")' in on_conflict + assert "DO UPDATE SET" in on_conflict + assert '"id" = EXCLUDED."id"' in on_conflict def test_build_on_conflict_with_composite_string_pks( self, mock_introspector: MagicMock @@ -1100,16 +1113,21 @@ def test_build_on_conflict_with_composite_string_pks( mock_introspector.get_table_metadata.return_value = table generator = SQLGenerator(mock_introspector) - result = generator._build_on_conflict_clause( - table, ["entity_a_id", "entity_b_id", "created_at"], [] + on_conflict, natural_keys = generator._build_on_conflict_clause( + table, + ["entity_a_id", "entity_b_id", "created_at"], + [], + "public", + "junction_table", ) # Should generate ON CONFLICT with both PKs - assert "ON CONFLICT" in result - assert '("entity_a_id", "entity_b_id")' in result - assert "DO UPDATE SET" in result + assert natural_keys is None + assert "ON CONFLICT" in on_conflict + assert '("entity_a_id", "entity_b_id")' in on_conflict + assert "DO UPDATE SET" in on_conflict # Should use first PK for no-op update - assert '"entity_a_id" = EXCLUDED."entity_a_id"' in result + assert '"entity_a_id" = EXCLUDED."entity_a_id"' in on_conflict def test_build_on_conflict_with_mixed_pks( self, mock_introspector: MagicMock @@ -1152,22 +1170,25 @@ def test_build_on_conflict_with_mixed_pks( generator = SQLGenerator(mock_introspector) # Only "version" is being inserted (id is excluded as auto-gen) - result = generator._build_on_conflict_clause( + on_conflict, natural_keys = generator._build_on_conflict_clause( table, ["version", "data"], ["id"], # id is auto-gen + "public", + "versioned_data", ) # Should generate ON CONFLICT for non-auto-gen PK only - assert "ON CONFLICT" in result - assert '("version")' in result - assert "DO UPDATE SET" in result - assert '"version" = EXCLUDED."version"' in result + assert natural_keys is None + assert "ON CONFLICT" in on_conflict + assert '("version")' in on_conflict + assert "DO UPDATE SET" in on_conflict + assert '"version" = EXCLUDED."version"' in on_conflict def test_build_on_conflict_all_auto_gen_pks_no_unique( self, mock_introspector: MagicMock ) -> None: - """Should return empty string when all PKs are auto-generated and no unique constraints.""" + """Should detect natural key when all PKs are auto-generated and no unique constraints.""" table = Table( schema_name="public", table_name="products", @@ -1196,14 +1217,17 @@ def test_build_on_conflict_all_auto_gen_pks_no_unique( mock_introspector.get_table_metadata.return_value = table generator = SQLGenerator(mock_introspector) - result = generator._build_on_conflict_clause( + on_conflict, natural_keys = generator._build_on_conflict_clause( table, ["name"], ["id"], # id is auto-gen and excluded + "public", + "products", ) - # Should return empty string (no ON CONFLICT needed) - assert result == "" + # Should detect "name" as natural key (PRIORITY 3) + assert on_conflict == "" + assert natural_keys == ["name"] def test_build_on_conflict_string_pk_takes_priority_over_unique( self, mock_introspector: MagicMock @@ -1237,13 +1261,16 @@ def test_build_on_conflict_string_pk_takes_priority_over_unique( mock_introspector.get_table_metadata.return_value = table generator = SQLGenerator(mock_introspector) - result = generator._build_on_conflict_clause(table, ["username", "email"], []) + on_conflict, natural_keys = generator._build_on_conflict_clause( + table, ["username", "email"], [], "public", "users" + ) # Should use PK, not unique constraint - assert "ON CONFLICT" in result - assert '("username")' in result # PK, not email - assert "DO UPDATE SET" in result - assert '"username" = EXCLUDED."username"' in result + assert natural_keys is None # Using ON CONFLICT, not natural keys + assert "ON CONFLICT" in on_conflict + assert '("username")' in on_conflict # PK, not email + assert "DO UPDATE SET" in on_conflict + assert '"username" = EXCLUDED."username"' in on_conflict def test_generate_batch_plpgsql_string_pk_idempotent( self, mock_introspector: MagicMock @@ -2435,3 +2462,498 @@ def test_multiple_reserved_keywords(self, mock_introspector: MagicMock) -> None: assert '"order"' in sql # Verify they're in the column list assert '("group", "id", "order", "references", "user")' in sql + + +class TestNaturalKeyDetection(TestSQLGenerator): + """Tests for natural key detection and idempotency.""" + + def test_detect_natural_keys_common_names( + self, mock_introspector: MagicMock + ) -> None: + """Should detect common unique column names as natural keys.""" + # Test "name" column + table_with_name = Table( + schema_name="public", + table_name="roles", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="name", + data_type="text", + udt_name="text", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table_with_name + generator = SQLGenerator(mock_introspector) + + natural_keys = generator._detect_natural_keys("public", "roles") + + assert natural_keys == ["name"] + + # Test "code" column + table_with_code = Table( + schema_name="public", + table_name="statuses", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="code", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table_with_code + generator = SQLGenerator(mock_introspector) + + natural_keys = generator._detect_natural_keys("public", "statuses") + + assert natural_keys == ["code"] + + # Test pattern match "_code" + table_with_pattern = Table( + schema_name="public", + table_name="countries", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="country_code", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table_with_pattern + generator = SQLGenerator(mock_introspector) + + natural_keys = generator._detect_natural_keys("public", "countries") + + assert natural_keys == ["country_code"] + + def test_detect_natural_keys_reference_table( + self, mock_introspector: MagicMock + ) -> None: + """Should detect natural key in reference table pattern (2-3 columns).""" + table = Table( + schema_name="shipments", + table_name="shipmentreprole", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="name", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table + generator = SQLGenerator(mock_introspector) + + natural_keys = generator._detect_natural_keys("shipments", "shipmentreprole") + + # Should detect reference table pattern (2 columns, 1 non-PK VARCHAR) + assert natural_keys == ["name"] + + def test_detect_natural_keys_manual_override( + self, mock_introspector: MagicMock + ) -> None: + """Should use manual override when provided via CLI.""" + table = Table( + schema_name="public", + table_name="products", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="sku", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + Column( + name="name", + data_type="text", + udt_name="text", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table + + # Create generator with manual override + manual_keys = {"public.products": ["sku"]} + generator = SQLGenerator(mock_introspector, natural_keys=manual_keys) + + natural_keys = generator._detect_natural_keys("public", "products") + + # Should use manual override, not auto-detected "name" + assert natural_keys == ["sku"] + + # Test without schema prefix + manual_keys_no_schema = {"products": ["sku", "name"]} + generator = SQLGenerator(mock_introspector, natural_keys=manual_keys_no_schema) + + natural_keys = generator._detect_natural_keys("public", "products") + + # Should match table name without schema + assert natural_keys == ["sku", "name"] + + def test_build_on_conflict_with_natural_keys( + self, mock_introspector: MagicMock + ) -> None: + """Should return natural keys when no ON CONFLICT clause possible.""" + table = Table( + schema_name="public", + table_name="roles", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="name", + data_type="text", + udt_name="text", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + unique_constraints={}, + ) + + mock_introspector.get_table_metadata.return_value = table + generator = SQLGenerator(mock_introspector) + + on_conflict, natural_keys = generator._build_on_conflict_clause( + table, ["name"], ["id"], "public", "roles" + ) + + # Should return empty ON CONFLICT and natural keys list + assert on_conflict == "" + assert natural_keys == ["name"] + + def test_generate_insert_with_natural_key_check_single( + self, mock_introspector: MagicMock + ) -> None: + """Should generate CTE-based INSERT for single record with natural key.""" + table = Table( + schema_name="public", + table_name="roles", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="name", + data_type="text", + udt_name="text", + nullable=False, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table + generator = SQLGenerator(mock_introspector) + + records = [ + RecordData( + identifier=RecordIdentifier( + table_name="roles", schema_name="public", pk_values=("1",) + ), + data={"id": 1, "name": "Admin"}, + dependencies=[], + ) + ] + + sql = generator._generate_insert_with_natural_key_check( + "public", "roles", records, ["name"], ["id"] + ) + + # Verify CTE structure + assert "WITH to_insert AS" in sql + assert "existing AS" in sql + assert "inserted AS" in sql + assert "all_ids AS" in sql + # Verify NULL-safe comparison + assert "IS NOT DISTINCT FROM" in sql + # Verify ordering for FK mapping + assert "ORDER BY old_id" in sql + # Verify data + assert "'Admin'" in sql + + def test_generate_insert_with_natural_key_check_bulk( + self, mock_introspector: MagicMock + ) -> None: + """Should generate CTE-based INSERT for bulk records with natural key.""" + table = Table( + schema_name="public", + table_name="statuses", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="code", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + Column( + name="description", + data_type="text", + udt_name="text", + nullable=True, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table + generator = SQLGenerator(mock_introspector) + + records = [ + RecordData( + identifier=RecordIdentifier( + table_name="statuses", schema_name="public", pk_values=("1",) + ), + data={"id": 1, "code": "ACTIVE", "description": "Active status"}, + dependencies=[], + ), + RecordData( + identifier=RecordIdentifier( + table_name="statuses", schema_name="public", pk_values=("2",) + ), + data={"id": 2, "code": "INACTIVE", "description": "Inactive status"}, + dependencies=[], + ), + RecordData( + identifier=RecordIdentifier( + table_name="statuses", schema_name="public", pk_values=("3",) + ), + data={"id": 3, "code": "PENDING", "description": None}, + dependencies=[], + ), + ] + + sql = generator._generate_insert_with_natural_key_check( + "public", "statuses", records, ["code"], ["id"] + ) + + # Verify CTE structure + assert "WITH to_insert AS" in sql + assert "existing AS" in sql + assert "inserted AS" in sql + # Verify all records + assert "'ACTIVE'" in sql + assert "'INACTIVE'" in sql + assert "'PENDING'" in sql + # Verify NULL handling + assert "NULL" in sql + # Verify natural key matching + assert '"code" IS NOT DISTINCT FROM' in sql + + def test_generate_insert_with_natural_key_check_composite( + self, mock_introspector: MagicMock + ) -> None: + """Should generate CTE-based INSERT with composite natural key.""" + table = Table( + schema_name="public", + table_name="tenant_settings", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="tenant_id", + data_type="integer", + udt_name="int4", + nullable=False, + ), + Column( + name="setting_key", + data_type="varchar", + udt_name="varchar", + nullable=False, + ), + Column( + name="value", + data_type="text", + udt_name="text", + nullable=True, + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + ) + + mock_introspector.get_table_metadata.return_value = table + manual_keys = {"tenant_settings": ["tenant_id", "setting_key"]} + generator = SQLGenerator(mock_introspector, natural_keys=manual_keys) + + records = [ + RecordData( + identifier=RecordIdentifier( + table_name="tenant_settings", + schema_name="public", + pk_values=("1",), + ), + data={ + "id": 1, + "tenant_id": 42, + "setting_key": "theme", + "value": "dark", + }, + dependencies=[], + ) + ] + + sql = generator._generate_insert_with_natural_key_check( + "public", + "tenant_settings", + records, + ["tenant_id", "setting_key"], + ["id"], + ) + + # Verify composite key matching (both columns) + assert '"tenant_id" IS NOT DISTINCT FROM' in sql + assert '"setting_key" IS NOT DISTINCT FROM' in sql + # Verify CTE structure + assert "WITH to_insert AS" in sql + + def test_natural_key_error_when_no_detection( + self, mock_introspector: MagicMock + ) -> None: + """Should raise SchemaError when no natural keys can be detected.""" + # Table with auto-gen PK, no unique constraints, and no natural key candidates + table = Table( + schema_name="public", + table_name="logs", + columns=[ + Column( + name="id", + data_type="integer", + udt_name="int4", + nullable=False, + is_primary_key=True, + is_auto_generated=True, + ), + Column( + name="message", + data_type="text", + udt_name="text", + nullable=True, # nullable, so not a candidate + ), + Column( + name="timestamp", + data_type="timestamp", + udt_name="timestamp", + nullable=False, # not VARCHAR/TEXT, so not a candidate + ), + ], + primary_keys=["id"], + foreign_keys_outgoing=[], + foreign_keys_incoming=[], + unique_constraints={}, + ) + + mock_introspector.get_table_metadata.return_value = table + generator = SQLGenerator(mock_introspector) + + # Should raise error with helpful message + with pytest.raises(Exception) as exc_info: + generator._build_on_conflict_clause( + table, ["message", "timestamp"], ["id"], "public", "logs" + ) + + error_msg = str(exc_info.value) + assert "Cannot generate idempotent SQL" in error_msg + assert "public" in error_msg + assert "logs" in error_msg + assert "--natural-keys" in error_msg