diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 4cbccb6c1f..e0c4b95cef 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -84,5 +85,12 @@ interface Context { /** Get the fluss principal currently accessing the catalog. */ FlussPrincipal getFlussPrincipal(); + + /** + * Get the current schema of fluss. + * + * @return the current schema of fluss, + */ + Schema getFlussSchema(); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java index 7406b13b6b..8fa580d3e4 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java @@ -17,11 +17,22 @@ package org.apache.fluss.lake.lakestorage; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.security.acl.FlussPrincipal; /** A testing implementation of {@link LakeCatalog.Context}. */ public class TestingLakeCatalogContext implements LakeCatalog.Context { + private final Schema schema; + + public TestingLakeCatalogContext(Schema schema) { + this.schema = schema; + } + + public TestingLakeCatalogContext() { + this(null); + } + @Override public boolean isCreatingFlussTable() { return false; @@ -31,4 +42,9 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return null; } + + @Override + public Schema getFlussSchema() { + return schema; + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 500546e641..602c1af6bc 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -37,13 +37,16 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.stream.Collectors; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; @@ -113,9 +116,11 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont throws TableNotExistException { try { List paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); - + org.apache.fluss.metadata.Schema flussSchema = context.getFlussSchema(); + List remainingChanges = + checkAndFilterDuplicateTableChanges(tablePath, tableChanges, flussSchema); // Compare current Paimon table schema with expected target schema before altering - if (shouldAlterTable(tablePath, tableChanges)) { + if (!remainingChanges.isEmpty()) { alterTable(tablePath, paimonSchemaChanges); } else { // If schemas already match, treat as idempotent success @@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } - private boolean shouldAlterTable(TablePath tablePath, List tableChanges) + private List checkAndFilterDuplicateTableChanges( + TablePath tablePath, + List tableChanges, + org.apache.fluss.metadata.Schema flussSchema) throws TableNotExistException { + if (tableChanges.isEmpty()) { + return tableChanges; + } + try { + // Get current Paimon table schema Table table = paimonCatalog.getTable(toPaimon(tablePath)); FileStoreTable fileStoreTable = (FileStoreTable) table; - Schema currentSchema = fileStoreTable.schema().toSchema(); - - for (TableChange change : tableChanges) { - if (change instanceof TableChange.AddColumn) { - TableChange.AddColumn addColumn = (TableChange.AddColumn) change; - if (!isColumnAlreadyExists(currentSchema, addColumn)) { - return true; - } - } else { - return true; + List paimonFields = + fileStoreTable.schema().toSchema().fields().stream() + .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name())) + .collect(Collectors.toList()); + List flussColumns = flussSchema.getColumns(); + + if (paimonFields.size() < flussColumns.size()) { + throw new InvalidAlterTableException( + String.format( + "Paimon table has less columns (%d) than Fluss schema (%d)", + paimonFields.size(), flussColumns.size())); + } + + // Validate schema compatibility + validateExistingColumns(paimonFields, flussColumns); + + if (paimonFields.size() == flussColumns.size()) { + return tableChanges; + } + + // if paimon column size is greater than expected fluss column size, meaning last add + // columns are failed. + // Thus, this time must be retried to keep the schema same, only then can add new + // columns or other operations next time. + String errorMsg = + String.format( + "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.", + paimonFields.size(), flussColumns.size(), tableChanges); + if (flussColumns.size() + tableChanges.size() != paimonFields.size()) { + throw new InvalidAlterTableException(errorMsg); + } + + for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) { + DataField paimonDataField = paimonFields.get(i + flussColumns.size()); + TableChange tableChange = tableChanges.get(i); + if (!(tableChange instanceof TableChange.AddColumn + && ((TableChange.AddColumn) tableChange).getPosition() + == TableChange.ColumnPosition.last() + && isColumnAlreadyExists( + paimonDataField, (TableChange.AddColumn) tableChange))) { + throw new InvalidAlterTableException(errorMsg); } } + return Collections.emptyList(); - return false; } catch (Catalog.TableNotExistException e) { throw new TableNotExistException("Table " + tablePath + " does not exist."); } } - private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) { + private void validateExistingColumns( + List paimonFields, + List flussColumns) { + for (int i = 0; i < flussColumns.size(); i++) { + if (!paimonFields.get(i).name().equals(flussColumns.get(i).getName())) { + throw new InvalidAlterTableException( + String.format( + "Column mismatch at position %d. Paimon: '%s', Fluss: '%s'", + i, paimonFields.get(i).name(), flussColumns.get(i).getName())); + } + } + } + + private boolean isColumnAlreadyExists( + org.apache.paimon.types.DataField field, TableChange.AddColumn addColumn) { String columnName = addColumn.getName(); - for (org.apache.paimon.types.DataField field : currentSchema.fields()) { - if (field.name().equals(columnName)) { - org.apache.paimon.types.DataType expectedType = - addColumn - .getDataType() - .accept( - org.apache.fluss.lake.paimon.utils - .FlussDataTypeToPaimonDataType.INSTANCE); - - if (!field.type().equals(expectedType)) { - throw new InvalidAlterTableException( - String.format( - "Column '%s' already exists but with different type. " - + "Existing: %s, Expected: %s", - columnName, field.type(), expectedType)); - } - String existingComment = field.description(); - String expectedComment = addColumn.getComment(); - - boolean commentsMatch = - (existingComment == null && expectedComment == null) - || (existingComment != null - && existingComment.equals(expectedComment)); - - if (!commentsMatch) { - throw new InvalidAlterTableException( - String.format( - "Column %s already exists but with different comment. " - + "Existing: %s, Expected: %s", - columnName, existingComment, expectedComment)); - } + if (field.name().equals(columnName)) { + org.apache.paimon.types.DataType expectedType = + addColumn + .getDataType() + .accept( + org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType + .INSTANCE); - return true; + if (!field.type().equals(expectedType)) { + throw new InvalidAlterTableException( + String.format( + "Column '%s' already exists but with different type. " + + "Existing: %s, Expected: %s", + columnName, field.type(), expectedType)); } - } + String existingComment = field.description(); + String expectedComment = addColumn.getComment(); + + boolean commentsMatch = + (existingComment == null && expectedComment == null) + || (existingComment != null && existingComment.equals(expectedComment)); + if (!commentsMatch) { + throw new InvalidAlterTableException( + String.format( + "Column %s already exists but with different comment. " + + "Existing: %s, Expected: %s", + columnName, existingComment, expectedComment)); + } + + return true; + } return false; } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index ea18b85f6a..405b3b0a5a 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -42,6 +43,15 @@ /** Unit test for {@link PaimonLakeCatalog}. */ class PaimonLakeCatalogTest { + private static final Schema FLUSS_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .build(); + private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT = + new TestingLakeCatalogContext(FLUSS_SCHEMA); @TempDir private File tempWarehouseDir; @@ -70,7 +80,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.set("key", "value")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have set the value for key @@ -80,7 +90,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.reset("key")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have reset the value for key @@ -89,14 +99,13 @@ void testAlterTableProperties() throws Exception { @Test void alterTablePropertiesWithNonExistentTable() { - TestingLakeCatalogContext context = new TestingLakeCatalogContext(); // db & table don't exist assertThatThrownBy( () -> flussPaimonCatalog.alterTable( TablePath.of("non_existing_db", "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table non_existing_db.non_existing_table does not exist."); @@ -110,7 +119,7 @@ void alterTablePropertiesWithNonExistentTable() { flussPaimonCatalog.alterTable( TablePath.of(database, "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table alter_props_db.non_existing_table does not exist."); } @@ -131,7 +140,7 @@ void testAlterTableAddColumnLastNullable() throws Exception { "new_col comment", TableChange.ColumnPosition.last())); - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + flussPaimonCatalog.alterTable(tablePath, changes, LAKE_CATALOG_CONTEXT); Table table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); assertThat(table.rowType().getFieldNames()) @@ -164,7 +173,7 @@ void testAlterTableAddColumnNotLast() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add column at last for paimon table."); } @@ -187,13 +196,13 @@ void testAlterTableAddColumnNotNullable() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add nullable column for paimon table."); } @Test - void testAlterTableAddExistingColumn() { + void testAlterTableAddExistingColumns() { String database = "test_alter_table_add_existing_column_db"; String tableName = "test_alter_table_add_existing_column_table"; TablePath tablePath = TablePath.of(database, tableName); @@ -207,13 +216,38 @@ void testAlterTableAddExistingColumn() { null, TableChange.ColumnPosition.last())); - // no exception thrown when adding existing column - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, changes, LAKE_CATALOG_CONTEXT)) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "Column address already exists in the test_alter_table_add_existing_column_db.test_alter_table_add_existing_column_table table."); List changes2 = - Collections.singletonList( + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", + DataTypes.STRING(), + null, + TableChange.ColumnPosition.last())); + + // mock add columns to paimon successfully but fail to add columns to fluss. + flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); + List changes3 = + Arrays.asList( + TableChange.addColumn( + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.INT(), null, TableChange.ColumnPosition.last())); @@ -221,15 +255,20 @@ void testAlterTableAddExistingColumn() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes2, new TestingLakeCatalogContext())) + tablePath, changes3, LAKE_CATALOG_CONTEXT)) .isInstanceOf(InvalidAlterTableException.class) .hasMessage( - "Column 'address' already exists but with different type. Existing: STRING, Expected: INT"); + "Column 'new_column2' already exists but with different type. Existing: STRING, Expected: INT"); - List changes3 = - Collections.singletonList( + List changes4 = + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.STRING(), "the address comment", TableChange.ColumnPosition.last())); @@ -237,29 +276,74 @@ tablePath, changes2, new TestingLakeCatalogContext())) assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes3, new TestingLakeCatalogContext())) + tablePath, changes4, LAKE_CATALOG_CONTEXT)) .isInstanceOf(InvalidAlterTableException.class) .hasMessage( - "Column address already exists but with different comment. Existing: null, Expected: the address comment"); + "Column new_column2 already exists but with different comment. Existing: null, Expected: the address comment"); + + // no exception thrown only when adding existing column to match fluss and paimon. + flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); } - private void createTable(String database, String tableName) { - Schema flussSchema = - Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .column("name", DataTypes.STRING()) - .column("amount", DataTypes.INT()) - .column("address", DataTypes.STRING()) - .build(); + @Test + void testAlterTableAddColumnWhenPaimonSchemaNotMatch() { + // this rarely happens only when new fluss lake table with an existed paimon table or use + // alter table in paimon side directly. + String database = "test_alter_table_add_column_fluss_wider"; + String tableName = "test_alter_table_add_column_fluss_wider"; + createTable(database, tableName); + TablePath tablePath = TablePath.of(database, tableName); + + List changes = + Collections.singletonList( + TableChange.addColumn( + "new_col", + DataTypes.INT(), + "new_col comment", + TableChange.ColumnPosition.last())); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + new TestingLakeCatalogContext( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .column("phone", DataTypes.INT()) + .build()))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Paimon table has less columns (4) than Fluss schema (5)"); + + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + new TestingLakeCatalogContext( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("amount", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("address", DataTypes.STRING()) + .build()))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "Column mismatch at position 1. Paimon: 'name', Fluss: 'amount'"); + } + + private void createTable(String database, String tableName) { TableDescriptor td = TableDescriptor.builder() - .schema(flussSchema) + .schema(FLUSS_SCHEMA) .distributedBy(3) // no bucket key .build(); TablePath tablePath = TablePath.of(database, tableName); - flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); + flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index ee0092d1b8..648c03298e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -43,6 +43,7 @@ import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.ResolvedPartitionSpec; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -331,7 +332,9 @@ public CompletableFuture createTable(CreateTableRequest req tablePath, tableDescriptor, new DefaultLakeCatalogContext( - true, currentSession().getPrincipal())); + true, + currentSession().getPrincipal(), + tableDescriptor.getSchema())); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException(e.getMessage(), e); } @@ -366,8 +369,6 @@ public CompletableFuture alterTable(AlterTableRequest reques LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer = lakeCatalogDynamicLoader.getLakeCatalogContainer(); - LakeCatalog.Context lakeCatalogContext = - new DefaultLakeCatalogContext(false, currentSession().getPrincipal()); if (!alterSchemaChanges.isEmpty()) { metadataManager.alterTableSchema( @@ -375,7 +376,7 @@ public CompletableFuture alterTable(AlterTableRequest reques alterSchemaChanges, request.isIgnoreIfNotExists(), lakeCatalogContainer.getLakeCatalog(), - lakeCatalogContext); + currentSession().getPrincipal()); } if (!alterTableConfigChanges.isEmpty()) { @@ -386,7 +387,7 @@ public CompletableFuture alterTable(AlterTableRequest reques request.isIgnoreIfNotExists(), lakeCatalogContainer.getLakeCatalog(), lakeTableTieringManager, - lakeCatalogContext); + currentSession().getPrincipal()); } return CompletableFuture.completedFuture(new AlterTableResponse()); @@ -938,11 +939,13 @@ static class DefaultLakeCatalogContext implements LakeCatalog.Context { private final boolean isCreatingFlussTable; private final FlussPrincipal flussPrincipal; + private final Schema schema; public DefaultLakeCatalogContext( - boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) { + boolean isCreatingFlussTable, FlussPrincipal flussPrincipal, Schema schema) { this.isCreatingFlussTable = isCreatingFlussTable; this.flussPrincipal = flussPrincipal; + this.schema = schema; } @Override @@ -954,5 +957,10 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } + + @Override + public Schema getFlussSchema() { + return schema; + } } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 05c5d51de2..f29e0285a2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.security.acl.FlussPrincipal; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.DatabaseRegistration; @@ -326,7 +327,7 @@ public void alterTableSchema( List schemaChanges, boolean ignoreIfNotExists, @Nullable LakeCatalog lakeCatalog, - LakeCatalog.Context lakeCatalogContext) + FlussPrincipal flussPrincipal) throws TableNotExistException, TableNotPartitionedException { try { @@ -335,7 +336,9 @@ public void alterTableSchema( // validate the table column changes if (!schemaChanges.isEmpty()) { Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); - + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, flussPrincipal, table.getSchema()); // Lake First: sync to Lake before updating Fluss schema syncSchemaChangesToLake( tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext); @@ -399,7 +402,7 @@ public void alterTableProperties( boolean ignoreIfNotExists, @Nullable LakeCatalog lakeCatalog, LakeTableTieringManager lakeTableTieringManager, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { try { // it throws TableNotExistException if the table or database not exists TableRegistration tableReg = getTableRegistration(tablePath); @@ -430,7 +433,7 @@ public void alterTableProperties( newDescriptor, tableChanges, lakeCatalog, - lakeCatalogContext); + flussPrincipal); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -470,7 +473,10 @@ private void preAlterTableProperties( TableDescriptor newDescriptor, List tableChanges, LakeCatalog lakeCatalog, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, flussPrincipal, newDescriptor.getSchema()); if (isDataLakeEnabled(newDescriptor)) { if (lakeCatalog == null) { throw new InvalidAlterTableException(