Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 87 additions & 42 deletions internal/testing/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,18 +509,25 @@ func (m *DatabaseManager) DropDatabase(ctx context.Context, dbName string) error
return nil
}

// CloneExternalDatabase clones the xatu template database (default) to a per-test database.
// This clones schemas only (empty tables), not data.
// For ReplicatedMergeTree tables, it modifies the ZooKeeper paths to be unique per-test.
// CloneExternalDatabase clones specific tables from the xatu template to a per-test database.
// If tableNames is empty, clones all tables (legacy behavior).
func (m *DatabaseManager) CloneExternalDatabase(ctx context.Context, testID string, tableNames []string) (string, error) {
// cloneSpec describes a table clone operation with potentially different source and target names.
// This supports cross-database external models where the source table name differs from the model name
// (e.g., source observoor.cpu_utilization → target ext_123.observoor_cpu_utilization).
type cloneSpec struct {
sourceDB string
sourceTable string
targetTable string
}

// CloneExternalDatabase clones specific external tables from the xatu cluster to a per-test database.
// Supports cross-database external models where the source table lives in a non-default database
// (e.g., observoor.cpu_utilization) and needs to be cloned under the model name in the test database.
func (m *DatabaseManager) CloneExternalDatabase(ctx context.Context, testID string, tableRefs []ExternalTableRef) (string, error) {
extDBName := config.ExternalDBPrefix + testID

logCtx := m.log.WithFields(logrus.Fields{
"cluster": "xatu",
"database": extDBName,
"tables": len(tableNames),
"tables": len(tableRefs),
})

start := time.Now()
Expand All @@ -536,20 +543,32 @@ func (m *DatabaseManager) CloneExternalDatabase(ctx context.Context, testID stri
}
cancel()

// Build list of tables to clone (only specified tables, empty = no tables)
// Clone BOTH _local (ReplicatedMergeTree) AND distributed tables for each model
// Assertions query distributed tables which read from _local tables
tables := make([]tableInfo, 0, len(tableNames)*2)
// Build clone specs for each external table ref.
// Clone BOTH _local (ReplicatedMergeTree) AND distributed tables for each model.
// For cross-database models, the source table name may differ from the target (model) name.
specs := make([]cloneSpec, 0, len(tableRefs)*2)

for _, name := range tableNames {
tables = append(tables,
tableInfo{name: name + "_local"}, // ReplicatedMergeTree table
tableInfo{name: name}, // Distributed table
for _, ref := range tableRefs {
sourceDB := config.DefaultDatabase
sourceTable := ref.ModelName

if ref.SourceDB != "" {
sourceDB = ref.SourceDB
}

if ref.SourceTable != "" {
sourceTable = ref.SourceTable
}

// Clone _local table first (ReplicatedMergeTree), then distributed table
specs = append(specs,
cloneSpec{sourceDB: sourceDB, sourceTable: sourceTable + "_local", targetTable: ref.ModelName + "_local"},
cloneSpec{sourceDB: sourceDB, sourceTable: sourceTable, targetTable: ref.ModelName},
)
}

// If no tables to clone, just return the empty database
if len(tables) == 0 {
if len(specs) == 0 {
logCtx.WithFields(logrus.Fields{
"tables": 0,
"duration": time.Since(start),
Expand All @@ -564,24 +583,24 @@ func (m *DatabaseManager) CloneExternalDatabase(ctx context.Context, testID stri
var (
cloneWg sync.WaitGroup
cloneSem = make(chan struct{}, cloneWorkers)
cloneErrs = make(chan error, len(tables))
cloneErrs = make(chan error, len(specs))
)

for _, table := range tables {
for _, spec := range specs {
cloneWg.Add(1)

go func(t tableInfo) {
go func(s cloneSpec) {
defer cloneWg.Done()

// Acquire semaphore
cloneSem <- struct{}{}
defer func() { <-cloneSem }()

if cloneErr := m.cloneTableWithUniqueReplicaPath(ctx, m.xatuConn,
config.DefaultDatabase, extDBName, t.name, config.XatuClusterName); cloneErr != nil {
cloneErrs <- fmt.Errorf("cloning table %s: %w", t.name, cloneErr)
s.sourceDB, extDBName, s.sourceTable, s.targetTable, config.XatuClusterName); cloneErr != nil {
cloneErrs <- fmt.Errorf("cloning table %s.%s → %s: %w", s.sourceDB, s.sourceTable, s.targetTable, cloneErr)
}
}(table)
}(spec)
}

cloneWg.Wait()
Expand All @@ -595,49 +614,53 @@ func (m *DatabaseManager) CloneExternalDatabase(ctx context.Context, testID stri
}

logCtx.WithFields(logrus.Fields{
"tables": len(tables),
"tables": len(specs),
"duration": time.Since(start),
}).Info("external database cloned")

return extDBName, nil
}

// cloneTableWithUniqueReplicaPath clones a table, modifying ReplicatedMergeTree paths to be unique.
// sourceTable and targetTable may differ for cross-database external models where the source table name
// (e.g., cpu_utilization) differs from the model name used in the test database (e.g., observoor_cpu_utilization).
func (m *DatabaseManager) cloneTableWithUniqueReplicaPath(
ctx context.Context,
conn *sql.DB,
sourceDB, targetDB, tableName, clusterName string,
sourceDB, targetDB, sourceTable, targetTable, clusterName string,
) error {
// Get the CREATE TABLE statement
showSQL := fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", sourceDB, tableName)
// Get the CREATE TABLE statement from the source
showSQL := fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", sourceDB, sourceTable)

queryCtx, cancel := context.WithTimeout(ctx, m.config.QueryTimeout)
defer cancel()

var createStmt string
if err := conn.QueryRowContext(queryCtx, showSQL).Scan(&createStmt); err != nil {
return fmt.Errorf("getting CREATE TABLE statement: %w", err)
return fmt.Errorf("getting CREATE TABLE statement for %s.%s: %w", sourceDB, sourceTable, err)
}

// Modify the statement:
// 1. Replace database name in the CREATE TABLE line
// 1. Replace database and table name in the CREATE TABLE line
// 2. Replace ZooKeeper paths in ReplicatedMergeTree to include target database
modifiedStmt := m.modifyCreateTableForClone(createStmt, sourceDB, targetDB, tableName, clusterName)
// 3. Rename table references when sourceTable != targetTable
modifiedStmt := m.modifyCreateTableForClone(createStmt, sourceDB, targetDB, sourceTable, targetTable, clusterName)

execCtx, execCancel := context.WithTimeout(ctx, m.config.QueryTimeout)
defer execCancel()

if _, err := conn.ExecContext(execCtx, modifiedStmt); err != nil {
return fmt.Errorf("executing modified CREATE TABLE: %w", err)
return fmt.Errorf("executing modified CREATE TABLE for %s: %w", targetTable, err)
}

return nil
}

// modifyCreateTableForClone modifies a CREATE TABLE/VIEW statement for cloning to a new database.
// It updates the table name and modifies ReplicatedMergeTree ZooKeeper paths.
// It updates the database/table name and modifies ReplicatedMergeTree ZooKeeper paths.
// When sourceTable != targetTable (cross-database models), it also renames table references.
func (m *DatabaseManager) modifyCreateTableForClone(
createStmt, sourceDB, targetDB, tableName, clusterName string,
createStmt, sourceDB, targetDB, sourceTable, targetTable, clusterName string,
) string {
result := createStmt

Expand All @@ -647,33 +670,33 @@ func (m *DatabaseManager) modifyCreateTableForClone(
// Materialized Views: CREATE MATERIALIZED VIEW default.mv_name

// Replace TABLE references
oldTableRef := fmt.Sprintf("CREATE TABLE %s.%s", sourceDB, tableName)
oldTableRef := fmt.Sprintf("CREATE TABLE %s.%s", sourceDB, sourceTable)
newTableRef := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s` ON CLUSTER %s",
targetDB, tableName, clusterName)
targetDB, targetTable, clusterName)
result = strings.Replace(result, oldTableRef, newTableRef, 1)

// Also handle backtick-quoted version for TABLE
oldTableRefQuoted := fmt.Sprintf("CREATE TABLE `%s`.`%s`", sourceDB, tableName)
oldTableRefQuoted := fmt.Sprintf("CREATE TABLE `%s`.`%s`", sourceDB, sourceTable)
result = strings.Replace(result, oldTableRefQuoted, newTableRef, 1)

// Replace MATERIALIZED VIEW references
oldMVRef := fmt.Sprintf("CREATE MATERIALIZED VIEW %s.%s", sourceDB, tableName)
oldMVRef := fmt.Sprintf("CREATE MATERIALIZED VIEW %s.%s", sourceDB, sourceTable)
newMVRef := fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS `%s`.`%s` ON CLUSTER %s",
targetDB, tableName, clusterName)
targetDB, targetTable, clusterName)
result = strings.Replace(result, oldMVRef, newMVRef, 1)

// Backtick-quoted MATERIALIZED VIEW
oldMVRefQuoted := fmt.Sprintf("CREATE MATERIALIZED VIEW `%s`.`%s`", sourceDB, tableName)
oldMVRefQuoted := fmt.Sprintf("CREATE MATERIALIZED VIEW `%s`.`%s`", sourceDB, sourceTable)
result = strings.Replace(result, oldMVRefQuoted, newMVRef, 1)

// Replace VIEW references (non-materialized)
oldViewRef := fmt.Sprintf("CREATE VIEW %s.%s", sourceDB, tableName)
oldViewRef := fmt.Sprintf("CREATE VIEW %s.%s", sourceDB, sourceTable)
newViewRef := fmt.Sprintf("CREATE VIEW IF NOT EXISTS `%s`.`%s` ON CLUSTER %s",
targetDB, tableName, clusterName)
targetDB, targetTable, clusterName)
result = strings.Replace(result, oldViewRef, newViewRef, 1)

// Backtick-quoted VIEW
oldViewRefQuoted := fmt.Sprintf("CREATE VIEW `%s`.`%s`", sourceDB, tableName)
oldViewRefQuoted := fmt.Sprintf("CREATE VIEW `%s`.`%s`", sourceDB, sourceTable)
result = strings.Replace(result, oldViewRefQuoted, newViewRef, 1)

// For Materialized Views with TO clause (e.g., CREATE MATERIALIZED VIEW ... TO default.target_table)
Expand Down Expand Up @@ -712,6 +735,28 @@ func (m *DatabaseManager) modifyCreateTableForClone(
newDistRef := fmt.Sprintf("'%s'", targetDB)
result = strings.ReplaceAll(result, oldDistRef, newDistRef)

// For cross-database models where source table name differs from target table name,
// rename the Distributed engine's local table reference.
// Example: Distributed('{cluster}', 'ext_123', 'cpu_utilization_local', ...) needs to become
// Distributed('{cluster}', 'ext_123', 'observoor_cpu_utilization_local', ...)
// Only target quoted references in the Distributed engine to avoid unintended global replacements.
if sourceTable != targetTable {
// Quoted reference (most common): 'cpu_utilization_local' → 'observoor_cpu_utilization_local'
oldQuotedLocal := fmt.Sprintf("'%s_local'", sourceTable)
newQuotedLocal := fmt.Sprintf("'%s_local'", targetTable)
result = strings.ReplaceAll(result, oldQuotedLocal, newQuotedLocal)

// Unquoted reference: cpu_utilization_local, → observoor_cpu_utilization_local,
oldUnquotedLocal := fmt.Sprintf(", %s_local,", sourceTable)
newUnquotedLocal := fmt.Sprintf(", %s_local,", targetTable)
result = strings.ReplaceAll(result, oldUnquotedLocal, newUnquotedLocal)

// Quoted base table ref: 'cpu_utilization' → 'observoor_cpu_utilization'
oldQuotedBase := fmt.Sprintf("'%s'", sourceTable)
newQuotedBase := fmt.Sprintf("'%s'", targetTable)
result = strings.ReplaceAll(result, oldQuotedBase, newQuotedBase)
}

return result
}

Expand Down Expand Up @@ -780,7 +825,7 @@ func (m *DatabaseManager) CloneCBTDatabase(ctx context.Context, testID string, t
defer func() { <-cloneSem }()

if cloneErr := m.cloneTableWithUniqueReplicaPath(ctx, m.cbtConn,
config.CBTTemplateDatabase, cbtDBName, t.name, config.CBTClusterName); cloneErr != nil {
config.CBTTemplateDatabase, cbtDBName, t.name, t.name, config.CBTClusterName); cloneErr != nil {
cloneErrs <- fmt.Errorf("cloning table %s: %w", t.name, cloneErr)
}
}(table)
Expand Down
Loading
Loading