From 1a1bcd80f484076ff80ce35a6193eea22bfb4d6d Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Mon, 9 Mar 2026 16:46:36 +0530 Subject: [PATCH 1/7] feat: Add support for row lineage metadata columns in Iceberg V3 --- .../hive/iceberg/IcebergMetadataColumns.h | 21 + .../hive/iceberg/IcebergSplitReader.cpp | 231 ++++++++++- .../hive/iceberg/IcebergSplitReader.h | 33 ++ .../hive/iceberg/tests/IcebergReadTest.cpp | 381 ++++++++++++++++++ 4 files changed, 663 insertions(+), 3 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h index becd7d5de3c2..24b895c10ebf 100644 --- a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -55,6 +55,27 @@ struct IcebergMetadataColumn { BIGINT(), "Ordinal position of a deleted row in the data file"); } + + static constexpr const char* kRowIdColumnName = "_row_id"; + static constexpr const char* kLastUpdatedSequenceNumberColumnName = + "_last_updated_sequence_number"; + + static std::shared_ptr icebergRowIdColumn() { + return std::make_shared( + 2147483540, + kRowIdColumnName, + BIGINT(), + "Implicit row ID that is automatically assigned"); + } + + static std::shared_ptr + icebergLastUpdatedSequenceNumberColumn() { + return std::make_shared( + 2147483539, + kLastUpdatedSequenceNumberColumnName, + BIGINT(), + "Sequence number when the row was last updated"); + } }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index a8821a75204f..b6985a461e60 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -60,12 +60,86 @@ void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, dwio::common::RuntimeStatistics& runtimeStats, const folly::F14FastMap& fileReadOps) { + // Expand the file schema to include row lineage metadata columns + // (_row_id and _last_updated_sequence_number) if they're requested + // in the output. These are hidden metadata columns physically stored + // in Iceberg V3 data files but not listed in the table's logical + // schema (dataColumns). The Parquet reader needs them in the file + // schema to read them from the file. + auto fileSchema = baseReaderOpts_.fileSchema(); + if (fileSchema) { + auto names = fileSchema->names(); + auto types = fileSchema->children(); + bool modified = false; + for (const auto* colName : + {IcebergMetadataColumn::kRowIdColumnName, + IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName}) { + if (readerOutputType_->containsChild(colName) && + !fileSchema->containsChild(colName)) { + names.push_back(std::string(colName)); + types.push_back(BIGINT()); + modified = true; + } + } + if (modified) { + baseReaderOpts_.setFileSchema(ROW(std::move(names), std::move(types))); + } + } + createReader(); if (emptySplit_) { return; } + + // Initialize firstRowId_ BEFORE getAdaptedRowType() because adaptColumns() + // needs it to decide whether to set _row_id as a constant or leave it + // for next() to compute. + firstRowId_ = std::nullopt; + if (auto it = hiveSplit_->infoColumns.find("$first_row_id"); + it != hiveSplit_->infoColumns.end()) { + auto value = folly::to(it->second); + if (value >= 0) { + firstRowId_ = value; + } + } + auto rowType = getAdaptedRowType(); + // Initialize row lineage tracking for _last_updated_sequence_number. + dataSequenceNumber_ = std::nullopt; + if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number"); + it != hiveSplit_->infoColumns.end()) { + dataSequenceNumber_ = folly::to(it->second); + } + readLastUpdatedSeqNumFromFile_ = false; + lastUpdatedSeqNumOutputIndex_ = std::nullopt; + if (dataSequenceNumber_.has_value()) { + auto* seqNumSpec = scanSpec_->childByName( + IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName); + if (seqNumSpec && !seqNumSpec->isConstant()) { + auto idx = readerOutputType_->getChildIdxIfExists( + IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName); + if (idx.has_value()) { + readLastUpdatedSeqNumFromFile_ = true; + lastUpdatedSeqNumOutputIndex_ = *idx; + } + } + } + + // Initialize row lineage tracking for _row_id. + // firstRowId_ was set above (before adaptColumns). Now check if _row_id + // needs to be computed in next(). + computeRowId_ = false; + rowIdOutputIndex_ = std::nullopt; + if (firstRowId_.has_value()) { + auto idx = readerOutputType_->getChildIdxIfExists( + IcebergMetadataColumn::kRowIdColumnName); + if (idx.has_value()) { + computeRowId_ = true; + rowIdOutputIndex_ = *idx; + } + } + if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); return; @@ -160,12 +234,126 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation); + // For Iceberg V3 row lineage: replace null values in + // _last_updated_sequence_number with the data sequence number from the + // file's manifest entry. Per the spec, null means the value should be + // inherited from the manifest entry's sequence number. + if (readLastUpdatedSeqNumFromFile_ && dataSequenceNumber_.has_value() && + lastUpdatedSeqNumOutputIndex_.has_value() && rowsScanned > 0) { + auto* rowOutput = output->as(); + if (rowOutput) { + auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_); + // Load lazy vector if needed — the Parquet reader wraps columns in + // LazyVector, which must be loaded before checking encoding. + const auto& loadedSeqNum = BaseVector::loadedVectorShared(seqNumChild); + if (loadedSeqNum->isConstantEncoding()) { + auto* simpleVec = loadedSeqNum->as>(); + if (simpleVec && simpleVec->isNullAt(0)) { + seqNumChild = std::make_shared>( + connectorQueryCtx_->memoryPool(), + rowsScanned, + false, + BIGINT(), + static_cast(*dataSequenceNumber_)); + } + } else if (auto* flatVec = loadedSeqNum->asFlatVector()) { + for (vector_size_t i = 0; i < rowsScanned; ++i) { + if (flatVec->isNullAt(i)) { + flatVec->set(i, *dataSequenceNumber_); + } + } + } + } + } + + // For Iceberg V3 row lineage: compute _row_id = first_row_id + _pos. + // When the data file doesn't contain _row_id physically, compute it from + // the manifest entry's first_row_id plus the row's position in the file. + // When positional deletes are applied, we track actual file positions + // using the deletion bitmap. + if (computeRowId_ && firstRowId_.has_value() && + rowIdOutputIndex_.has_value() && rowsScanned > 0) { + auto* rowOutput = output->as(); + if (rowOutput) { + auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_); + // Load lazy vector if needed — the Parquet reader wraps columns in + // LazyVector, which must be loaded before checking encoding. + const auto& loadedRowId = BaseVector::loadedVectorShared(rowIdChild); + if (loadedRowId->isConstantEncoding()) { + auto* simpleVec = loadedRowId->as>(); + if (simpleVec && simpleVec->isNullAt(0)) { + auto pool = connectorQueryCtx_->memoryPool(); + auto flatVec = BaseVector::create>( + BIGINT(), rowsScanned, pool); + int64_t batchStartPos = splitOffset_ + baseReadOffset_; + if (mutation.deletedRows == nullptr) { + // No deletions: positions are contiguous. + for (vector_size_t i = 0; i < rowsScanned; ++i) { + flatVec->set( + i, static_cast(*firstRowId_) + batchStartPos + i); + } + } else { + // With deletions: use the bitmap to find actual file positions + // of surviving rows. + vector_size_t outputIdx = 0; + for (vector_size_t j = 0; + j < static_cast(actualSize) && + outputIdx < rowsScanned; + ++j) { + if (!bits::isBitSet(mutation.deletedRows, j)) { + flatVec->set( + outputIdx++, + static_cast(*firstRowId_) + batchStartPos + j); + } + } + } + rowIdChild = flatVec; + } + } else if (auto* flatVec = loadedRowId->asFlatVector()) { + // Column is in the file but has some null values — replace nulls + // with first_row_id + _pos. + int64_t batchStartPos = splitOffset_ + baseReadOffset_; + if (mutation.deletedRows == nullptr) { + for (vector_size_t i = 0; i < rowsScanned; ++i) { + if (flatVec->isNullAt(i)) { + flatVec->set( + i, static_cast(*firstRowId_) + batchStartPos + i); + } + } + } else { + vector_size_t outputIdx = 0; + for (vector_size_t j = 0; + j < static_cast(actualSize) && + outputIdx < rowsScanned; + ++j) { + if (!bits::isBitSet(mutation.deletedRows, j)) { + if (flatVec->isNullAt(outputIdx)) { + flatVec->set( + outputIdx, + static_cast(*firstRowId_) + batchStartPos + j); + } + ++outputIdx; + } + } + } + } + } + } + return rowsScanned; } std::vector IcebergSplitReader::adaptColumns( const RowTypePtr& fileType, const RowTypePtr& tableSchema) const { + // Resolve the data sequence number from split info columns for + // _last_updated_sequence_number inheritance. + std::optional dataSeqNum; + if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number"); + it != hiveSplit_->infoColumns.end()) { + dataSeqNum = folly::to(it->second); + } + std::vector columnTypes = fileType->children(); auto& childrenSpecs = scanSpec_->children(); // Iceberg table stores all column's data in data file. @@ -197,15 +385,52 @@ std::vector IcebergSplitReader::adaptColumns( // files, partition column values are stored in partition metadata // rather than in the data file itself, following Hive's partitioning // convention. + // 3. _last_updated_sequence_number: For Iceberg V3 row lineage, if + // the column is not in the file, inherit the data sequence number + // from the file's manifest entry. + // 4. _row_id: For Iceberg V3 row lineage, if the column is not in + // the file, set as NULL constant here. When first_row_id is + // available, next() will replace NULL with first_row_id + _pos. if (auto it = hiveSplit_->partitionKeys.find(fieldName); it != hiveSplit_->partitionKeys.end()) { setPartitionValue(childSpec.get(), fieldName, it->second); + } else if ( + fieldName == + IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName && + dataSeqNum.has_value()) { + childSpec->setConstantValue( + std::make_shared>( + connectorQueryCtx_->memoryPool(), + 1, + false, + BIGINT(), + static_cast(*dataSeqNum))); + } else if ( + fieldName == IcebergMetadataColumn::kRowIdColumnName && + firstRowId_.has_value()) { + // _row_id will be computed in next() as first_row_id + _pos. + // Set a NULL constant here as a placeholder; next() will replace + // it with computed values. + auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName); + auto colType = outputIdx.has_value() + ? readerOutputType_->childAt(*outputIdx) + : BIGINT(); + childSpec->setConstantValue( + BaseVector::createNullConstant( + colType, 1, connectorQueryCtx_->memoryPool())); } else { + // Column missing from both the file and partition keys. This + // can be a schema evolution column (in tableSchema) or a + // metadata column like _row_id (in readerOutputType_ but not + // in tableSchema). Try readerOutputType_ first, then fall + // back to tableSchema. + auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName); + auto colType = outputIdx.has_value() + ? readerOutputType_->childAt(*outputIdx) + : tableSchema->findChild(fieldName); childSpec->setConstantValue( BaseVector::createNullConstant( - tableSchema->findChild(fieldName), - 1, - connectorQueryCtx_->memoryPool())); + colType, 1, connectorQueryCtx_->memoryPool())); } } } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index f727f5c86ff2..4e6e8956f02d 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -89,6 +89,17 @@ class IcebergSplitReader : public SplitReader { /// Column was added to the table schema after this data file was /// written. Set as NULL constant since the old file doesn't contain /// this column. + /// c) Row lineage (_last_updated_sequence_number): + /// For Iceberg V3 row lineage, if the column is not in the file, + /// inherit the data sequence number from the file's manifest entry + /// (provided via $data_sequence_number info column). Per the spec, + /// null values indicate the value should be inherited. + /// d) Row lineage (_row_id): + /// Per the spec, null _row_id values are assigned as + /// first_row_id + _pos. When first_row_id is available from + /// the split info column $first_row_id, the value is computed + /// in next(). When first_row_id is not available (e.g., + /// pre-V3 tables), NULL is returned. std::vector adaptColumns( const RowTypePtr& fileType, const RowTypePtr& tableSchema) const override; @@ -101,5 +112,27 @@ class IcebergSplitReader : public SplitReader { std::list> positionalDeleteFileReaders_; BufferPtr deleteBitmap_; + + // True if _last_updated_sequence_number is read from the data file (not set + // as a constant). Set in adaptColumns(). + bool readLastUpdatedSeqNumFromFile_{false}; + + // The child index of _last_updated_sequence_number in readerOutputType_. + // Used to locate the column in the output for null-value replacement. + std::optional lastUpdatedSeqNumOutputIndex_; + + // Data sequence number from the file's manifest entry, used to replace null + // values in _last_updated_sequence_number during reads. + std::optional dataSequenceNumber_; + + // First row ID from the manifest entry, used to compute _row_id. + // When available (>= 0), _row_id = first_row_id + _pos for rows not in file. + std::optional firstRowId_; + + // True if _row_id should be computed as first_row_id + _pos in next(). + bool computeRowId_{false}; + + // The child index of _row_id in readerOutputType_. + std::optional rowIdOutputIndex_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 29acc03a2e10..59d29e622bc6 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -973,6 +973,387 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { assertQuery(plan, {split}, "SELECT * FROM tmp", 0); } +// Tests reading _row_id column from a data file. When the column exists in the +// data file, its values should be read as-is. The dataColumns (table schema) +// does NOT include _row_id since it's a hidden metadata column, but the +// file schema expansion in prepareSplit() adds it so the Parquet reader can +// read it from the file. +TEST_F(HiveIcebergTest, readRowIdColumn) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto rowIdColumn = IcebergMetadataColumn::icebergRowIdColumn(); + auto fileRowType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()}); + + // Write data file with c0 and _row_id columns. + std::vector dataVectors; + dataVectors.push_back(makeRowVector( + fileRowType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({100, 101, 102, 103, 104}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + auto icebergSplits = makeIcebergSplits(dataFilePath->getPath()); + + // dataColumns is the table schema, which does NOT include _row_id. + // _row_id is a hidden metadata column only in the output type. + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({100, 101, 102, 103, 104}), + })); + + AssertQueryBuilder(plan).splits(icebergSplits).assertResults(expected); +} + +// Tests reading _row_id column with schema evolution. When the data file does +// not contain the _row_id column (pre-V3 data) but first_row_id is available, +// _row_id should be computed as first_row_id + _pos. +TEST_F(HiveIcebergTest, readRowIdColumnMissing) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto fileRowType = ROW({"c0"}, {BIGINT()}); + + // Write data file without _row_id column. + std::vector dataVectors; + dataVectors.push_back(makeRowVector({ + makeFlatVector({1, 2, 3}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) + ->openFileForRead(dataFilePath->getPath()); + // Provide $first_row_id = 100 via infoColumns. + std::unordered_map infoColumns = { + {"$first_row_id", "100"}}; + auto split = std::make_shared( + kIcebergConnectorId, + dataFilePath->getPath(), + fileFomat_, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{}, + infoColumns); + + // Read c0 and _row_id; _row_id should be first_row_id + _pos. + // dataColumns only includes c0 (the actual table schema), not _row_id + // (metadata column). This matches real-world behavior where metadata columns + // are not part of the table schema. + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + // _row_id = first_row_id + _pos = 100 + 0, 100 + 1, 100 + 2. + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({1, 2, 3}), + makeFlatVector({100, 101, 102}), + })); + + AssertQueryBuilder(plan).splits({split}).assertResults(expected); +} + +// Tests _row_id = first_row_id + _pos computation with positional deletes. +// When some rows are deleted, _row_id values for surviving rows should still +// reflect their original file positions, not their output index. +TEST_F(HiveIcebergTest, readRowIdColumnComputedWithDeletes) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); + auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); + + auto fileRowType = ROW({"c0"}, {BIGINT()}); + + // Write data file with 5 rows. + std::vector dataVectors; + dataVectors.push_back(makeRowVector({ + makeFlatVector({10, 20, 30, 40, 50}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + // Create a positional delete file deleting rows at positions 1 and 3. + auto deleteFilePath = TempFilePath::create(); + std::vector deleteVectors = {makeRowVector( + {pathColumn->name, posColumn->name}, + {makeFlatVector( + 2, [&](auto) { return dataFilePath->getPath(); }), + makeFlatVector({1, 3})})}; + writeToFile(deleteFilePath->getPath(), deleteVectors); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->getPath(), + fileFomat_, + 2, + testing::internal::GetFileSize( + std::fopen(deleteFilePath->getPath().c_str(), "r")), + {}, + {}, + {{posColumn->id, "3"}}); + + auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) + ->openFileForRead(dataFilePath->getPath()); + std::unordered_map infoColumns = { + {"$first_row_id", "200"}}; + auto split = std::make_shared( + kIcebergConnectorId, + dataFilePath->getPath(), + fileFomat_, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{deleteFile}, + infoColumns); + + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + // After deletes, surviving rows are at positions 0, 2, 4. + // _row_id = first_row_id + _pos = 200+0, 200+2, 200+4. + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({10, 30, 50}), + makeFlatVector({200, 202, 204}), + })); + + AssertQueryBuilder(plan).splits({split}).assertResults(expected); +} + +// Tests reading _last_updated_sequence_number from a data file with null +// values. Per the Iceberg V3 spec, null values should be replaced with the +// data sequence number from the file's manifest entry (provided via +// $data_sequence_number info column). +TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberAllNulls) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto fileRowType = + ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()}); + + // Write data file with _last_updated_sequence_number all set to null. + std::vector dataVectors; + dataVectors.push_back(makeRowVector( + fileRowType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeNullableFlatVector( + {std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + // Create split with $data_sequence_number info column. + auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) + ->openFileForRead(dataFilePath->getPath()); + std::unordered_map infoColumns = { + {"$data_sequence_number", "42"}}; + auto split = std::make_shared( + kIcebergConnectorId, + dataFilePath->getPath(), + fileFomat_, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{}, + infoColumns); + + // dataColumns is the table schema, which does NOT include + // _last_updated_sequence_number (hidden metadata column). + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = + ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + // All null values should be replaced with 42 (the data sequence number). + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({42, 42, 42, 42, 42}), + })); + + AssertQueryBuilder(plan).splits({split}).assertResults(expected); +} + +// Tests reading _last_updated_sequence_number with mixed values. Non-null +// values should be preserved, while null values should be replaced with the +// data sequence number. +TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberMixed) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto fileRowType = + ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()}); + + // Write data file with mixed null and non-null values. + std::vector dataVectors; + dataVectors.push_back(makeRowVector( + fileRowType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeNullableFlatVector( + {std::nullopt, 5, std::nullopt, 10, std::nullopt}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) + ->openFileForRead(dataFilePath->getPath()); + std::unordered_map infoColumns = { + {"$data_sequence_number", "42"}}; + auto split = std::make_shared( + kIcebergConnectorId, + dataFilePath->getPath(), + fileFomat_, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{}, + infoColumns); + + // dataColumns is the table schema, which does NOT include + // _last_updated_sequence_number (hidden metadata column). + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = + ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + // null values → 42, non-null values preserved. + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({42, 5, 42, 10, 42}), + })); + + AssertQueryBuilder(plan).splits({split}).assertResults(expected); +} + +// Tests reading _last_updated_sequence_number when the column is not in the +// data file but $data_sequence_number is available in the split info columns. +// In this case, the value should be inherited as a constant. +TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberInherited) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto fileRowType = ROW({"c0"}, {BIGINT()}); + + // Write data file without _last_updated_sequence_number column. + std::vector dataVectors; + dataVectors.push_back(makeRowVector({ + makeFlatVector({1, 2, 3}), + })); + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->getPath(), dataVectors); + + auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) + ->openFileForRead(dataFilePath->getPath()); + std::unordered_map infoColumns = { + {"$data_sequence_number", "99"}}; + auto split = std::make_shared( + kIcebergConnectorId, + dataFilePath->getPath(), + fileFomat_, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{}, + infoColumns); + + // dataColumns is the actual table schema, without metadata columns. + auto tableDataColumns = ROW({"c0"}, {BIGINT()}); + auto outputType = + ROW({"c0", "_last_updated_sequence_number"}, {BIGINT(), BIGINT()}); + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(outputType) + .dataColumns(tableDataColumns) + .endTableScan() + .planNode(); + + // _last_updated_sequence_number should be inherited from data sequence + // number. + std::vector expected; + expected.push_back(makeRowVector( + outputType->names(), + { + makeFlatVector({1, 2, 3}), + makeFlatVector({99, 99, 99}), + })); + + AssertQueryBuilder(plan).splits({split}).assertResults(expected); +} + #ifdef VELOX_ENABLE_PARQUET TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) { // This file contains three row groups, each with about 100 rows. From c0b4a99b0ba52fdfcbe45f448b0a209d5a3327da Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Mon, 9 Mar 2026 17:04:12 +0530 Subject: [PATCH 2/7] Remove `_row_id` metadata column handling and use `dataSequenceNumber_` member for `_last_updated_sequence_number`. --- .../hive/iceberg/IcebergSplitReader.cpp | 25 ++----------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index b6985a461e60..9ddd49b67876 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -346,14 +346,6 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { std::vector IcebergSplitReader::adaptColumns( const RowTypePtr& fileType, const RowTypePtr& tableSchema) const { - // Resolve the data sequence number from split info columns for - // _last_updated_sequence_number inheritance. - std::optional dataSeqNum; - if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number"); - it != hiveSplit_->infoColumns.end()) { - dataSeqNum = folly::to(it->second); - } - std::vector columnTypes = fileType->children(); auto& childrenSpecs = scanSpec_->children(); // Iceberg table stores all column's data in data file. @@ -397,27 +389,14 @@ std::vector IcebergSplitReader::adaptColumns( } else if ( fieldName == IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName && - dataSeqNum.has_value()) { + dataSequenceNumber_.has_value()) { childSpec->setConstantValue( std::make_shared>( connectorQueryCtx_->memoryPool(), 1, false, BIGINT(), - static_cast(*dataSeqNum))); - } else if ( - fieldName == IcebergMetadataColumn::kRowIdColumnName && - firstRowId_.has_value()) { - // _row_id will be computed in next() as first_row_id + _pos. - // Set a NULL constant here as a placeholder; next() will replace - // it with computed values. - auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName); - auto colType = outputIdx.has_value() - ? readerOutputType_->childAt(*outputIdx) - : BIGINT(); - childSpec->setConstantValue( - BaseVector::createNullConstant( - colType, 1, connectorQueryCtx_->memoryPool())); + static_cast(*dataSequenceNumber_))); } else { // Column missing from both the file and partition keys. This // can be a schema evolution column (in tableSchema) or a From 8b5358484debf56ffd0cf7904a168e1661de6f31 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 06:18:56 +0000 Subject: [PATCH 3/7] Initial plan From e110255e48385be6558d2c02d6e31bc1a7f7f1e2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 06:18:56 +0000 Subject: [PATCH 4/7] Initial plan From bda69ff1f8d112bd125855f469155bea6f4a442c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Mar 2026 05:53:14 +0000 Subject: [PATCH 5/7] Fix dataSequenceNumber_ ordering and make null replacement encoding-agnostic Two key fixes: 1. Move dataSequenceNumber_ initialization before getAdaptedRowType() so adaptColumns() can use it when _last_updated_sequence_number is missing from the data file (ordering bug caused it to always be std::nullopt during adaptColumns). 2. Replace encoding-specific null replacement (asFlatVector only) with encoding-agnostic approach using SimpleVector::valueAt() and creating a new FlatVector. This handles FlatVector, DictionaryVector, and any other SimpleVector subclass returned by the Parquet reader after lazy loading. Also use seqNumChild = loadedVectorShared(seqNumChild) to replace the LazyVector wrapper in the RowVector's child directly. Co-authored-by: Joe-Abraham <53977252+Joe-Abraham@users.noreply.github.com> --- .../hive/iceberg/IcebergSplitReader.cpp | 131 ++++++++++-------- .../hive/iceberg/tests/IcebergReadTest.cpp | 1 - 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 9ddd49b67876..4a2151e7d047 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -91,9 +91,8 @@ void IcebergSplitReader::prepareSplit( return; } - // Initialize firstRowId_ BEFORE getAdaptedRowType() because adaptColumns() - // needs it to decide whether to set _row_id as a constant or leave it - // for next() to compute. + // Initialize row lineage fields BEFORE getAdaptedRowType() because + // adaptColumns() needs them to decide how to handle missing columns. firstRowId_ = std::nullopt; if (auto it = hiveSplit_->infoColumns.find("$first_row_id"); it != hiveSplit_->infoColumns.end()) { @@ -103,14 +102,18 @@ void IcebergSplitReader::prepareSplit( } } - auto rowType = getAdaptedRowType(); - - // Initialize row lineage tracking for _last_updated_sequence_number. dataSequenceNumber_ = std::nullopt; if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number"); it != hiveSplit_->infoColumns.end()) { dataSequenceNumber_ = folly::to(it->second); } + + auto rowType = getAdaptedRowType(); + + // After adaptColumns(), check if row lineage columns need null replacement + // in next(). If adaptColumns() set them as constants (column missing from + // file), no replacement is needed. If the column is read from the file + // (not constant), null values must be replaced per spec. readLastUpdatedSeqNumFromFile_ = false; lastUpdatedSeqNumOutputIndex_ = std::nullopt; if (dataSequenceNumber_.has_value()) { @@ -126,9 +129,6 @@ void IcebergSplitReader::prepareSplit( } } - // Initialize row lineage tracking for _row_id. - // firstRowId_ was set above (before adaptColumns). Now check if _row_id - // needs to be computed in next(). computeRowId_ = false; rowIdOutputIndex_ = std::nullopt; if (firstRowId_.has_value()) { @@ -243,25 +243,35 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto* rowOutput = output->as(); if (rowOutput) { auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_); - // Load lazy vector if needed — the Parquet reader wraps columns in - // LazyVector, which must be loaded before checking encoding. - const auto& loadedSeqNum = BaseVector::loadedVectorShared(seqNumChild); - if (loadedSeqNum->isConstantEncoding()) { - auto* simpleVec = loadedSeqNum->as>(); - if (simpleVec && simpleVec->isNullAt(0)) { + // Load lazy vector and replace the child reference so we can access + // the actual data. The Parquet reader wraps columns in LazyVector. + seqNumChild = BaseVector::loadedVectorShared(seqNumChild); + auto vectorSize = seqNumChild->size(); + + if (seqNumChild->isConstantEncoding()) { + if (seqNumChild->isNullAt(0)) { seqNumChild = std::make_shared>( connectorQueryCtx_->memoryPool(), - rowsScanned, + vectorSize, false, BIGINT(), static_cast(*dataSequenceNumber_)); } - } else if (auto* flatVec = loadedSeqNum->asFlatVector()) { - for (vector_size_t i = 0; i < rowsScanned; ++i) { - if (flatVec->isNullAt(i)) { - flatVec->set(i, *dataSequenceNumber_); + } else if (seqNumChild->mayHaveNulls()) { + // Handle any vector encoding (flat, dictionary, etc.) by creating + // a new flat vector with null values replaced. + auto pool = connectorQueryCtx_->memoryPool(); + auto* simpleVec = seqNumChild->as>(); + auto newFlat = BaseVector::create>( + BIGINT(), vectorSize, pool); + for (vector_size_t i = 0; i < vectorSize; ++i) { + if (simpleVec->isNullAt(i)) { + newFlat->set(i, *dataSequenceNumber_); + } else { + newFlat->set(i, simpleVec->valueAt(i)); } } + seqNumChild = newFlat; } } } @@ -276,66 +286,73 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto* rowOutput = output->as(); if (rowOutput) { auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_); - // Load lazy vector if needed — the Parquet reader wraps columns in - // LazyVector, which must be loaded before checking encoding. - const auto& loadedRowId = BaseVector::loadedVectorShared(rowIdChild); - if (loadedRowId->isConstantEncoding()) { - auto* simpleVec = loadedRowId->as>(); - if (simpleVec && simpleVec->isNullAt(0)) { - auto pool = connectorQueryCtx_->memoryPool(); - auto flatVec = BaseVector::create>( - BIGINT(), rowsScanned, pool); - int64_t batchStartPos = splitOffset_ + baseReadOffset_; - if (mutation.deletedRows == nullptr) { - // No deletions: positions are contiguous. - for (vector_size_t i = 0; i < rowsScanned; ++i) { + // Load lazy vector and replace the child reference. + rowIdChild = BaseVector::loadedVectorShared(rowIdChild); + auto vectorSize = rowIdChild->size(); + + if (rowIdChild->isConstantEncoding() && rowIdChild->isNullAt(0)) { + // All null — compute _row_id = first_row_id + _pos for all rows. + auto pool = connectorQueryCtx_->memoryPool(); + auto flatVec = BaseVector::create>( + BIGINT(), vectorSize, pool); + int64_t batchStartPos = splitOffset_ + baseReadOffset_; + if (mutation.deletedRows == nullptr) { + for (vector_size_t i = 0; i < vectorSize; ++i) { + flatVec->set( + i, static_cast(*firstRowId_) + batchStartPos + i); + } + } else { + // With deletions: use the bitmap to find actual file positions + // of surviving rows. + vector_size_t outputIdx = 0; + for (vector_size_t j = 0; + j < static_cast(actualSize) && + outputIdx < vectorSize; + ++j) { + if (!bits::isBitSet(mutation.deletedRows, j)) { flatVec->set( - i, static_cast(*firstRowId_) + batchStartPos + i); - } - } else { - // With deletions: use the bitmap to find actual file positions - // of surviving rows. - vector_size_t outputIdx = 0; - for (vector_size_t j = 0; - j < static_cast(actualSize) && - outputIdx < rowsScanned; - ++j) { - if (!bits::isBitSet(mutation.deletedRows, j)) { - flatVec->set( - outputIdx++, - static_cast(*firstRowId_) + batchStartPos + j); - } + outputIdx++, + static_cast(*firstRowId_) + batchStartPos + j); } } - rowIdChild = flatVec; } - } else if (auto* flatVec = loadedRowId->asFlatVector()) { + rowIdChild = flatVec; + } else if (rowIdChild->mayHaveNulls()) { // Column is in the file but has some null values — replace nulls - // with first_row_id + _pos. + // with first_row_id + _pos. Handle any vector encoding. + auto pool = connectorQueryCtx_->memoryPool(); + auto* simpleVec = rowIdChild->as>(); + auto newFlat = BaseVector::create>( + BIGINT(), vectorSize, pool); int64_t batchStartPos = splitOffset_ + baseReadOffset_; if (mutation.deletedRows == nullptr) { - for (vector_size_t i = 0; i < rowsScanned; ++i) { - if (flatVec->isNullAt(i)) { - flatVec->set( + for (vector_size_t i = 0; i < vectorSize; ++i) { + if (simpleVec->isNullAt(i)) { + newFlat->set( i, static_cast(*firstRowId_) + batchStartPos + i); + } else { + newFlat->set(i, simpleVec->valueAt(i)); } } } else { vector_size_t outputIdx = 0; for (vector_size_t j = 0; j < static_cast(actualSize) && - outputIdx < rowsScanned; + outputIdx < vectorSize; ++j) { if (!bits::isBitSet(mutation.deletedRows, j)) { - if (flatVec->isNullAt(outputIdx)) { - flatVec->set( + if (simpleVec->isNullAt(outputIdx)) { + newFlat->set( outputIdx, static_cast(*firstRowId_) + batchStartPos + j); + } else { + newFlat->set(outputIdx, simpleVec->valueAt(outputIdx)); } ++outputIdx; } } } + rowIdChild = newFlat; } } } diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 59d29e622bc6..6666763e79fa 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -981,7 +981,6 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { TEST_F(HiveIcebergTest, readRowIdColumn) { folly::SingletonVault::singleton()->registrationComplete(); - auto rowIdColumn = IcebergMetadataColumn::icebergRowIdColumn(); auto fileRowType = ROW({"c0", "_row_id"}, {BIGINT(), BIGINT()}); // Write data file with c0 and _row_id columns. From 8745950d0e9eb829b7adf7dee5593c463174b0c5 Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Wed, 11 Mar 2026 12:43:32 +0530 Subject: [PATCH 6/7] refactor: Replace manual null handling with DecodedVector for row ID and last updated sequence number --- .../hive/iceberg/IcebergMetadataColumns.h | 18 ++-- .../hive/iceberg/IcebergSplitReader.cpp | 97 ++++++++++--------- .../hive/iceberg/tests/IcebergReadTest.cpp | 40 ++++---- 3 files changed, 81 insertions(+), 74 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h index 24b895c10ebf..043c466a88e0 100644 --- a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -28,10 +28,16 @@ struct IcebergMetadataColumn { std::shared_ptr type; std::string doc; - // Position delete file's metadata column ID, see - // https://iceberg.apache.org/spec/#position-delete-files. + // Reserved Field IDs for Iceberg tables, refer + // https://iceberg.apache.org/spec/#reserved-field-ids static constexpr int32_t kPosId = 2'147'483'545; static constexpr int32_t kFilePathId = 2'147'483'546; + static constexpr int32_t kRowId = 2'147'483'540; + static constexpr int32_t kLastUpdatedSequenceNumber = 2'147'483'539; + + static constexpr const char* kRowIdColumnName = "_row_id"; + static constexpr const char* kLastUpdatedSequenceNumberColumnName = + "_last_updated_sequence_number"; IcebergMetadataColumn( int _id, @@ -56,13 +62,9 @@ struct IcebergMetadataColumn { "Ordinal position of a deleted row in the data file"); } - static constexpr const char* kRowIdColumnName = "_row_id"; - static constexpr const char* kLastUpdatedSequenceNumberColumnName = - "_last_updated_sequence_number"; - static std::shared_ptr icebergRowIdColumn() { return std::make_shared( - 2147483540, + kRowId, kRowIdColumnName, BIGINT(), "Implicit row ID that is automatically assigned"); @@ -71,7 +73,7 @@ struct IcebergMetadataColumn { static std::shared_ptr icebergLastUpdatedSequenceNumberColumn() { return std::make_shared( - 2147483539, + kLastUpdatedSequenceNumber, kLastUpdatedSequenceNumberColumnName, BIGINT(), "Sequence number when the row was last updated"); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 4a2151e7d047..e6323031b82b 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -23,6 +23,7 @@ #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/dwio/common/BufferUtil.h" +#include "velox/vector/DecodedVector.h" using namespace facebook::velox::dwio::common; @@ -243,8 +244,7 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto* rowOutput = output->as(); if (rowOutput) { auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_); - // Load lazy vector and replace the child reference so we can access - // the actual data. The Parquet reader wraps columns in LazyVector. + // Load lazy vector - the Parquet reader wraps columns in LazyVector. seqNumChild = BaseVector::loadedVectorShared(seqNumChild); auto vectorSize = seqNumChild->size(); @@ -258,20 +258,21 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { static_cast(*dataSequenceNumber_)); } } else if (seqNumChild->mayHaveNulls()) { - // Handle any vector encoding (flat, dictionary, etc.) by creating - // a new flat vector with null values replaced. - auto pool = connectorQueryCtx_->memoryPool(); - auto* simpleVec = seqNumChild->as>(); - auto newFlat = BaseVector::create>( - BIGINT(), vectorSize, pool); - for (vector_size_t i = 0; i < vectorSize; ++i) { - if (simpleVec->isNullAt(i)) { - newFlat->set(i, *dataSequenceNumber_); - } else { - newFlat->set(i, simpleVec->valueAt(i)); + // Use DecodedVector to handle any encoding (flat, dictionary, etc.). + DecodedVector decoded(*seqNumChild); + if (decoded.mayHaveNulls()) { + auto pool = connectorQueryCtx_->memoryPool(); + auto newFlat = BaseVector::create>( + BIGINT(), vectorSize, pool); + for (vector_size_t i = 0; i < vectorSize; ++i) { + if (decoded.isNullAt(i)) { + newFlat->set(i, *dataSequenceNumber_); + } else { + newFlat->set(i, decoded.valueAt(i)); + } } + seqNumChild = newFlat; } - seqNumChild = newFlat; } } } @@ -286,15 +287,15 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto* rowOutput = output->as(); if (rowOutput) { auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_); - // Load lazy vector and replace the child reference. + // Load lazy vector - the Parquet reader wraps columns in LazyVector. rowIdChild = BaseVector::loadedVectorShared(rowIdChild); auto vectorSize = rowIdChild->size(); if (rowIdChild->isConstantEncoding() && rowIdChild->isNullAt(0)) { - // All null — compute _row_id = first_row_id + _pos for all rows. + // All null - compute _row_id = first_row_id + _pos for all rows. auto pool = connectorQueryCtx_->memoryPool(); - auto flatVec = BaseVector::create>( - BIGINT(), vectorSize, pool); + auto flatVec = + BaseVector::create>(BIGINT(), vectorSize, pool); int64_t batchStartPos = splitOffset_ + baseReadOffset_; if (mutation.deletedRows == nullptr) { for (vector_size_t i = 0; i < vectorSize; ++i) { @@ -318,41 +319,43 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { } rowIdChild = flatVec; } else if (rowIdChild->mayHaveNulls()) { - // Column is in the file but has some null values — replace nulls - // with first_row_id + _pos. Handle any vector encoding. - auto pool = connectorQueryCtx_->memoryPool(); - auto* simpleVec = rowIdChild->as>(); - auto newFlat = BaseVector::create>( - BIGINT(), vectorSize, pool); - int64_t batchStartPos = splitOffset_ + baseReadOffset_; - if (mutation.deletedRows == nullptr) { - for (vector_size_t i = 0; i < vectorSize; ++i) { - if (simpleVec->isNullAt(i)) { - newFlat->set( - i, static_cast(*firstRowId_) + batchStartPos + i); - } else { - newFlat->set(i, simpleVec->valueAt(i)); - } - } - } else { - vector_size_t outputIdx = 0; - for (vector_size_t j = 0; - j < static_cast(actualSize) && - outputIdx < vectorSize; - ++j) { - if (!bits::isBitSet(mutation.deletedRows, j)) { - if (simpleVec->isNullAt(outputIdx)) { + // Column is in the file but has some null values - replace nulls + // with first_row_id + _pos. Use DecodedVector for encoding support. + DecodedVector decoded(*rowIdChild); + if (decoded.mayHaveNulls()) { + auto pool = connectorQueryCtx_->memoryPool(); + auto newFlat = BaseVector::create>( + BIGINT(), vectorSize, pool); + int64_t batchStartPos = splitOffset_ + baseReadOffset_; + if (mutation.deletedRows == nullptr) { + for (vector_size_t i = 0; i < vectorSize; ++i) { + if (decoded.isNullAt(i)) { newFlat->set( - outputIdx, - static_cast(*firstRowId_) + batchStartPos + j); + i, static_cast(*firstRowId_) + batchStartPos + i); } else { - newFlat->set(outputIdx, simpleVec->valueAt(outputIdx)); + newFlat->set(i, decoded.valueAt(i)); + } + } + } else { + vector_size_t outputIdx = 0; + for (vector_size_t j = 0; + j < static_cast(actualSize) && + outputIdx < vectorSize; + ++j) { + if (!bits::isBitSet(mutation.deletedRows, j)) { + if (decoded.isNullAt(outputIdx)) { + newFlat->set( + outputIdx, + static_cast(*firstRowId_) + batchStartPos + j); + } else { + newFlat->set(outputIdx, decoded.valueAt(outputIdx)); + } + ++outputIdx; } - ++outputIdx; } } + rowIdChild = newFlat; } - rowIdChild = newFlat; } } } diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 6666763e79fa..15241e9a8ccf 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -239,7 +239,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { IcebergDeleteFile icebergDeleteFile( FileContent::kPositionalDeletes, deleteFilePath, - fileFomat_, + fileFormat_, deleteFilePaths[deleteFileName].first, testing::internal::GetFileSize( std::fopen(deleteFilePath.c_str(), "r"))); @@ -275,6 +275,8 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::shared_ptr config_; std::function()> flushPolicyFactory_; + FileFormat fileFormat_{FileFormat::DWRF}; + std::vector> makeIcebergSplits( const std::string& dataFilePath, const std::vector& deleteFiles = {}, @@ -294,7 +296,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::make_shared( kIcebergConnectorId, dataFilePath, - fileFomat_, + fileFormat_, i * splitSize, splitSize, partitionKeys, @@ -352,7 +354,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { IcebergDeleteFile icebergDeleteFile( FileContent::kPositionalDeletes, deleteFilePath->getPath(), - fileFomat_, + fileFormat_, deletedPositionSize, testing::internal::GetFileSize( std::fopen(deleteFilePath->getPath().c_str(), "r"))); @@ -364,7 +366,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { return {std::make_shared( kIcebergConnectorId, path, - dwio::common::FileFormat::PARQUET, + FileFormat::PARQUET, 0, fileSize, partitionKeys, @@ -578,8 +580,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { return deletePositionVector; } - dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; - std::shared_ptr pathColumn_ = IcebergMetadataColumn::icebergDeleteFilePathColumn(); @@ -932,7 +932,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { IcebergDeleteFile deleteFile( FileContent::kPositionalDeletes, deleteFilePath->getPath(), - dwio::common::FileFormat::DWRF, + FileFormat::DWRF, 3, testing::internal::GetFileSize( std::fopen(deleteFilePath->getPath().c_str(), "r")), @@ -957,7 +957,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - dwio::common::FileFormat::DWRF, + FileFormat::DWRF, static_cast(fileSize / 2), static_cast(fileSize / 2), std::unordered_map>{}, @@ -1043,7 +1043,7 @@ TEST_F(HiveIcebergTest, readRowIdColumnMissing) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - fileFomat_, + fileFormat_, 0, file->size(), std::unordered_map>{}, @@ -1089,8 +1089,6 @@ TEST_F(HiveIcebergTest, readRowIdColumnComputedWithDeletes) { auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); - auto fileRowType = ROW({"c0"}, {BIGINT()}); - // Write data file with 5 rows. std::vector dataVectors; dataVectors.push_back(makeRowVector({ @@ -1108,16 +1106,22 @@ TEST_F(HiveIcebergTest, readRowIdColumnComputedWithDeletes) { makeFlatVector({1, 3})})}; writeToFile(deleteFilePath->getPath(), deleteVectors); + uint64_t upperBound = 3; + auto upperBoundLE = folly::Endian::little(upperBound); + auto encodedUpperBound = encoding::Base64::encode( + std::string_view( + reinterpret_cast(&upperBoundLE), sizeof(upperBoundLE))); + IcebergDeleteFile deleteFile( FileContent::kPositionalDeletes, deleteFilePath->getPath(), - fileFomat_, + fileFormat_, 2, testing::internal::GetFileSize( std::fopen(deleteFilePath->getPath().c_str(), "r")), {}, {}, - {{posColumn->id, "3"}}); + {{posColumn->id, encodedUpperBound}}); auto file = filesystems::getFileSystem(dataFilePath->getPath(), nullptr) ->openFileForRead(dataFilePath->getPath()); @@ -1126,7 +1130,7 @@ TEST_F(HiveIcebergTest, readRowIdColumnComputedWithDeletes) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - fileFomat_, + fileFormat_, 0, file->size(), std::unordered_map>{}, @@ -1194,7 +1198,7 @@ TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberAllNulls) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - fileFomat_, + fileFormat_, 0, file->size(), std::unordered_map>{}, @@ -1258,7 +1262,7 @@ TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberMixed) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - fileFomat_, + fileFormat_, 0, file->size(), std::unordered_map>{}, @@ -1300,8 +1304,6 @@ TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberMixed) { TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberInherited) { folly::SingletonVault::singleton()->registrationComplete(); - auto fileRowType = ROW({"c0"}, {BIGINT()}); - // Write data file without _last_updated_sequence_number column. std::vector dataVectors; dataVectors.push_back(makeRowVector({ @@ -1317,7 +1319,7 @@ TEST_F(HiveIcebergTest, readLastUpdatedSequenceNumberInherited) { auto split = std::make_shared( kIcebergConnectorId, dataFilePath->getPath(), - fileFomat_, + fileFormat_, 0, file->size(), std::unordered_map>{}, From 14b572961a5fc6daac6d25c33006f388e3776162 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 09:47:08 +0000 Subject: [PATCH 7/7] Remove redundant boolean flags in favor of optional checks Replace readLastUpdatedSeqNumFromFile_ with lastUpdatedSeqNumOutputIndex_.has_value() && dataSequenceNumber_.has_value(), and computeRowId_ with rowIdOutputIndex_.has_value() && firstRowId_.has_value(). The boolean flags were always equivalent to checking whether both corresponding optionals had values, so they were redundant. Co-authored-by: Joe-Abraham <53977252+Joe-Abraham@users.noreply.github.com> --- .../hive/iceberg/IcebergSplitReader.cpp | 22 +++++-------------- .../hive/iceberg/IcebergSplitReader.h | 12 ++++------ 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index e6323031b82b..fd90b1b30d60 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -115,30 +115,20 @@ void IcebergSplitReader::prepareSplit( // in next(). If adaptColumns() set them as constants (column missing from // file), no replacement is needed. If the column is read from the file // (not constant), null values must be replaced per spec. - readLastUpdatedSeqNumFromFile_ = false; lastUpdatedSeqNumOutputIndex_ = std::nullopt; if (dataSequenceNumber_.has_value()) { auto* seqNumSpec = scanSpec_->childByName( IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName); if (seqNumSpec && !seqNumSpec->isConstant()) { - auto idx = readerOutputType_->getChildIdxIfExists( + lastUpdatedSeqNumOutputIndex_ = readerOutputType_->getChildIdxIfExists( IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName); - if (idx.has_value()) { - readLastUpdatedSeqNumFromFile_ = true; - lastUpdatedSeqNumOutputIndex_ = *idx; - } } } - computeRowId_ = false; rowIdOutputIndex_ = std::nullopt; if (firstRowId_.has_value()) { - auto idx = readerOutputType_->getChildIdxIfExists( + rowIdOutputIndex_ = readerOutputType_->getChildIdxIfExists( IcebergMetadataColumn::kRowIdColumnName); - if (idx.has_value()) { - computeRowId_ = true; - rowIdOutputIndex_ = *idx; - } } if (checkIfSplitIsEmpty(runtimeStats)) { @@ -239,8 +229,8 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { // _last_updated_sequence_number with the data sequence number from the // file's manifest entry. Per the spec, null means the value should be // inherited from the manifest entry's sequence number. - if (readLastUpdatedSeqNumFromFile_ && dataSequenceNumber_.has_value() && - lastUpdatedSeqNumOutputIndex_.has_value() && rowsScanned > 0) { + if (lastUpdatedSeqNumOutputIndex_.has_value() && + dataSequenceNumber_.has_value() && rowsScanned > 0) { auto* rowOutput = output->as(); if (rowOutput) { auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_); @@ -282,8 +272,8 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { // the manifest entry's first_row_id plus the row's position in the file. // When positional deletes are applied, we track actual file positions // using the deletion bitmap. - if (computeRowId_ && firstRowId_.has_value() && - rowIdOutputIndex_.has_value() && rowsScanned > 0) { + if (rowIdOutputIndex_.has_value() && firstRowId_.has_value() && + rowsScanned > 0) { auto* rowOutput = output->as(); if (rowOutput) { auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 4e6e8956f02d..713f2b356ea0 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -113,12 +113,9 @@ class IcebergSplitReader : public SplitReader { positionalDeleteFileReaders_; BufferPtr deleteBitmap_; - // True if _last_updated_sequence_number is read from the data file (not set - // as a constant). Set in adaptColumns(). - bool readLastUpdatedSeqNumFromFile_{false}; - // The child index of _last_updated_sequence_number in readerOutputType_. - // Used to locate the column in the output for null-value replacement. + // When set (along with dataSequenceNumber_), null values in this column + // are replaced with dataSequenceNumber_ during reads per the Iceberg spec. std::optional lastUpdatedSeqNumOutputIndex_; // Data sequence number from the file's manifest entry, used to replace null @@ -129,10 +126,9 @@ class IcebergSplitReader : public SplitReader { // When available (>= 0), _row_id = first_row_id + _pos for rows not in file. std::optional firstRowId_; - // True if _row_id should be computed as first_row_id + _pos in next(). - bool computeRowId_{false}; - // The child index of _row_id in readerOutputType_. + // When set (along with firstRowId_), _row_id is computed as + // first_row_id + _pos for null values during reads per the Iceberg spec. std::optional rowIdOutputIndex_; }; } // namespace facebook::velox::connector::hive::iceberg