-
Notifications
You must be signed in to change notification settings - Fork 481
[lake/paimon] Compare paimon schema and Fluss schema before alter table. #2331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request enhances the Paimon lake catalog integration by comparing Paimon and Fluss schemas before altering tables. This prevents schema inconsistencies that can occur when a previous alter operation partially succeeds (e.g., Paimon succeeds but Fluss fails).
Key Changes:
- Added
getFlussSchema()method toLakeCatalog.Contextinterface to provide current Fluss schema during alter operations - Implemented schema validation logic in
PaimonLakeCatalogto detect and handle schema mismatches between Paimon and Fluss - Updated
MetadataManagerandCoordinatorServiceto pass schema information via the context
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java | Added getFlussSchema() method to Context interface |
| fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java | Updated test context to support schema parameter |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Updated to create context with schema, and pass FlussPrincipal instead of pre-created context |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java | Changed method signatures to accept FlussPrincipal and create context locally with current schema |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java | Added schema comparison logic to detect and handle retry scenarios |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java | Enhanced tests to cover schema mismatch scenarios and retry logic |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // columns or other operations next time. | ||
| String errorMsg = | ||
| String.format( | ||
| "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.", |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message has a grammatical issue. "thus need to add" should be "thus needs to add" or better yet, rephrase to "therefore you need to add the diff columns all at once, rather than applying other table changes" for better clarity.
| "Paimon table has more columns (%d) than Fluss schema (%d), thus need to add the diff columns at once rather than other table changes %s.", | |
| "Paimon table has more columns (%d) than Fluss schema (%d); therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", |
| // if paimon column size is greater than expected fluss column size, meaning last add | ||
| // columns are failed. | ||
| // Thus, this time must be retried to keep the schema same, only then can add new | ||
| // columns or other operations next time. |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states "meaning last add columns are failed" which should be "meaning the last add columns operation failed" for better grammar. Also, "This time must be retried" could be clearer as "This operation must retry adding the same columns".
| // if paimon column size is greater than expected fluss column size, meaning last add | |
| // columns are failed. | |
| // Thus, this time must be retried to keep the schema same, only then can add new | |
| // columns or other operations next time. | |
| // If Paimon column size is greater than the expected Fluss column size, it means the | |
| // last add columns operation failed. | |
| // Thus, this operation must retry adding the same columns to keep the schemas the | |
| // same before adding new columns or performing other operations. |
| for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) { | ||
| DataField paimonDataField = paimonFields.get(i + flussColumns.size()); | ||
| TableChange tableChange = tableChanges.get(i); | ||
| if (!(tableChange instanceof TableChange.AddColumn | ||
| && ((TableChange.AddColumn) tableChange).getPosition() | ||
| == TableChange.ColumnPosition.last() | ||
| && isColumnAlreadyExists( | ||
| paimonDataField, (TableChange.AddColumn) tableChange))) { | ||
| throw new InvalidAlterTableException(errorMsg); | ||
| } | ||
| } |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method checkAndFilterDuplicateTableChanges only handles TableChange.AddColumn operations in its retry logic (lines 186-196). If the method is called with property changes (like TableChange.SetOption or TableChange.ResetOption) when Paimon has more columns than Fluss, it will incorrectly throw an error at line 194. Consider checking if all tableChanges are AddColumn operations before entering the retry logic, or handle non-AddColumn changes appropriately.
| /** | ||
| * Get the current schema of fluss. | ||
| * | ||
| * @return the current schema of fluss, |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new method getFlussSchema() should include a @since annotation to document when it was added to the API, consistent with other methods in the interface that have @since annotations.
| * @return the current schema of fluss, | |
| * @return the current schema of fluss, | |
| * @since 0.10 |
| paimonFields.size(), flussColumns.size())); | ||
| } | ||
|
|
||
| // Validate schema compatibility |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an extra space in the comment. Should be "Validate schema compatibility" instead of "Validate schema compatibility".
| // Validate schema compatibility | |
| // Validate schema compatibility |
| * Get the current schema of fluss. | ||
| * | ||
| * @return the current schema of fluss, |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The JavaDoc comment is incomplete. It has "@return the current schema of fluss," but the sentence is not finished and has a trailing comma. Consider completing the description, for example: "Get the current schema of Fluss. @return the current schema of Fluss".
| * Get the current schema of fluss. | |
| * | |
| * @return the current schema of fluss, | |
| * Get the current schema of Fluss. | |
| * | |
| * @return the current schema of Fluss. |
| TableChange tableChange = tableChanges.get(i); | ||
| if (!(tableChange instanceof TableChange.AddColumn | ||
| && ((TableChange.AddColumn) tableChange).getPosition() | ||
| == TableChange.ColumnPosition.last() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could perform this check earlier—it’s not related to the existing Paimon schema. Also, it’s already validated here:
fluss/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
Lines 101 to 104 in 256bd7f
| TableChange.ColumnPosition position = addColumn.getPosition(); | |
| if (position != TableChange.ColumnPosition.last()) { | |
| throw new IllegalArgumentException("Only support addColumn column at last now."); | |
| } |
| == TableChange.ColumnPosition.last() | ||
| && isColumnAlreadyExists( | ||
| paimonDataField, (TableChange.AddColumn) tableChange))) { | ||
| throw new InvalidAlterTableException(errorMsg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could split the checks and provide distinct error messages to help users more easily identify issues related to the Paimon schema.
Purpose
Linked issue: close #2330
Brief change log
Tests
API and Format
Documentation