feat: Add row lineage metadata columns to Iceberg reader #33
Joe-Abraham wants to merge 3 commits into main from
Conversation
Reviewer's Guide

Adds Iceberg V3 row lineage support to the Hive Iceberg split reader by expanding the file schema to include hidden metadata columns, computing `_row_id` from `first_row_id` and row positions (including with positional deletes), inheriting and backfilling `_last_updated_sequence_number` from `data_sequence_number`, and covering these behaviors with focused tests.

Sequence diagram for Iceberg V3 row lineage handling in prepareSplit and next:

sequenceDiagram
participant Engine as QueryEngine
participant SplitReader as IcebergSplitReader
participant HiveSplit as HiveSplitInfo
participant Manifest as ManifestEntry
participant Parquet as ParquetReader
Engine->>SplitReader: prepareSplit(split, readerOutputType)
SplitReader->>SplitReader: fileSchema = baseReaderOpts_.fileSchema()
alt fileSchema exists
SplitReader->>SplitReader: add _row_id, _last_updated_sequence_number to fileSchema if requested
SplitReader->>SplitReader: baseReaderOpts_.setFileSchema(expandedSchema)
end
SplitReader->>SplitReader: createReader()
SplitReader->>HiveSplit: read infoColumns[$first_row_id]
alt first_row_id >= 0
SplitReader->>SplitReader: firstRowId_ = first_row_id
else
SplitReader->>SplitReader: firstRowId_ = nullopt
end
SplitReader->>HiveSplit: read infoColumns[$data_sequence_number]
SplitReader->>SplitReader: dataSequenceNumber_ = data_sequence_number or nullopt
SplitReader->>SplitReader: rowType = getAdaptedRowType()
SplitReader->>SplitReader: scanSpec_ uses adaptColumns()
alt _last_updated_sequence_number not in file and dataSequenceNumber_ set
SplitReader->>SplitReader: set childSpec constant to dataSequenceNumber_
else _last_updated_sequence_number in file
SplitReader->>SplitReader: readLastUpdatedSeqNumFromFile_ = true
SplitReader->>SplitReader: lastUpdatedSeqNumOutputIndex_ = index in readerOutputType_
end
alt firstRowId_ set and _row_id requested
SplitReader->>SplitReader: computeRowId_ = true
SplitReader->>SplitReader: rowIdOutputIndex_ = index in readerOutputType_
end
Engine->>SplitReader: next(size, output)
SplitReader->>Parquet: next(actualSize, output, mutation)
Parquet-->>SplitReader: rowsScanned, output vectors
alt readLastUpdatedSeqNumFromFile_ and dataSequenceNumber_ set
SplitReader->>SplitReader: load _last_updated_sequence_number vector
alt vector is constant and null
SplitReader->>SplitReader: replace with constant(dataSequenceNumber_)
else vector mayHaveNulls
SplitReader->>SplitReader: create new flat vector
SplitReader->>SplitReader: replace nulls with dataSequenceNumber_
end
end
alt computeRowId_ and firstRowId_ set
SplitReader->>SplitReader: load _row_id vector
alt constant null vector
SplitReader->>SplitReader: compute _row_id = firstRowId_ + filePosition
SplitReader->>SplitReader: use mutation.deletedRows to skip deleted rows
else vector mayHaveNulls
SplitReader->>SplitReader: create new flat vector
SplitReader->>SplitReader: for nulls compute firstRowId_ + filePosition
SplitReader->>SplitReader: respect mutation.deletedRows bitmap
end
end
SplitReader-->>Engine: rowsScanned, output with lineage columns
Updated class diagram for IcebergSplitReader and IcebergMetadataColumn:

classDiagram
class IcebergSplitReader {
+prepareSplit(split, readerOutputType, metadataFilter, runtimeStats, fileReadOps) void
+next(size, output) uint64_t
+adaptColumns(fileType, tableSchema) std::vector~TypePtr~
-std::list~PositionalDeleteFileReader~ positionalDeleteFileReaders_
-BufferPtr deleteBitmap_
-bool readLastUpdatedSeqNumFromFile_
-std::optional~column_index_t~ lastUpdatedSeqNumOutputIndex_
-std::optional~int64_t~ dataSequenceNumber_
-std::optional~int64_t~ firstRowId_
-bool computeRowId_
-std::optional~column_index_t~ rowIdOutputIndex_
}
class IcebergMetadataColumn {
+int id
+std::string name
+TypePtr type
+std::string comment
+static const char* kPosColumnName
+static const char* kRowIdColumnName
+static const char* kLastUpdatedSequenceNumberColumnName
+static icebergDeletionFileRowPosColumn() std::shared_ptr~IcebergMetadataColumn~
+static icebergRowIdColumn() std::shared_ptr~IcebergMetadataColumn~
+static icebergLastUpdatedSequenceNumberColumn() std::shared_ptr~IcebergMetadataColumn~
}
class SplitReader {
<<interface>>
+prepareSplit(split, readerOutputType, metadataFilter, runtimeStats, fileReadOps) void
+next(size, output) uint64_t
+adaptColumns(fileType, tableSchema) std::vector~TypePtr~
}
IcebergSplitReader --|> SplitReader
IcebergSplitReader ..> IcebergMetadataColumn : uses
Hey - I've found 4 issues, and left some high level feedback:
- In `next()`, the code mutates vectors returned from `BaseVector::loadedVectorShared` (`flatVec->set(...)` and replacing children) without first ensuring they are writable/uniquely owned; consider using `ensureWritable`/creating new vectors to avoid corrupting shared buffers.
- The logic for parsing `$data_sequence_number` from `infoColumns` is duplicated in `prepareSplit()` and `adaptColumns()`; consider centralizing this into a helper or reusing the member `dataSequenceNumber_` to keep the behavior consistent.
- In `adaptColumns()`, the fallback `tableSchema->findChild(fieldName)` can return null when the field is missing from both `readerOutputType_` and `tableSchema`; it would be safer to add a check or assertion before passing this type into `BaseVector::createNullConstant`.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `next()`, the code mutates vectors returned from `BaseVector::loadedVectorShared` (`flatVec->set(...)` and replacing children) without first ensuring they are writable/uniquely owned; consider using `ensureWritable`/creating new vectors to avoid corrupting shared buffers.
- The logic for parsing `$data_sequence_number` from `infoColumns` is duplicated in `prepareSplit()` and `adaptColumns()`; consider centralizing this into a helper or reusing the member `dataSequenceNumber_` to keep the behavior consistent.
- In `adaptColumns()`, the fallback `tableSchema->findChild(fieldName)` can return null when the field is missing from both `readerOutputType_` and `tableSchema`; it would be safer to add a check or assertion before passing this type into `BaseVector::createNullConstant`.
## Individual Comments
### Comment 1
<location path="velox/connectors/hive/iceberg/IcebergSplitReader.cpp" line_range="249-258" />
<code_context>
+ // Load lazy vector if needed — the Parquet reader wraps columns in
+ // LazyVector, which must be loaded before checking encoding.
+ const auto& loadedSeqNum = BaseVector::loadedVectorShared(seqNumChild);
+ if (loadedSeqNum->isConstantEncoding()) {
+ auto* simpleVec = loadedSeqNum->as<SimpleVector<int64_t>>();
+ if (simpleVec && simpleVec->isNullAt(0)) {
+ seqNumChild = std::make_shared<ConstantVector<int64_t>>(
+ connectorQueryCtx_->memoryPool(),
+ rowsScanned,
+ false,
+ BIGINT(),
+ static_cast<int64_t>(*dataSequenceNumber_));
+ }
+ } else if (auto* flatVec = loadedSeqNum->asFlatVector<int64_t>()) {
+ for (vector_size_t i = 0; i < rowsScanned; ++i) {
+ if (flatVec->isNullAt(i)) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle non-constant, non-flat encodings for `_last_updated_sequence_number` to avoid missing null inheritance.
This logic only handles constant and flat encodings. If `_last_updated_sequence_number` is dictionary-encoded (or wrapped in another encoding that `asFlatVector` doesn’t support), nulls will remain and the inheritance rule (null → manifest sequence number) won’t be applied, leading to incorrect lineage. To make this encoding-agnostic, either peel dictionary/sequence wrappers until you reach a writable flat vector, or use `BaseVector::ensureWritable` on the child before iterating and filling nulls.
</issue_to_address>
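As a standalone illustration of the inheritance rule this comment relies on, here is a plain C++ sketch with `std::optional` standing in for vector nulls. `inheritSequenceNumbers` is a hypothetical name, not a Velox API; note that when no manifest sequence number is available, nulls are left untouched.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical sketch of the Iceberg V3 inheritance rule: a null
// _last_updated_sequence_number inherits the manifest entry's data
// sequence number. std::optional models per-row nulls.
void inheritSequenceNumbers(
    std::vector<std::optional<int64_t>>& column,
    std::optional<int64_t> dataSequenceNumber) {
  if (!dataSequenceNumber.has_value()) {
    return; // No manifest sequence number: nulls stay null.
  }
  for (auto& value : column) {
    if (!value.has_value()) {
      value = dataSequenceNumber;
    }
  }
}
```

The reviewer's point is that the real implementation must reach every row regardless of encoding; this sketch sidesteps that by using a flat representation, which is exactly what `ensureWritable`/decoding would normalize to.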
### Comment 2
<location path="velox/connectors/hive/iceberg/IcebergSplitReader.cpp" line_range="282-291" />
<code_context>
+ if (loadedRowId->isConstantEncoding()) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Extend `_row_id` computation to support dictionary/other encodings, not only constant and flat vectors.
Right now this only handles constant and flat encodings: if `_row_id` is a dictionary/sequence vector, `asFlatVector<int64_t>()` returns null and the fill step is skipped, so `_row_id` stays null even though `firstRowId_` is set. To avoid silently missing row-id population for those cases, normalize the child to a writable flat vector (e.g., via `ensureWritable` or peeling dictionary layers) before applying the fill logic.
</issue_to_address>
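A minimal sketch of what "peeling" a dictionary wrapper means, independent of Velox's `DecodedVector`: the wrapper holds indices into a base vector, so per-row nulls must be read through the indices rather than via `asFlatVector` on the wrapper (which returns null for dictionary encodings). All names below are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical dictionary decode: materialize a flat view by following
// each output row's index into the base values. After this, null checks
// and backfilling can run position-by-position as on a flat vector.
std::vector<std::optional<int64_t>> decodeDictionary(
    const std::vector<int32_t>& indices,
    const std::vector<std::optional<int64_t>>& base) {
  std::vector<std::optional<int64_t>> out;
  out.reserve(indices.size());
  for (int32_t idx : indices) {
    out.push_back(base[idx]);
  }
  return out;
}
```

This is the shape of the fix the comment asks for: normalize to a writable flat form first, then apply the fill logic once, instead of branching on each possible encoding.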
### Comment 3
<location path="velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp" line_range="981-990" />
<code_context>
+TEST_F(HiveIcebergTest, readRowIdColumn) {
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for `_row_id` when it exists in the file but has nulls that must be backfilled from `first_row_id + _pos`
`IcebergSplitReader::next` has explicit handling for physically present `_row_id` values that are null, backfilling them as `first_row_id + _pos` (for both constant/flat encodings and with delete bitmaps). We currently cover only: fully present `_row_id`, fully absent `_row_id`, and computed `_row_id` with deletes. Please add a test where the data file contains `_row_id` with a mix of non‑null and null entries, `first_row_id` is set on the split, and the expected results confirm that existing non‑null values are preserved while nulls are backfilled as `first_row_id + _pos`. If feasible, a variant with positional deletes would also exercise the `mutation.deletedRows` branch.
Suggested implementation:
```cpp
// Write data file with c0 and _row_id columns. Use a mix of non-null and null
// values for _row_id so that IcebergSplitReader::next must backfill the nulls
// as first_row_id + _pos while preserving existing non-null values.
std::vector<RowVectorPtr> dataVectors;
dataVectors.push_back(makeRowVector(
fileRowType->names(),
{
makeFlatVector<int64_t>({0, 1, 2, 3, 4}),
makeNullableFlatVector<int64_t>(
{1000,
std::nullopt,
1002,
std::nullopt,
1004}),
```
The above change ensures the data file contains a `_row_id` column with a mix of non-null and null entries. To fully implement the requested coverage:
1. Make sure the split used in this test has `first_row_id` (or equivalent field) set so that `IcebergSplitReader::next` will backfill null `_row_id` entries as `first_row_id + _pos`. If the current test does not set this, update the split construction accordingly (e.g., when calling the helper that creates the `IcebergSplit`, pass a non-zero `firstRowId`).
2. Update the assertions for this test so that the expected `_row_id` values:
- Preserve the non-null values from the file (`1000`, `1002`, `1004` in the example), and
- Backfill the null positions with `first_row_id + _pos` based on the configured `first_row_id`.
For example, if `first_row_id` is set to `2000`, and the positions are `[0,1,2,3,4]`, the expected `_row_id` column would be `[1000, 2001, 1002, 2003, 1004]`.
3. If your test suite already has utilities to exercise positional deletes (e.g., creating delete files and wiring them into an `IcebergSplit` with `mutation.deletedRows` populated), add a second test variant (e.g., `readRowIdColumnWithNullsAndDeletes`) that reuses a similar data vector but also configures a positional delete file. The expected results should again validate that:
- Rows marked as deleted are filtered out, and
- Remaining rows have `_row_id` values where non-null physical values are preserved and nulls are backfilled via `first_row_id + _pos`, covering the `mutation.deletedRows` branch in `IcebergSplitReader::next`.
</issue_to_address>
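The backfill arithmetic from the suggestion can be checked in isolation. `backfillRowIds` is a hypothetical helper (plain C++, not part of the test suite): non-null file values are preserved and nulls become `first_row_id + _pos`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical sketch of the expected mixed-null backfill: fileRowIds is
// the physical _row_id column (std::optional models nulls), pos is the
// element index, and nulls are filled as firstRowId + pos.
std::vector<int64_t> backfillRowIds(
    const std::vector<std::optional<int64_t>>& fileRowIds,
    int64_t firstRowId) {
  std::vector<int64_t> out;
  out.reserve(fileRowIds.size());
  for (size_t pos = 0; pos < fileRowIds.size(); ++pos) {
    out.push_back(
        fileRowIds[pos].value_or(firstRowId + static_cast<int64_t>(pos)));
  }
  return out;
}
```

With `first_row_id = 2000` and file values `[1000, null, 1002, null, 1004]`, this yields `[1000, 2001, 1002, 2003, 1004]`, matching the expected results described in the suggestion.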
### Comment 4
<location path="velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp" line_range="1168-1081" />
<code_context>
+TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberAllNulls) {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for `_last_updated_sequence_number` behavior when `$data_sequence_number` is missing
Current tests only cover cases where `$data_sequence_number` is present in `infoColumns`, so they only validate inheritance/null replacement when that column exists. The implementation also handles the case where `$data_sequence_number` is missing or unparsable, in which `_last_updated_sequence_number` should not be rewritten and nulls should remain null.
Please add a test that:
- Writes a file with `_last_updated_sequence_number` containing nulls.
- Creates a split **without** `$data_sequence_number`.
- Reads `c0` and `_last_updated_sequence_number` and asserts that the nulls are preserved.
This will cover the "no data sequence number" path and guard against unintended inheritance in that scenario.
Suggested implementation:
```cpp
AssertQueryBuilder(plan).splits({split}).assertResults(expected);
}
// Tests reading _last_updated_sequence_number from a data file with null
// values when $data_sequence_number is missing from the info columns. In this
// case, null values should be preserved and must NOT be replaced or
// inherited.
TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberAllNullsNoDataSequenceNumber) {
folly::SingletonVault::singleton()->registrationComplete();
auto fileRowType =
ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()});
// Write data file with _last_updated_sequence_number all set to null.
std::vector<RowVectorPtr> dataVectors;
dataVectors.push_back(makeRowVector(
fileRowType->names(),
{
makeFlatVector<int64_t>({1, 2, 3}),
BaseVector::createNullConstant(BIGINT(), 3, pool()),
}));
auto filePath = TempFilePath::create();
writeToFile(filePath->path, fileRowType, dataVectors);
// Build a plan that reads c0 and _last_updated_sequence_number.
auto plan = PlanBuilder()
.tableScan(
fileRowType,
{"c0", "_last_updated_sequence_number"})
.planNode();
// Create a split WITHOUT $data_sequence_number in the info columns to cover
// the "no data sequence number" code path.
auto split = exec::Split(makeHiveConnectorSplit(
filePath->path,
dwio::common::FileFormat::DWRF,
{},
{} /* infoColumns: no $data_sequence_number */));
// Expect _last_updated_sequence_number to remain null for all rows.
auto expected = makeRowVector(
{"c0", "_last_updated_sequence_number"},
{
makeFlatVector<int64_t>({1, 2, 3}),
BaseVector::createNullConstant(BIGINT(), 3, pool()),
});
AssertQueryBuilder(plan).splits({split}).assertResults(expected);
}
// Tests reading _last_updated_sequence_number from a data file with null
// values. Per the Iceberg V3 spec, null values should be replaced with the
// data sequence number from the file's manifest entry (provided via
// $data_sequence_number info column).
TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberAllNulls) {
```
The new test assumes the following helpers and types already exist in this file/test base (they are commonly present in Velox tests but please adjust to match your code):
1. `writeToFile(const std::string& path, const RowTypePtr& rowType, const std::vector<RowVectorPtr>& dataVectors);`
2. `makeHiveConnectorSplit(const std::string& path,
dwio::common::FileFormat format,
const std::unordered_map<std::string, std::optional<std::string>>& partitionKeys,
const std::unordered_map<std::string, std::optional<std::string>>& infoColumns);`
3. `PlanBuilder().tableScan(rowType, columnNames)` overload (if your `tableScan` requires additional arguments, e.g. filters or subfield specs, add the appropriate parameters as in the neighboring tests).
4. `exec::Split` wrapper around `makeHiveConnectorSplit`; if your tests pass splits differently (e.g. directly using `core::Split` or a vector of `exec::Split`), adapt the `split` construction to match the pattern used in the existing `readLastUpdatedSequenceNumberAllNulls` test.
To keep consistency with your codebase, you may want to:
- Use the same number of rows and `c0` values as in `readLastUpdatedSequenceNumberAllNulls`.
- Use the exact same `PlanBuilder`/`split` construction as that test, changing only the infoColumns argument so that `$data_sequence_number` is omitted.
</issue_to_address>
Pull request overview
Adds Iceberg V3 row lineage support in the Hive Iceberg connector by introducing hidden metadata columns and read-time derivation/backfilling logic.
Changes:
- Add Iceberg metadata column definitions for `_row_id` and `_last_updated_sequence_number`.
- Expand reader file schema to include requested row lineage columns and implement read-time computation/backfill in `IcebergSplitReader`.
- Add unit tests covering reading/inheriting/computing row lineage columns (including deletes).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp` | Adds new tests for reading/computing `_row_id` and inheriting/backfilling `_last_updated_sequence_number`. |
| `velox/connectors/hive/iceberg/IcebergSplitReader.h` | Documents new row lineage handling and adds state needed to drive read-time backfill/computation. |
| `velox/connectors/hive/iceberg/IcebergSplitReader.cpp` | Implements file schema expansion plus read-time null replacement and `_row_id` computation. |
| `velox/connectors/hive/iceberg/IcebergMetadataColumns.h` | Defines the two new Iceberg V3 row lineage metadata columns and their IDs/names. |
```cpp
auto fileRowType = ROW({"c0"}, {BIGINT()});
```

`fileRowType` is declared but never used in this test. If warnings are promoted to errors, this will break the build; please remove it or mark it `[[maybe_unused]]`.
```cpp
auto fileRowType = ROW({"c0"}, {BIGINT()});
```

`fileRowType` is declared but never used in this test. Please remove it or mark it `[[maybe_unused]]` to avoid unused-variable warnings (which may be -Werror in CI).
```cpp
auto fileRowType = ROW({"c0"}, {BIGINT()});
```

`fileRowType` is declared but never used in this test. Please remove it or mark it `[[maybe_unused]]` to avoid unused-variable warnings (which may be -Werror in CI).
```cpp
// True if _last_updated_sequence_number is read from the data file (not set
// as a constant). Set in adaptColumns().
bool readLastUpdatedSeqNumFromFile_{false};
```

Comment mismatch: `readLastUpdatedSeqNumFromFile_` is documented as "Set in adaptColumns()", but it's actually initialized and set in `prepareSplit()` based on the scan spec and `$data_sequence_number`. Please update the comment to match where the flag is set/maintained.
```cpp
auto rowIdColumn = IcebergMetadataColumn::icebergRowIdColumn();
auto fileRowType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()});
```

`rowIdColumn` is declared but never used. With warnings treated as errors in some builds, this can fail compilation; please remove it or mark it `[[maybe_unused]]`.
```cpp
testing::internal::GetFileSize(
    std::fopen(deleteFilePath->getPath().c_str(), "r")),
{},
{},
{{posColumn->id, "3"}});
```

The positional delete upper bound is stored as Base64-encoded 8-byte little-endian binary (see existing skipDeleteFileByPositionUpperBound test). Passing the literal string "3" here will fail decoding/size checks in `IcebergSplitReader::prepareSplit()` and cause the test to crash. Please encode the bound using the same helper logic as the existing test.
```cpp
testing::internal::GetFileSize(
    std::fopen(deleteFilePath->getPath().c_str(), "r")),
{},
```

This uses `testing::internal::GetFileSize(std::fopen(...))` without closing the `FILE*`, which can leak file descriptors and trip sanitizers. Prefer using `filesystems::getFileSystem(...)->openFileForRead(...)->size()` (as done elsewhere in this file) or `std::filesystem::file_size` instead of fopen/gtest internals.
```cpp
auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation);

// For Iceberg V3 row lineage: replace null values in
// _last_updated_sequence_number with the data sequence number from the
// file's manifest entry. Per the spec, null means the value should be
// inherited from the manifest entry's sequence number.
if (readLastUpdatedSeqNumFromFile_ && dataSequenceNumber_.has_value() &&
    lastUpdatedSeqNumOutputIndex_.has_value() && rowsScanned > 0) {
  auto* rowOutput = output->as<RowVector>();
  if (rowOutput) {
    auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_);
    // Load lazy vector if needed — the Parquet reader wraps columns in
    // LazyVector, which must be loaded before checking encoding.
    const auto& loadedSeqNum = BaseVector::loadedVectorShared(seqNumChild);
    if (loadedSeqNum->isConstantEncoding()) {
      auto* simpleVec = loadedSeqNum->as<SimpleVector<int64_t>>();
      if (simpleVec && simpleVec->isNullAt(0)) {
        seqNumChild = std::make_shared<ConstantVector<int64_t>>(
            connectorQueryCtx_->memoryPool(),
            rowsScanned,
            false,
            BIGINT(),
            static_cast<int64_t>(*dataSequenceNumber_));
      }
    } else if (auto* flatVec = loadedSeqNum->asFlatVector<int64_t>()) {
      for (vector_size_t i = 0; i < rowsScanned; ++i) {
        if (flatVec->isNullAt(i)) {
          flatVec->set(i, *dataSequenceNumber_);
        }
      }
    }
```

`baseRowReader_->next()` returns the number of rows scanned (i.e. `actualSize`), not the number of rows produced in `output` after filters/deletes. This block iterates up to `rowsScanned` and even creates vectors sized `rowsScanned`, which will go out-of-bounds or produce child vectors with the wrong length when `output->size() < rowsScanned` (common with filters or positional deletes). Use `rowOutput->size()` (or `output->size()`) for loops and created vector sizes, and handle dictionary-wrapped results (e.g. after filtering) so null replacement still runs.
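The scanned-versus-output size mismatch can be reproduced with a toy scanner in plain C++ (all names hypothetical): the reader reports rows scanned, while the output holds only rows that survived a filter.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sketch of the hazard: next() reports rows *scanned*, but the output
// vector only holds rows that *passed* filters/deletes. Any loop must
// bound on the output size, not the scanned count.
struct ScanResult {
  uint64_t rowsScanned = 0;      // analogous to actualSize from the reader
  std::vector<int64_t> output;   // surviving rows only
};

ScanResult scanWithFilter(const std::vector<int64_t>& file) {
  ScanResult r;
  r.rowsScanned = file.size();
  for (int64_t v : file) {
    if (v % 2 == 0) {
      r.output.push_back(v); // filter: keep even values only
    }
  }
  return r;
}
```

Here scanning `{1, 2, 3, 4, 5}` reports 5 rows scanned but an output of only 2 rows; indexing the output up to `rowsScanned` would read past its end, which is exactly the out-of-bounds risk the comment describes.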
```cpp
// For Iceberg V3 row lineage: compute _row_id = first_row_id + _pos.
// When the data file doesn't contain _row_id physically, compute it from
// the manifest entry's first_row_id plus the row's position in the file.
// When positional deletes are applied, we track actual file positions
// using the deletion bitmap.
if (computeRowId_ && firstRowId_.has_value() &&
    rowIdOutputIndex_.has_value() && rowsScanned > 0) {
  auto* rowOutput = output->as<RowVector>();
  if (rowOutput) {
    auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_);
    // Load lazy vector if needed — the Parquet reader wraps columns in
    // LazyVector, which must be loaded before checking encoding.
    const auto& loadedRowId = BaseVector::loadedVectorShared(rowIdChild);
    if (loadedRowId->isConstantEncoding()) {
      auto* simpleVec = loadedRowId->as<SimpleVector<int64_t>>();
      if (simpleVec && simpleVec->isNullAt(0)) {
        auto pool = connectorQueryCtx_->memoryPool();
        auto flatVec = BaseVector::create<FlatVector<int64_t>>(
            BIGINT(), rowsScanned, pool);
        int64_t batchStartPos = splitOffset_ + baseReadOffset_;
        if (mutation.deletedRows == nullptr) {
          // No deletions: positions are contiguous.
          for (vector_size_t i = 0; i < rowsScanned; ++i) {
            flatVec->set(
                i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
          }
        } else {
          // With deletions: use the bitmap to find actual file positions
          // of surviving rows.
          vector_size_t outputIdx = 0;
          for (vector_size_t j = 0;
               j < static_cast<vector_size_t>(actualSize) &&
               outputIdx < rowsScanned;
               ++j) {
            if (!bits::isBitSet(mutation.deletedRows, j)) {
              flatVec->set(
                  outputIdx++,
                  static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
            }
          }
        }
        rowIdChild = flatVec;
      }
    } else if (auto* flatVec = loadedRowId->asFlatVector<int64_t>()) {
      // Column is in the file but has some null values — replace nulls
      // with first_row_id + _pos.
      int64_t batchStartPos = splitOffset_ + baseReadOffset_;
      if (mutation.deletedRows == nullptr) {
        for (vector_size_t i = 0; i < rowsScanned; ++i) {
          if (flatVec->isNullAt(i)) {
            flatVec->set(
                i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
          }
        }
      } else {
        vector_size_t outputIdx = 0;
        for (vector_size_t j = 0;
             j < static_cast<vector_size_t>(actualSize) &&
             outputIdx < rowsScanned;
             ++j) {
          if (!bits::isBitSet(mutation.deletedRows, j)) {
            if (flatVec->isNullAt(outputIdx)) {
              flatVec->set(
                  outputIdx,
                  static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
            }
            ++outputIdx;
          }
        }
      }
    }
```
The `_row_id` computation currently assumes output row `i` corresponds to file position `batchStartPos + i` (or reconstructs positions using only the delete bitmap). This breaks as soon as any filter (or `randomSkip`) is applied, because the output is dictionary-wrapped with indices into the scanned batch; positions are no longer contiguous and aren't derivable from `i`. Consider using the dictionary indices (or a decoded vector) to map each output row back to its scanned-row offset `j`, then compute `first_row_id + batchStartPos + j` (this also removes the need for the separate delete-bitmap walk). Also avoid using `rowsScanned` for vector sizing/iteration here, for the same reason as above.
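The index-mapping idea can be sketched standalone. This is a minimal illustration only: plain `std::vector`s stand in for Velox's dictionary-wrapped vectors, and all names (`computeRowIds`, `dictIndices`) are illustrative, not Velox API.

```cpp
#include <cstdint>
#include <vector>

// Sketch: after filtering, the output is dictionary-wrapped and each output
// row i carries an index j into the scanned batch. The file position of
// output row i is batchStartPos + j, so
//   _row_id = first_row_id + batchStartPos + j,
// regardless of how many rows filters or positional deletes removed.
std::vector<int64_t> computeRowIds(
    int64_t firstRowId,
    int64_t batchStartPos,
    const std::vector<int32_t>& dictIndices) { // output row -> scanned row j
  std::vector<int64_t> rowIds;
  rowIds.reserve(dictIndices.size());
  for (int32_t j : dictIndices) {
    rowIds.push_back(firstRowId + batchStartPos + j);
  }
  return rowIds;
}
```

For example, with `firstRowId = 100`, `batchStartPos = 10`, and surviving scanned rows `{0, 2, 4}`, the computed ids are `{110, 112, 114}`, independent of whether the gaps came from filters or deletes.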
Force-pushed from c0b4a99 to 7920d3a.

@sourcery-ai review
Hey - I've found 4 issues, and left some high level feedback:
- In the null-replacement paths for `_last_updated_sequence_number` and `_row_id`, `seqNumChild`/`rowIdChild` are cast to `SimpleVector<int64_t>` even when `mayHaveNulls()` is true, but the vectors can still be dictionary/sequence encoded at that point; consider peeling encodings or using `BaseVector::ensureWritable`/`DecodedVector` instead of relying on `as<SimpleVector>` to truly handle any vector encoding.
- The `_row_id` computation with positional deletes assumes a direct mapping between `actualSize`/`mutation.deletedRows` and the output vector indices; it would be safer to document or assert this contract, or derive positions based on the same index space as the produced `rowIdChild` to avoid subtle off-by-one or encoding-related mismatches.
```cpp
} else if (seqNumChild->mayHaveNulls()) {
  // Handle any vector encoding (flat, dictionary, etc.) by creating
  // a new flat vector with null values replaced.
  auto pool = connectorQueryCtx_->memoryPool();
  auto* simpleVec = seqNumChild->as<SimpleVector<int64_t>>();
  auto newFlat =
      BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
  for (vector_size_t i = 0; i < vectorSize; ++i) {
    if (simpleVec->isNullAt(i)) {
      newFlat->set(i, *dataSequenceNumber_);
    } else {
      newFlat->set(i, simpleVec->valueAt(i));
    }
  }
  seqNumChild = newFlat;
}
```
suggestion (performance): Avoid always copying the vector when there are no actual nulls present

In the `mayHaveNulls()` branch we always allocate and populate a new `FlatVector<int64_t>` for `_last_updated_sequence_number`, which can be costly for wide vectors and high-throughput workloads. Since `mayHaveNulls()` is conservative, consider first checking for actual nulls (e.g. via the nulls buffer / null count) and, if none are present, reusing the original vector instead of copying.
```suggestion
} else if (seqNumChild->mayHaveNulls()) {
  // Fast-path: 'mayHaveNulls' is conservative; only rewrite when we
  // actually have nulls to replace.
  if (seqNumChild->hasNulls()) {
    // Handle any vector encoding (flat, dictionary, etc.) by creating
    // a new flat vector with null values replaced.
    auto pool = connectorQueryCtx_->memoryPool();
    auto* simpleVec = seqNumChild->as<SimpleVector<int64_t>>();
    auto newFlat =
        BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
    for (vector_size_t i = 0; i < vectorSize; ++i) {
      if (simpleVec->isNullAt(i)) {
        newFlat->set(i, *dataSequenceNumber_);
      } else {
        newFlat->set(i, simpleVec->valueAt(i));
      }
    }
    seqNumChild = newFlat;
  }
}
```
```cpp
} else if (rowIdChild->mayHaveNulls()) {
  // Column is in the file but has some null values — replace nulls
  // with first_row_id + _pos. Handle any vector encoding.
  auto pool = connectorQueryCtx_->memoryPool();
  auto* simpleVec = rowIdChild->as<SimpleVector<int64_t>>();
  auto newFlat =
      BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
  int64_t batchStartPos = splitOffset_ + baseReadOffset_;
  if (mutation.deletedRows == nullptr) {
    for (vector_size_t i = 0; i < vectorSize; ++i) {
```
suggestion (performance): Similar optimization opportunity for the `_row_id` null-replacement path

This mirrors the `_last_updated_sequence_number` path and will always allocate and fill a new `FlatVector<int64_t>` whenever `mayHaveNulls()` is true. When the column is nullable but the batch contains no actual nulls, this extra copy is unnecessary. As above, consider checking for the presence of any nulls first and skipping the copy if there are none.
Suggested implementation:
```cpp
} else if (rowIdChild->mayHaveNulls()) {
  // Column is in the file but has some null values — replace nulls
  // with first_row_id + _pos. Handle any vector encoding.
  auto* simpleVec = rowIdChild->as<SimpleVector<int64_t>>();

  // Fast path: nullable type but no actual nulls and no deleted rows,
  // so we can reuse the existing vector without copying.
  bool hasNulls = false;
  for (vector_size_t i = 0; i < vectorSize; ++i) {
    if (simpleVec->isNullAt(i)) {
      hasNulls = true;
      break;
    }
  }

  if (!hasNulls && mutation.deletedRows == nullptr) {
    // No nulls to replace and no deleted rows to filter out.
    // Keep rowIdChild as-is.
  } else {
    // We either have nulls to replace or deleted rows to filter, so
    // materialize into a new flat vector.
    auto pool = connectorQueryCtx_->memoryPool();
    auto newFlat = BaseVector::create<FlatVector<int64_t>>(
        BIGINT(), vectorSize, pool);
    int64_t batchStartPos = splitOffset_ + baseReadOffset_;

    if (mutation.deletedRows == nullptr) {
      // Replace nulls in-place, preserve existing non-null values.
      for (vector_size_t i = 0; i < vectorSize; ++i) {
        if (simpleVec->isNullAt(i)) {
          newFlat->set(
              i,
              static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
        } else {
          newFlat->set(i, simpleVec->valueAt(i));
        }
      }
    } else {
      // We also need to account for deleted rows. Skip deleted rows
      // while replacing nulls and preserving existing non-null values.
      vector_size_t outputIdx = 0;
      for (vector_size_t j = 0; j < vectorSize; ++j) {
        if (!bits::isBitSet(mutation.deletedRows, j)) {
          if (simpleVec->isNullAt(j)) {
            newFlat->set(
                outputIdx++,
                static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
          } else {
            newFlat->set(outputIdx++, simpleVec->valueAt(j));
          }
        }
      }
    }
    rowIdChild = newFlat;
  }
```
The replacement block assumes the intent of the original null-replacement path:
1. Nulls are replaced with `first_row_id + splitOffset_ + baseReadOffset_ + position`.
2. Non-null values are preserved.
3. When `mutation.deletedRows` is non-null, deleted rows are skipped while applying the above logic.

If there is additional logic following the truncated `if (simpleVec->isNullAt(i)) {` in your original file (e.g. special encoding handling), re-integrate it inside the new `if (mutation.deletedRows == nullptr)` / `else` branches as appropriate, preserving the new early-exit optimization that skips allocation when there are no nulls and no deleted rows.
```cpp
auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName);
auto colType = outputIdx.has_value()
    ? readerOutputType_->childAt(*outputIdx)
    : tableSchema->findChild(fieldName);
childSpec->setConstantValue(
    BaseVector::createNullConstant(
        colType, 1, connectorQueryCtx_->memoryPool()));
```
issue (bug_risk): Guard against null `colType` when constructing a null constant for missing columns

In this fallback branch, `colType` may be null if the field name is not found in either `readerOutputType_` or `tableSchema`. Passing a null type into `createNullConstant(colType, ...)` would likely crash. Consider adding a `VELOX_CHECK` to enforce non-null here, or handle the null case explicitly with a clear error path instead of failing implicitly.
```cpp
// Tests reading _last_updated_sequence_number with mixed values. Non-null
// values should be preserved, while null values should be replaced with the
// data sequence number.
TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberMixed) {
```
suggestion (testing): Add a test covering the case where `_last_updated_sequence_number` has nulls but `$data_sequence_number` is not provided, to confirm nulls stay null.

Since null replacement is gated on `dataSequenceNumber_.has_value()`, no backfill should occur when `$data_sequence_number` is absent and `_last_updated_sequence_number` contains nulls. Please add a test, modeled on `readLastUpdatedSequenceNumberMixed`, that omits `$data_sequence_number` from the split's `infoColumns` and asserts that nulls remain null and non-null values are unchanged, to guard against regressions assuming `$data_sequence_number` is always present.
Suggested implementation:
```cpp
// Tests reading _last_updated_sequence_number with mixed values. Non-null
// values should be preserved, while null values should be replaced with the
// data sequence number.

// Tests reading _last_updated_sequence_number with mixed values when
// $data_sequence_number is not provided in the split's infoColumns. In this
// case, null values should remain null and non-null values should be
// preserved as-is, with no backfill.
TEST_F(
    HiveIcebergTest,
    readLastUpdatedSequenceNumberMixedWithoutDataSequenceNumber) {
  folly::SingletonVault::singleton()->registrationComplete();

  // File schema contains a payload column and the
  // _last_updated_sequence_number column with mixed null and non-null values.
  auto fileRowType = ROW(
      {"c0", "_last_updated_sequence_number"},
      {BIGINT(), BIGINT()});

  // Create a file with mixed null / non-null values in
  // _last_updated_sequence_number.
  auto file = makeRowVector(
      {makeFlatVector<int64_t>({1, 2, 3, 4, 5}),
       makeFlatVector<int64_t>(
           {std::optional<int64_t>{std::nullopt},
            std::optional<int64_t>{10},
            std::optional<int64_t>{std::nullopt},
            std::optional<int64_t>{20},
            std::optional<int64_t>{30}})});

  auto filePath = TempFilePath::create();
  writeToFile(filePath->path, fileRowType, {file});

  // Build a split that does NOT include $data_sequence_number in infoColumns.
  // This ensures dataSequenceNumber_.has_value() is false, so no backfill
  // into _last_updated_sequence_number should occur.
  auto split = HiveConnectorSplitBuilder(filePath->path)
                   .fileFormat(common::FileFormat::PARQUET)
                   .tableType(IcebergTableType::kData)
                   .fileSchema(fileRowType)
                   // Intentionally no $data_sequence_number info column here.
                   .build();

  // Project only _last_updated_sequence_number so we can assert its values
  // are unchanged (nulls remain null, non-nulls are preserved).
  auto plan = PlanBuilder()
                  .tableScan(asRowType(
                      ROW({"_last_updated_sequence_number"}, {BIGINT()})))
                  .planNode();

  auto expected = makeRowVector({
      makeFlatVector<int64_t>(
          {std::optional<int64_t>{std::nullopt},
           std::optional<int64_t>{10},
           std::optional<int64_t>{std::nullopt},
           std::optional<int64_t>{20},
           std::optional<int64_t>{30}}),
  });

  AssertQueryBuilder(plan).splits({split}).assertResults(expected);
}

// Tests reading _row_id column from a data file. When the column exists in
// the data file, its values should be read as-is. The dataColumns (table
// schema) does NOT include _row_id since it's a hidden metadata column, but
// the file schema expansion in prepareSplit() adds it so the Parquet reader
// can read it from the file.
TEST_F(HiveIcebergTest, readRowIdColumn) {
  folly::SingletonVault::singleton()->registrationComplete();

  auto fileRowType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()});
```
This edit assumes the following helpers and conventions already exist in the file, matching other tests (including `readLastUpdatedSequenceNumberMixed`):
1. `TempFilePath`, `writeToFile`, `HiveConnectorSplitBuilder`, `PlanBuilder`, `AssertQueryBuilder`, `makeRowVector`, `makeFlatVector`, `ROW`, `asRowType`, `BIGINT`, and `common::FileFormat::PARQUET` are available and used as shown in neighboring tests.
2. If `readLastUpdatedSequenceNumberMixed` uses a different schema (e.g. more columns, different names) or a different way of building the scan plan, mirror that structure in this new test so expectations align exactly.
3. If `_last_updated_sequence_number` is not directly projected in other tests (e.g. wrapped in a projection or filter), align this test's `PlanBuilder` usage with the pattern used in `readLastUpdatedSequenceNumberMixed`, so both are consistent and exercise the same code path.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
```cpp
IcebergDeleteFile deleteFile(
    FileContent::kPositionalDeletes,
    deleteFilePath->getPath(),
    fileFomat_,
    2,
    testing::internal::GetFileSize(
        std::fopen(deleteFilePath->getPath().c_str(), "r")),
    {},
    {},
    {{posColumn->id, "3"}});
```
In `readRowIdColumnComputedWithDeletes`, the positional delete file's `upperBounds` entry is set to the literal string `"3"`. `IcebergSplitReader::prepareSplit()` expects this value to be Base64-encoded 8-byte little-endian (and `VELOX_CHECK`s that the decoded size is 8), so this test will fail/crash when it tries to decode. Encode the bound the same way as `skipDeleteFileByPositionUpperBound` does, or omit `upperBounds` entirely for this test.
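For reference, the expected bound format can be sketched standalone: pack the `int64_t` bound as 8 little-endian bytes, then Base64-encode. The Base64 step is hand-rolled here only so the sketch has no dependencies; the actual test would presumably reuse whatever helper `skipDeleteFileByPositionUpperBound` uses.

```cpp
#include <cstdint>
#include <string>

// Encode an int64 bound as Base64 over its 8 little-endian bytes -- the
// format prepareSplit() expects for lower/upper bound values.
std::string base64LittleEndian(int64_t value) {
  unsigned char bytes[8];
  for (int i = 0; i < 8; ++i) {
    bytes[i] = static_cast<unsigned char>(
        (static_cast<uint64_t>(value) >> (8 * i)) & 0xff);
  }
  static const char* kAlphabet =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  std::string out;
  // 8 bytes -> two full 3-byte groups, then a 2-byte tail with one '=' pad.
  for (int i = 0; i < 6; i += 3) {
    uint32_t n = (bytes[i] << 16) | (bytes[i + 1] << 8) | bytes[i + 2];
    out += kAlphabet[(n >> 18) & 63];
    out += kAlphabet[(n >> 12) & 63];
    out += kAlphabet[(n >> 6) & 63];
    out += kAlphabet[n & 63];
  }
  uint32_t n = (bytes[6] << 16) | (bytes[7] << 8);
  out += kAlphabet[(n >> 18) & 63];
  out += kAlphabet[(n >> 12) & 63];
  out += kAlphabet[(n >> 6) & 63];
  out += '=';
  return out;
}
```

So the bound `3` would be passed as `"AwAAAAAAAAA="` rather than the literal `"3"`.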
```cpp
// All null — compute _row_id = first_row_id + _pos for all rows.
auto pool = connectorQueryCtx_->memoryPool();
auto flatVec =
    BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
  for (vector_size_t i = 0; i < vectorSize; ++i) {
    flatVec->set(
        i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
  }
} else {
  // With deletions: use the bitmap to find actual file positions
  // of surviving rows.
  vector_size_t outputIdx = 0;
  for (vector_size_t j = 0;
       j < static_cast<vector_size_t>(actualSize) &&
       outputIdx < vectorSize;
       ++j) {
    if (!bits::isBitSet(mutation.deletedRows, j)) {
      flatVec->set(
          outputIdx++,
          static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
    }
  }
}
rowIdChild = flatVec;
} else if (rowIdChild->mayHaveNulls()) {
  // Column is in the file but has some null values — replace nulls
  // with first_row_id + _pos. Handle any vector encoding.
  auto pool = connectorQueryCtx_->memoryPool();
  auto* simpleVec = rowIdChild->as<SimpleVector<int64_t>>();
  auto newFlat =
      BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
  int64_t batchStartPos = splitOffset_ + baseReadOffset_;
  if (mutation.deletedRows == nullptr) {
    for (vector_size_t i = 0; i < vectorSize; ++i) {
      if (simpleVec->isNullAt(i)) {
        newFlat->set(
            i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
      } else {
```
The `_row_id` computation assumes returned rows occupy contiguous file positions (it uses `batchStartPos + i` / `+ j`). This breaks when the DWIO reader filters out rows (e.g. `remainingFilter` / row-group pruning): the output index no longer matches `_pos`, and in the delete-bitmap branch `j` includes filtered rows that never appear in the output. Consider deriving the file row number for each output row (e.g. via `RowReaderOptions::setRowNumberColumnInfo` and the resulting row-number vector) and computing `_row_id = first_row_id + rowNumber` instead of using output indices.
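To make the fragility concrete, here is the mapping the delete-bitmap branch is implicitly performing, as a standalone sketch (illustrative names only; `deleted` models `mutation.deletedRows`). A reader-provided row-number column would replace this walk entirely, and would remain correct when a filtering layer sits between the scan and the output, which this walk does not.

```cpp
#include <cstdint>
#include <vector>

// Reconstruct the file position of the n-th surviving row by walking the
// delete bitmap. _row_id for that row would then be
//   first_row_id + batchStartPos + filePositionOfSurvivor(...).
int64_t filePositionOfSurvivor(
    const std::vector<bool>& deleted, // one flag per scanned file position
    int32_t survivorOrdinal) {        // index among non-deleted rows
  int32_t seen = 0;
  for (std::size_t j = 0; j < deleted.size(); ++j) {
    if (!deleted[j]) {
      if (seen == survivorOrdinal) {
        return static_cast<int64_t>(j);
      }
      ++seen;
    }
  }
  return -1; // ordinal out of range
}
```

The walk is only valid if output ordinal equals survivor ordinal; any filter applied after deletes breaks that equality, which is exactly why a per-row file position from the reader is the safer source of truth.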
```cpp
  if (modified) {
    baseReaderOpts_.setFileSchema(ROW(std::move(names), std::move(types)));
  }
}

createReader();
if (emptySplit_) {
  return;
}
```
`prepareSplit` receives `fileReadOps`, but the implementation calls `createReader()` without forwarding it. This means split-specific read options won't be applied for Iceberg reads. Pass `fileReadOps` into `createReader(fileReadOps)` (and keep the row-lineage file-schema expansion before that call).
```cpp
// True if _last_updated_sequence_number is read from the data file (not set
// as a constant). Set in adaptColumns().
bool readLastUpdatedSeqNumFromFile_{false};

// The child index of _last_updated_sequence_number in readerOutputType_.
// Used to locate the column in the output for null-value replacement.
std::optional<column_index_t> lastUpdatedSeqNumOutputIndex_;
```
The comment says `readLastUpdatedSeqNumFromFile_` is "Set in adaptColumns()", but it's actually determined in `prepareSplit()` after `getAdaptedRowType()` (based on whether the `ScanSpec` child is constant). Please update the comment (or move the flag assignment into `adaptColumns()` if that's the intent) to avoid misleading future changes.
```cpp
// Read c0 and _row_id; _row_id should be first_row_id + _pos.
// dataColumns only includes c0 (the actual table schema), not _row_id
// (metadata column). This matches real-world behavior where metadata columns
// are not part of the table schema.
auto tableDataColumns = ROW({"c0"}, {BIGINT()});
auto outputType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()});
auto plan = PlanBuilder()
                .startTableScan()
                .connectorId(kIcebergConnectorId)
                .outputType(outputType)
                .dataColumns(tableDataColumns)
                .endTableScan()
                .planNode();

// _row_id = first_row_id + _pos = 100 + 0, 100 + 1, 100 + 2.
std::vector<RowVectorPtr> expected;
expected.push_back(makeRowVector(
    outputType->names(),
    {
        makeFlatVector<int64_t>({1, 2, 3}),
        makeFlatVector<int64_t>({100, 101, 102}),
    }));

AssertQueryBuilder(plan).splits({split}).assertResults(expected);
}
```
Row-lineage tests cover basic `_row_id` computation and positional deletes, but they don't exercise cases where the reader filters rows (e.g. `remainingFilter` / row-group pruning). Since `_row_id` is defined using file position (`_pos`), adding a test with a filter that skips rows would catch incorrect computations when output indices no longer match file positions.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
auto fileRowType = ROW({"c0"}, {BIGINT()});

fileRowType is declared but never used in this test, which can trigger -Wunused-variable warnings (and may fail builds if warnings are treated as errors). Please remove it or use it when constructing the row vectors. The same unused declaration appears in two other tests.
static std::shared_ptr<IcebergMetadataColumn> icebergRowIdColumn() {
  return std::make_shared<IcebergMetadataColumn>(
      2147483540,
      kRowIdColumnName,
The new metadata column IDs for _row_id and _last_updated_sequence_number are hard-coded magic numbers. For consistency with the existing delete-file metadata IDs above, consider introducing named static constexpr int32_t constants (and ideally a short comment pointing to the Iceberg spec section/source for these IDs). This makes it easier to validate and update these values later.
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
  for (vector_size_t i = 0; i < vectorSize; ++i) {
    flatVec->set(
        i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
  }
_row_id is computed using sequential output indices (e.g., batchStartPos + i / batchStartPos + j), but RowReader::next() can return a compacted output (e.g., dictionary-wrapped) after applying scan filters and deletions. In those cases, output row 0 may correspond to an arbitrary original row position within the scanned batch, so this computation can produce incorrect _row_id values. Consider deriving the per-output-row file position from the vector's dictionary indices (e.g., via DecodedVector) and computing first_row_id + (batchStartPos + originalRowOffset) for each output row, instead of assuming output index == file position.
vector_size_t outputIdx = 0;
for (vector_size_t j = 0;
     j < static_cast<vector_size_t>(actualSize) &&
     outputIdx < vectorSize;
     ++j) {
  if (!bits::isBitSet(mutation.deletedRows, j)) {
    if (simpleVec->isNullAt(outputIdx)) {
      newFlat->set(
          outputIdx,
          static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
    } else {
      newFlat->set(outputIdx, simpleVec->valueAt(outputIdx));
    }
    ++outputIdx;
  }
The deleted-rows handling iterates j < actualSize when translating positions, but the deletion bitmap and the number of rows scanned for the batch are conceptually tied to what RowReader::next() actually scanned/returned. Using actualSize here can become inconsistent at end-of-split or if the reader scans fewer rows than requested, and it also doesn't account for additional scan filters that may compact the output. Prefer computing positions per output row via decoded indices (or at least bound by rowsScanned) rather than iterating up to actualSize.
BufferPtr deleteBitmap_;

// True if _last_updated_sequence_number is read from the data file (not set
// as a constant). Set in adaptColumns().
Comment says readLastUpdatedSeqNumFromFile_ is set in adaptColumns(), but in the current implementation it is initialized in prepareSplit() after getAdaptedRowType(). Please update the comment to match reality (or move the initialization if the intent was to keep this logic inside adaptColumns()).
Suggested change:
- // as a constant). Set in adaptColumns().
+ // as a constant). Initialized in prepareSplit().
testing::internal::GetFileSize(
    std::fopen(deleteFilePath->getPath().c_str(), "r")),
This uses testing::internal::GetFileSize(std::fopen(...)) without closing the returned FILE*. This can leak a file descriptor in the test process. Prefer using the Velox filesystem APIs already used elsewhere in this test (e.g., openFileForRead(...)->size()) or ensure the FILE* is closed.
Suggested change:
- testing::internal::GetFileSize(
-     std::fopen(deleteFilePath->getPath().c_str(), "r")),
+ filesystems::getFileSystem(deleteFilePath->getPath(), nullptr)
+     ->openFileForRead(deleteFilePath->getPath())
+     ->size(),
… row ID in IcebergSplitReader
Summary by Sourcery
Add support for Iceberg V3 row lineage metadata in the Hive Iceberg split reader and cover it with tests.