Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions velox/connectors/hive/iceberg/IcebergMetadataColumns.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ struct IcebergMetadataColumn {
std::shared_ptr<const Type> type;
std::string doc;

// Position delete file's metadata column ID, see
// https://iceberg.apache.org/spec/#position-delete-files.
// Reserved field IDs for Iceberg tables; see
// https://iceberg.apache.org/spec/#reserved-field-ids.
static constexpr int32_t kPosId = 2'147'483'545;
static constexpr int32_t kFilePathId = 2'147'483'546;
static constexpr int32_t kRowId = 2'147'483'540;
static constexpr int32_t kLastUpdatedSequenceNumber = 2'147'483'539;

static constexpr const char* kRowIdColumnName = "_row_id";
static constexpr const char* kLastUpdatedSequenceNumberColumnName =
"_last_updated_sequence_number";

IcebergMetadataColumn(
int _id,
Expand All @@ -55,6 +61,23 @@ struct IcebergMetadataColumn {
BIGINT(),
"Ordinal position of a deleted row in the data file");
}

static std::shared_ptr<IcebergMetadataColumn> icebergRowIdColumn() {
return std::make_shared<IcebergMetadataColumn>(
kRowId,
kRowIdColumnName,
Comment on lines +65 to +68
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new metadata column IDs for _row_id and _last_updated_sequence_number are hard-coded magic numbers. For consistency with the existing delete-file metadata IDs above, consider introducing named static constexpr int32_t constants (and ideally a short comment pointing to the Iceberg spec section/source for these IDs). This makes it easier to validate and update these values later.

Copilot uses AI. Check for mistakes.
BIGINT(),
"Implicit row ID that is automatically assigned");
}

// Builds the _last_updated_sequence_number metadata column for
// Iceberg V3 row lineage, identified by the reserved field ID
// kLastUpdatedSequenceNumber.
static std::shared_ptr<IcebergMetadataColumn>
icebergLastUpdatedSequenceNumberColumn() {
  auto column = std::make_shared<IcebergMetadataColumn>(
      kLastUpdatedSequenceNumber,
      kLastUpdatedSequenceNumberColumnName,
      BIGINT(),
      "Sequence number when the row was last updated");
  return column;
}
};

} // namespace facebook::velox::connector::hive::iceberg
220 changes: 217 additions & 3 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/dwio/common/BufferUtil.h"
#include "velox/vector/DecodedVector.h"

using namespace facebook::velox::dwio::common;

Expand Down Expand Up @@ -60,12 +61,76 @@ void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const folly::F14FastMap<std::string, std::string>& fileReadOps) {
// Expand the file schema to include row lineage metadata columns
// (_row_id and _last_updated_sequence_number) if they're requested
// in the output. These are hidden metadata columns physically stored
// in Iceberg V3 data files but not listed in the table's logical
// schema (dataColumns). The Parquet reader needs them in the file
// schema to read them from the file.
auto fileSchema = baseReaderOpts_.fileSchema();
if (fileSchema) {
auto names = fileSchema->names();
auto types = fileSchema->children();
bool modified = false;
for (const auto* colName :
{IcebergMetadataColumn::kRowIdColumnName,
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName}) {
if (readerOutputType_->containsChild(colName) &&
!fileSchema->containsChild(colName)) {
names.push_back(std::string(colName));
types.push_back(BIGINT());
modified = true;
}
}
if (modified) {
baseReaderOpts_.setFileSchema(ROW(std::move(names), std::move(types)));
}
}

createReader();
if (emptySplit_) {
return;
}
Comment on lines +85 to 93
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prepareSplit receives fileReadOps, but the implementation calls createReader() without forwarding it. This means split-specific read options won’t be applied for Iceberg reads. Pass fileReadOps into createReader(fileReadOps) (and keep the row-lineage file-schema expansion before that call).

Copilot uses AI. Check for mistakes.

// Initialize row lineage fields BEFORE getAdaptedRowType() because
// adaptColumns() needs them to decide how to handle missing columns.
firstRowId_ = std::nullopt;
if (auto it = hiveSplit_->infoColumns.find("$first_row_id");
it != hiveSplit_->infoColumns.end()) {
auto value = folly::to<int64_t>(it->second);
if (value >= 0) {
firstRowId_ = value;
}
}

dataSequenceNumber_ = std::nullopt;
if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number");
it != hiveSplit_->infoColumns.end()) {
dataSequenceNumber_ = folly::to<int64_t>(it->second);
}

auto rowType = getAdaptedRowType();

// After adaptColumns(), check if row lineage columns need null replacement
// in next(). If adaptColumns() set them as constants (column missing from
// file), no replacement is needed. If the column is read from the file
// (not constant), null values must be replaced per spec.
lastUpdatedSeqNumOutputIndex_ = std::nullopt;
if (dataSequenceNumber_.has_value()) {
auto* seqNumSpec = scanSpec_->childByName(
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName);
if (seqNumSpec && !seqNumSpec->isConstant()) {
lastUpdatedSeqNumOutputIndex_ = readerOutputType_->getChildIdxIfExists(
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName);
}
}

rowIdOutputIndex_ = std::nullopt;
if (firstRowId_.has_value()) {
rowIdOutputIndex_ = readerOutputType_->getChildIdxIfExists(
IcebergMetadataColumn::kRowIdColumnName);
}

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
Expand Down Expand Up @@ -160,6 +225,131 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {

auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation);

// For Iceberg V3 row lineage: replace null values in
// _last_updated_sequence_number with the data sequence number from the
// file's manifest entry. Per the spec, null means the value should be
// inherited from the manifest entry's sequence number.
if (lastUpdatedSeqNumOutputIndex_.has_value() &&
dataSequenceNumber_.has_value() && rowsScanned > 0) {
auto* rowOutput = output->as<RowVector>();
if (rowOutput) {
auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_);
// Load lazy vector - the Parquet reader wraps columns in LazyVector.
seqNumChild = BaseVector::loadedVectorShared(seqNumChild);
auto vectorSize = seqNumChild->size();

if (seqNumChild->isConstantEncoding()) {
if (seqNumChild->isNullAt(0)) {
seqNumChild = std::make_shared<ConstantVector<int64_t>>(
connectorQueryCtx_->memoryPool(),
vectorSize,
false,
BIGINT(),
static_cast<int64_t>(*dataSequenceNumber_));
}
} else if (seqNumChild->mayHaveNulls()) {
// Use DecodedVector to handle any encoding (flat, dictionary, etc.).
DecodedVector decoded(*seqNumChild);
if (decoded.mayHaveNulls()) {
auto pool = connectorQueryCtx_->memoryPool();
auto newFlat = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), vectorSize, pool);
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (decoded.isNullAt(i)) {
newFlat->set(i, *dataSequenceNumber_);
} else {
newFlat->set(i, decoded.valueAt<int64_t>(i));
}
}
seqNumChild = newFlat;
}
}
Comment on lines 226 to +266
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

baseRowReader_->next() returns the number of rows scanned (i.e. actualSize), not the number of rows produced in output after filters/deletes. This block iterates up to rowsScanned and even creates vectors sized rowsScanned, which will go out-of-bounds or produce child vectors with the wrong length when output->size() < rowsScanned (common with filters or positional deletes). Use rowOutput->size() (or output->size()) for loops and created vector sizes, and handle dictionary-wrapped results (e.g. after filtering) so null replacement still runs.

Copilot uses AI. Check for mistakes.
Comment on lines +250 to +266
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Avoid always copying the vector when there are no actual nulls present

In the mayHaveNulls() branch we always allocate and populate a new FlatVector<int64_t> for _last_updated_sequence_number, which can be costly for wide vectors and high‑throughput workloads. Since mayHaveNulls() is conservative, consider first checking for actual nulls (e.g., via the nulls buffer / null count) and, if none are present, reuse the original vector instead of copying.

Suggested change
} else if (seqNumChild->mayHaveNulls()) {
// Handle any vector encoding (flat, dictionary, etc.) by creating
// a new flat vector with null values replaced.
auto pool = connectorQueryCtx_->memoryPool();
auto* simpleVec = seqNumChild->as<SimpleVector<int64_t>>();
auto newFlat =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (simpleVec->isNullAt(i)) {
newFlat->set(i, *dataSequenceNumber_);
} else {
newFlat->set(i, simpleVec->valueAt(i));
}
}
seqNumChild = newFlat;
}
} else if (seqNumChild->mayHaveNulls()) {
// Fast-path: 'mayHaveNulls' is conservative; only rewrite when we
// actually have nulls to replace.
if (seqNumChild->hasNulls()) {
// Handle any vector encoding (flat, dictionary, etc.) by creating
// a new flat vector with null values replaced.
auto pool = connectorQueryCtx_->memoryPool();
auto* simpleVec = seqNumChild->as<SimpleVector<int64_t>>();
auto newFlat =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (simpleVec->isNullAt(i)) {
newFlat->set(i, *dataSequenceNumber_);
} else {
newFlat->set(i, simpleVec->valueAt(i));
}
}
seqNumChild = newFlat;
}
}

}
}

// For Iceberg V3 row lineage: compute _row_id = first_row_id + _pos.
// When the data file doesn't contain _row_id physically, compute it from
// the manifest entry's first_row_id plus the row's position in the file.
// When positional deletes are applied, we track actual file positions
// using the deletion bitmap.
if (rowIdOutputIndex_.has_value() && firstRowId_.has_value() &&
rowsScanned > 0) {
auto* rowOutput = output->as<RowVector>();
if (rowOutput) {
auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_);
// Load lazy vector - the Parquet reader wraps columns in LazyVector.
rowIdChild = BaseVector::loadedVectorShared(rowIdChild);
auto vectorSize = rowIdChild->size();

if (rowIdChild->isConstantEncoding() && rowIdChild->isNullAt(0)) {
// All null - compute _row_id = first_row_id + _pos for all rows.
auto pool = connectorQueryCtx_->memoryPool();
auto flatVec =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
for (vector_size_t i = 0; i < vectorSize; ++i) {
flatVec->set(
i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
}
Comment on lines +289 to +294
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_row_id is computed using sequential output indices (e.g., batchStartPos + i / batchStartPos + j), but RowReader::next() can return a compacted output (e.g., dictionary-wrapped) after applying scan filters and deletions. In those cases, output row 0 may correspond to an arbitrary original row position within the scanned batch, so this computation can produce incorrect _row_id values. Consider deriving the per-output-row file position from the vector's dictionary indices (e.g., via DecodedVector) and computing first_row_id + (batchStartPos + originalRowOffset) for each output row, instead of assuming output index == file position.

Copilot uses AI. Check for mistakes.
} else {
// With deletions: use the bitmap to find actual file positions
// of surviving rows.
vector_size_t outputIdx = 0;
for (vector_size_t j = 0;
j < static_cast<vector_size_t>(actualSize) &&
outputIdx < vectorSize;
++j) {
if (!bits::isBitSet(mutation.deletedRows, j)) {
flatVec->set(
outputIdx++,
static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
}
}
}
rowIdChild = flatVec;
} else if (rowIdChild->mayHaveNulls()) {
// Column is in the file but has some null values - replace nulls
// with first_row_id + _pos. Use DecodedVector for encoding support.
DecodedVector decoded(*rowIdChild);
if (decoded.mayHaveNulls()) {
auto pool = connectorQueryCtx_->memoryPool();
auto newFlat = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), vectorSize, pool);
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (decoded.isNullAt(i)) {
newFlat->set(
i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
} else {
newFlat->set(i, decoded.valueAt<int64_t>(i));
}
}
} else {
vector_size_t outputIdx = 0;
for (vector_size_t j = 0;
j < static_cast<vector_size_t>(actualSize) &&
outputIdx < vectorSize;
++j) {
if (!bits::isBitSet(mutation.deletedRows, j)) {
if (decoded.isNullAt(outputIdx)) {
newFlat->set(
outputIdx,
static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
} else {
newFlat->set(outputIdx, decoded.valueAt<int64_t>(outputIdx));
}
++outputIdx;
}
}
}
rowIdChild = newFlat;
}
}
Comment on lines +270 to +349
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _row_id computation currently assumes output row i corresponds to file position batchStartPos + i (or reconstructs positions using only the delete bitmap). This breaks as soon as any filter (or randomSkip) is applied because the output is dictionary-wrapped with indices into the scanned batch; positions are no longer contiguous and aren’t derivable from i. Consider using the dictionary indices (or a decoded vector) to map each output row back to its scanned-row offset j, then compute first_row_id + batchStartPos + j (this also removes the need for the separate delete-bitmap walk). Also, avoid using rowsScanned for vector sizing/iteration here for the same reason as above.

Copilot uses AI. Check for mistakes.
}
}

return rowsScanned;
}

Expand Down Expand Up @@ -197,15 +387,39 @@ std::vector<TypePtr> IcebergSplitReader::adaptColumns(
// files, partition column values are stored in partition metadata
// rather than in the data file itself, following Hive's partitioning
// convention.
// 3. _last_updated_sequence_number: For Iceberg V3 row lineage, if
// the column is not in the file, inherit the data sequence number
// from the file's manifest entry.
// 4. _row_id: For Iceberg V3 row lineage, if the column is not in
// the file, set as NULL constant here. When first_row_id is
// available, next() will replace NULL with first_row_id + _pos.
if (auto it = hiveSplit_->partitionKeys.find(fieldName);
it != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec.get(), fieldName, it->second);
} else if (
fieldName ==
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName &&
dataSequenceNumber_.has_value()) {
childSpec->setConstantValue(
std::make_shared<ConstantVector<int64_t>>(
connectorQueryCtx_->memoryPool(),
1,
false,
BIGINT(),
static_cast<int64_t>(*dataSequenceNumber_)));
} else {
// Column missing from both the file and partition keys. This
// can be a schema evolution column (in tableSchema) or a
// metadata column like _row_id (in readerOutputType_ but not
// in tableSchema). Try readerOutputType_ first, then fall
// back to tableSchema.
auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName);
auto colType = outputIdx.has_value()
? readerOutputType_->childAt(*outputIdx)
: tableSchema->findChild(fieldName);
childSpec->setConstantValue(
BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
colType, 1, connectorQueryCtx_->memoryPool()));
Comment on lines +416 to +422
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Guard against null colType when constructing a null constant for missing columns

In this fallback branch, colType may be null if the field name is not found in either readerOutputType_ or tableSchema. Passing a null type into createNullConstant(colType, ...) would likely crash. Consider adding a VELOX_CHECK to enforce non-null here or handling the null case explicitly with a clear error path instead of failing implicitly.

}
}
}
Expand Down
26 changes: 26 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ class IcebergSplitReader : public SplitReader {
/// Column was added to the table schema after this data file was
/// written. Set as NULL constant since the old file doesn't contain
/// this column.
/// c) Row lineage (_last_updated_sequence_number):
/// For Iceberg V3 row lineage, if the column is not in the file,
/// inherit the data sequence number from the file's manifest entry
/// (provided via $data_sequence_number info column). Per the spec,
/// null values indicate the value should be inherited.
/// d) Row lineage (_row_id):
/// Per the spec, null _row_id values are assigned as
/// first_row_id + _pos. When first_row_id is available from
/// the split info column $first_row_id, the value is computed
/// in next(). When first_row_id is not available (e.g.,
/// pre-V3 tables), NULL is returned.
std::vector<TypePtr> adaptColumns(
const RowTypePtr& fileType,
const RowTypePtr& tableSchema) const override;
Expand All @@ -101,5 +112,20 @@ class IcebergSplitReader : public SplitReader {
std::list<std::unique_ptr<PositionalDeleteFileReader>>
positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;

// The child index of _last_updated_sequence_number in readerOutputType_.
// Used to locate the column in the output for null-value replacement.
std::optional<column_index_t> lastUpdatedSeqNumOutputIndex_;

// Data sequence number from the file's manifest entry, used to replace null
// values in _last_updated_sequence_number during reads.
std::optional<int64_t> dataSequenceNumber_;

// First row ID from the manifest entry, used to compute _row_id.
// When available (>= 0), _row_id = first_row_id + _pos for rows not in file.
std::optional<int64_t> firstRowId_;

// The child index of _row_id in readerOutputType_.
std::optional<column_index_t> rowIdOutputIndex_;
};
} // namespace facebook::velox::connector::hive::iceberg
Loading