Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -84,5 +85,12 @@ interface Context {

/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();

/**
* Get the current schema of fluss.
*
* @return the current schema of fluss.
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new method getFlussSchema() should include a @since annotation to document when it was added to the API, consistent with other methods in the interface that have @since annotations.

Suggested change
* @return the current schema of fluss,
* @return the current schema of fluss,
* @since 0.10

Copilot uses AI. Check for mistakes.
Comment on lines +90 to +92
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDoc comment is incomplete. It has "@return the current schema of fluss," but the sentence is not finished and has a trailing comma. Consider completing the description, for example: "Get the current schema of Fluss. @return the current schema of Fluss".

Suggested change
* Get the current schema of fluss.
*
* @return the current schema of fluss,
* Get the current schema of Fluss.
*
* @return the current schema of Fluss.

Copilot uses AI. Check for mistakes.
*/
Schema getFlussSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@

package org.apache.fluss.lake.lakestorage;

import org.apache.fluss.metadata.Schema;
import org.apache.fluss.security.acl.FlussPrincipal;

/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {

private final Schema schema;

/**
* Creates a testing context that exposes the given Fluss schema.
*
* @param schema the schema later returned by {@link #getFlussSchema()}; stored as-is
*/
public TestingLakeCatalogContext(Schema schema) {
this.schema = schema;
}

/** Creates a testing context without a schema; {@link #getFlussSchema()} then returns null. */
public TestingLakeCatalogContext() {
this(null);
}

@Override
public boolean isCreatingFlussTable() {
return false;
Expand All @@ -31,4 +42,9 @@ public boolean isCreatingFlussTable() {
public FlussPrincipal getFlussPrincipal() {
return null;
}

/** Returns the schema passed to the constructor, or null if none was supplied. */
@Override
public Schema getFlussSchema() {
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
Expand Down Expand Up @@ -113,9 +116,11 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);

org.apache.fluss.metadata.Schema flussSchema = context.getFlussSchema();
List<TableChange> remainingChanges =
checkAndFilterDuplicateTableChanges(tablePath, tableChanges, flussSchema);
// Compare current Paimon table schema with expected target schema before altering
if (shouldAlterTable(tablePath, tableChanges)) {
if (!remainingChanges.isEmpty()) {
alterTable(tablePath, paimonSchemaChanges);
} else {
// If schemas already match, treat as idempotent success
Expand All @@ -133,69 +138,118 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}

private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
private List<TableChange> checkAndFilterDuplicateTableChanges(
TablePath tablePath,
List<TableChange> tableChanges,
org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
if (tableChanges.isEmpty()) {
return tableChanges;
}

try {
// Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Schema currentSchema = fileStoreTable.schema().toSchema();

for (TableChange change : tableChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
if (!isColumnAlreadyExists(currentSchema, addColumn)) {
return true;
}
} else {
return true;
List<DataField> paimonFields =
fileStoreTable.schema().toSchema().fields().stream()
.filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
.collect(Collectors.toList());
List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();

if (paimonFields.size() < flussColumns.size()) {
throw new InvalidAlterTableException(
String.format(
"Paimon table has less columns (%d) than Fluss schema (%d)",
paimonFields.size(), flussColumns.size()));
}

// Validate schema compatibility
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extra space in the comment. Should be "Validate schema compatibility" instead of "Validate schema compatibility".

Suggested change
// Validate schema compatibility
// Validate schema compatibility

Copilot uses AI. Check for mistakes.
validateExistingColumns(paimonFields, flussColumns);

if (paimonFields.size() == flussColumns.size()) {
return tableChanges;
}

// If the Paimon column count is greater than the expected Fluss column count, it means
// the last add-columns operation failed.
// Thus, this call must retry adding the same columns to bring the schemas back in sync;
// only then can new columns be added or other operations be performed.
Comment on lines +174 to +177
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states "meaning last add columns are failed" which should be "meaning the last add columns operation failed" for better grammar. Also, "This time must be retried" could be clearer as "This operation must retry adding the same columns".

Suggested change
// if paimon column size is greater than expected fluss column size, meaning last add
// columns are failed.
// Thus, this time must be retried to keep the schema same, only then can add new
// columns or other operations next time.
// If Paimon column size is greater than the expected Fluss column size, it means the
// last add columns operation failed.
// Thus, this operation must retry adding the same columns to keep the schemas the
// same before adding new columns or performing other operations.

Copilot uses AI. Check for mistakes.
String errorMsg =
String.format(
"Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message has a grammatical issue. "thus need to add" should be "thus needs to add" or better yet, rephrase to "therefore you need to add the diff columns all at once, rather than applying other table changes" for better clarity.

Suggested change
"Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.",
"Paimon table has more columns (%d) than Fluss schema (%d); therefore you need to add the diff columns all at once, rather than applying other table changes: %s.",

Copilot uses AI. Check for mistakes.
paimonFields.size(), flussColumns.size(), tableChanges);
if (flussColumns.size() + tableChanges.size() != paimonFields.size()) {
throw new InvalidAlterTableException(errorMsg);
}

for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) {
DataField paimonDataField = paimonFields.get(i + flussColumns.size());
TableChange tableChange = tableChanges.get(i);
if (!(tableChange instanceof TableChange.AddColumn
&& ((TableChange.AddColumn) tableChange).getPosition()
== TableChange.ColumnPosition.last()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could perform this check earlier—it’s not related to the existing Paimon schema. Also, it’s already validated here:

TableChange.ColumnPosition position = addColumn.getPosition();
if (position != TableChange.ColumnPosition.last()) {
throw new IllegalArgumentException("Only support addColumn column at last now.");
}

&& isColumnAlreadyExists(
paimonDataField, (TableChange.AddColumn) tableChange))) {
throw new InvalidAlterTableException(errorMsg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could split the checks and provide distinct error messages to help users more easily identify issues related to the Paimon schema.

}
}
Comment on lines +186 to 196
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method checkAndFilterDuplicateTableChanges only handles TableChange.AddColumn operations in its retry logic (lines 186-196). If the method is called with property changes (like TableChange.SetOption or TableChange.ResetOption) when Paimon has more columns than Fluss, it will incorrectly throw an error at line 194. Consider checking if all tableChanges are AddColumn operations before entering the retry logic, or handle non-AddColumn changes appropriately.

Copilot uses AI. Check for mistakes.
return Collections.emptyList();

return false;
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
/**
 * Verifies that every Fluss column has a Paimon field with the same name at the same position.
 *
 * <p>Only names are compared here. The caller is expected to pass a {@code paimonFields} list
 * that is at least as long as {@code flussColumns}.
 *
 * @param paimonFields the Paimon fields to check, in table order
 * @param flussColumns the Fluss columns that must be matched position by position
 * @throws InvalidAlterTableException if the names at any position differ
 */
private void validateExistingColumns(
        List<DataField> paimonFields,
        List<org.apache.fluss.metadata.Schema.Column> flussColumns) {
    int position = 0;
    for (org.apache.fluss.metadata.Schema.Column flussColumn : flussColumns) {
        String paimonName = paimonFields.get(position).name();
        String flussName = flussColumn.getName();
        if (!paimonName.equals(flussName)) {
            throw new InvalidAlterTableException(
                    String.format(
                            "Column mismatch at position %d. Paimon: '%s', Fluss: '%s'",
                            position, paimonName, flussName));
        }
        position++;
    }
}

private boolean isColumnAlreadyExists(
org.apache.paimon.types.DataField field, TableChange.AddColumn addColumn) {
String columnName = addColumn.getName();

for (org.apache.paimon.types.DataField field : currentSchema.fields()) {
if (field.name().equals(columnName)) {
org.apache.paimon.types.DataType expectedType =
addColumn
.getDataType()
.accept(
org.apache.fluss.lake.paimon.utils
.FlussDataTypeToPaimonDataType.INSTANCE);

if (!field.type().equals(expectedType)) {
throw new InvalidAlterTableException(
String.format(
"Column '%s' already exists but with different type. "
+ "Existing: %s, Expected: %s",
columnName, field.type(), expectedType));
}
String existingComment = field.description();
String expectedComment = addColumn.getComment();

boolean commentsMatch =
(existingComment == null && expectedComment == null)
|| (existingComment != null
&& existingComment.equals(expectedComment));

if (!commentsMatch) {
throw new InvalidAlterTableException(
String.format(
"Column %s already exists but with different comment. "
+ "Existing: %s, Expected: %s",
columnName, existingComment, expectedComment));
}
if (field.name().equals(columnName)) {
org.apache.paimon.types.DataType expectedType =
addColumn
.getDataType()
.accept(
org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType
.INSTANCE);

return true;
if (!field.type().equals(expectedType)) {
throw new InvalidAlterTableException(
String.format(
"Column '%s' already exists but with different type. "
+ "Existing: %s, Expected: %s",
columnName, field.type(), expectedType));
}
}
String existingComment = field.description();
String expectedComment = addColumn.getComment();

boolean commentsMatch =
(existingComment == null && expectedComment == null)
|| (existingComment != null && existingComment.equals(expectedComment));

if (!commentsMatch) {
throw new InvalidAlterTableException(
String.format(
"Column %s already exists but with different comment. "
+ "Existing: %s, Expected: %s",
columnName, existingComment, expectedComment));
}

return true;
}
return false;
}

Expand Down
Loading
Loading