Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions velox/connectors/hive/iceberg/IcebergMetadataColumns.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ struct IcebergMetadataColumn {
std::shared_ptr<const Type> type;
std::string doc;

// Position delete file's metadata column ID, see
// https://iceberg.apache.org/spec/#position-delete-files.
// Reserved field IDs for Iceberg tables; refer to
// https://iceberg.apache.org/spec/#reserved-field-ids
static constexpr int32_t kPosId = 2'147'483'545;
static constexpr int32_t kFilePathId = 2'147'483'546;
static constexpr int32_t kRowId = 2'147'483'540;
static constexpr int32_t kLastUpdatedSequenceNumber = 2'147'483'539;

static constexpr const char* kRowIdColumnName = "_row_id";
static constexpr const char* kLastUpdatedSequenceNumberColumnName =
"_last_updated_sequence_number";

IcebergMetadataColumn(
int _id,
Expand All @@ -55,6 +61,23 @@ struct IcebergMetadataColumn {
BIGINT(),
"Ordinal position of a deleted row in the data file");
}

static std::shared_ptr<IcebergMetadataColumn> icebergRowIdColumn() {
return std::make_shared<IcebergMetadataColumn>(
kRowId,
kRowIdColumnName,
BIGINT(),
"Implicit row ID that is automatically assigned");
}

/// Builds the metadata-column descriptor for the Iceberg
/// "_last_updated_sequence_number" column, using the reserved field id
/// kLastUpdatedSequenceNumber and a BIGINT type.
/// Returns a freshly allocated shared descriptor on every call.
static std::shared_ptr<IcebergMetadataColumn>
icebergLastUpdatedSequenceNumberColumn() {
  // Named handle instead of returning the expression directly; the
  // descriptor carries the reserved id, name, type, and doc string.
  auto seqNumColumn = std::make_shared<IcebergMetadataColumn>(
      kLastUpdatedSequenceNumber,
      kLastUpdatedSequenceNumberColumnName,
      BIGINT(),
      "Sequence number when the row was last updated");
  return seqNumColumn;
}
};

} // namespace facebook::velox::connector::hive::iceberg
220 changes: 217 additions & 3 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/dwio/common/BufferUtil.h"
#include "velox/vector/DecodedVector.h"

using namespace facebook::velox::dwio::common;

Expand Down Expand Up @@ -60,12 +61,76 @@ void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const folly::F14FastMap<std::string, std::string>& fileReadOps) {
// Expand the file schema to include row lineage metadata columns
// (_row_id and _last_updated_sequence_number) if they're requested
// in the output. These are hidden metadata columns physically stored
// in Iceberg V3 data files but not listed in the table's logical
// schema (dataColumns). The Parquet reader needs them in the file
// schema to read them from the file.
auto fileSchema = baseReaderOpts_.fileSchema();
if (fileSchema) {
auto names = fileSchema->names();
auto types = fileSchema->children();
bool modified = false;
for (const auto* colName :
{IcebergMetadataColumn::kRowIdColumnName,
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName}) {
if (readerOutputType_->containsChild(colName) &&
!fileSchema->containsChild(colName)) {
names.push_back(std::string(colName));
types.push_back(BIGINT());
modified = true;
}
}
if (modified) {
baseReaderOpts_.setFileSchema(ROW(std::move(names), std::move(types)));
}
}

createReader();
if (emptySplit_) {
return;
}

// Initialize row lineage fields BEFORE getAdaptedRowType() because
// adaptColumns() needs them to decide how to handle missing columns.
firstRowId_ = std::nullopt;
if (auto it = hiveSplit_->infoColumns.find("$first_row_id");
it != hiveSplit_->infoColumns.end()) {
auto value = folly::to<int64_t>(it->second);
if (value >= 0) {
firstRowId_ = value;
}
}

dataSequenceNumber_ = std::nullopt;
if (auto it = hiveSplit_->infoColumns.find("$data_sequence_number");
it != hiveSplit_->infoColumns.end()) {
dataSequenceNumber_ = folly::to<int64_t>(it->second);
}

auto rowType = getAdaptedRowType();

// After adaptColumns(), check if row lineage columns need null replacement
// in next(). If adaptColumns() set them as constants (column missing from
// file), no replacement is needed. If the column is read from the file
// (not constant), null values must be replaced per spec.
lastUpdatedSeqNumOutputIndex_ = std::nullopt;
if (dataSequenceNumber_.has_value()) {
auto* seqNumSpec = scanSpec_->childByName(
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName);
if (seqNumSpec && !seqNumSpec->isConstant()) {
lastUpdatedSeqNumOutputIndex_ = readerOutputType_->getChildIdxIfExists(
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName);
}
}

rowIdOutputIndex_ = std::nullopt;
if (firstRowId_.has_value()) {
rowIdOutputIndex_ = readerOutputType_->getChildIdxIfExists(
IcebergMetadataColumn::kRowIdColumnName);
}

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
Expand Down Expand Up @@ -160,6 +225,131 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {

auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation);

// For Iceberg V3 row lineage: replace null values in
// _last_updated_sequence_number with the data sequence number from the
// file's manifest entry. Per the spec, null means the value should be
// inherited from the manifest entry's sequence number.
if (lastUpdatedSeqNumOutputIndex_.has_value() &&
dataSequenceNumber_.has_value() && rowsScanned > 0) {
auto* rowOutput = output->as<RowVector>();
if (rowOutput) {
auto& seqNumChild = rowOutput->childAt(*lastUpdatedSeqNumOutputIndex_);
// Load lazy vector - the Parquet reader wraps columns in LazyVector.
seqNumChild = BaseVector::loadedVectorShared(seqNumChild);
auto vectorSize = seqNumChild->size();

if (seqNumChild->isConstantEncoding()) {
if (seqNumChild->isNullAt(0)) {
seqNumChild = std::make_shared<ConstantVector<int64_t>>(
connectorQueryCtx_->memoryPool(),
vectorSize,
false,
BIGINT(),
static_cast<int64_t>(*dataSequenceNumber_));
}
} else if (seqNumChild->mayHaveNulls()) {
// Use DecodedVector to handle any encoding (flat, dictionary, etc.).
DecodedVector decoded(*seqNumChild);
if (decoded.mayHaveNulls()) {
auto pool = connectorQueryCtx_->memoryPool();
auto newFlat = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), vectorSize, pool);
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (decoded.isNullAt(i)) {
newFlat->set(i, *dataSequenceNumber_);
} else {
newFlat->set(i, decoded.valueAt<int64_t>(i));
}
}
seqNumChild = newFlat;
}
}
}
}

// For Iceberg V3 row lineage: compute _row_id = first_row_id + _pos.
// When the data file doesn't contain _row_id physically, compute it from
// the manifest entry's first_row_id plus the row's position in the file.
// When positional deletes are applied, we track actual file positions
// using the deletion bitmap.
if (rowIdOutputIndex_.has_value() && firstRowId_.has_value() &&
rowsScanned > 0) {
auto* rowOutput = output->as<RowVector>();
if (rowOutput) {
auto& rowIdChild = rowOutput->childAt(*rowIdOutputIndex_);
// Load lazy vector - the Parquet reader wraps columns in LazyVector.
rowIdChild = BaseVector::loadedVectorShared(rowIdChild);
auto vectorSize = rowIdChild->size();

if (rowIdChild->isConstantEncoding() && rowIdChild->isNullAt(0)) {
// All null - compute _row_id = first_row_id + _pos for all rows.
auto pool = connectorQueryCtx_->memoryPool();
auto flatVec =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), vectorSize, pool);
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
for (vector_size_t i = 0; i < vectorSize; ++i) {
flatVec->set(
i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
}
} else {
// With deletions: use the bitmap to find actual file positions
// of surviving rows.
vector_size_t outputIdx = 0;
for (vector_size_t j = 0;
j < static_cast<vector_size_t>(actualSize) &&
outputIdx < vectorSize;
++j) {
if (!bits::isBitSet(mutation.deletedRows, j)) {
flatVec->set(
outputIdx++,
static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
}
}
}
rowIdChild = flatVec;
} else if (rowIdChild->mayHaveNulls()) {
// Column is in the file but has some null values - replace nulls
// with first_row_id + _pos. Use DecodedVector for encoding support.
DecodedVector decoded(*rowIdChild);
if (decoded.mayHaveNulls()) {
auto pool = connectorQueryCtx_->memoryPool();
auto newFlat = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), vectorSize, pool);
int64_t batchStartPos = splitOffset_ + baseReadOffset_;
if (mutation.deletedRows == nullptr) {
for (vector_size_t i = 0; i < vectorSize; ++i) {
if (decoded.isNullAt(i)) {
newFlat->set(
i, static_cast<int64_t>(*firstRowId_) + batchStartPos + i);
} else {
newFlat->set(i, decoded.valueAt<int64_t>(i));
}
}
} else {
vector_size_t outputIdx = 0;
for (vector_size_t j = 0;
j < static_cast<vector_size_t>(actualSize) &&
outputIdx < vectorSize;
++j) {
if (!bits::isBitSet(mutation.deletedRows, j)) {
if (decoded.isNullAt(outputIdx)) {
newFlat->set(
outputIdx,
static_cast<int64_t>(*firstRowId_) + batchStartPos + j);
} else {
newFlat->set(outputIdx, decoded.valueAt<int64_t>(outputIdx));
}
++outputIdx;
}
}
}
rowIdChild = newFlat;
}
}
}
}

return rowsScanned;
}

Expand Down Expand Up @@ -197,15 +387,39 @@ std::vector<TypePtr> IcebergSplitReader::adaptColumns(
// files, partition column values are stored in partition metadata
// rather than in the data file itself, following Hive's partitioning
// convention.
// 3. _last_updated_sequence_number: For Iceberg V3 row lineage, if
// the column is not in the file, inherit the data sequence number
// from the file's manifest entry.
// 4. _row_id: For Iceberg V3 row lineage, if the column is not in
// the file, set as NULL constant here. When first_row_id is
// available, next() will replace NULL with first_row_id + _pos.
if (auto it = hiveSplit_->partitionKeys.find(fieldName);
it != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec.get(), fieldName, it->second);
} else if (
fieldName ==
IcebergMetadataColumn::kLastUpdatedSequenceNumberColumnName &&
dataSequenceNumber_.has_value()) {
childSpec->setConstantValue(
std::make_shared<ConstantVector<int64_t>>(
connectorQueryCtx_->memoryPool(),
1,
false,
BIGINT(),
static_cast<int64_t>(*dataSequenceNumber_)));
} else {
// Column missing from both the file and partition keys. This
// can be a schema evolution column (in tableSchema) or a
// metadata column like _row_id (in readerOutputType_ but not
// in tableSchema). Try readerOutputType_ first, then fall
// back to tableSchema.
auto outputIdx = readerOutputType_->getChildIdxIfExists(fieldName);
auto colType = outputIdx.has_value()
? readerOutputType_->childAt(*outputIdx)
: tableSchema->findChild(fieldName);
childSpec->setConstantValue(
BaseVector::createNullConstant(
tableSchema->findChild(fieldName),
1,
connectorQueryCtx_->memoryPool()));
colType, 1, connectorQueryCtx_->memoryPool()));
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ class IcebergSplitReader : public SplitReader {
/// Column was added to the table schema after this data file was
/// written. Set as NULL constant since the old file doesn't contain
/// this column.
/// c) Row lineage (_last_updated_sequence_number):
/// For Iceberg V3 row lineage, if the column is not in the file,
/// inherit the data sequence number from the file's manifest entry
/// (provided via $data_sequence_number info column). Per the spec,
/// null values indicate the value should be inherited.
/// d) Row lineage (_row_id):
/// Per the spec, null _row_id values are assigned as
/// first_row_id + _pos. When first_row_id is available from
/// the split info column $first_row_id, the value is computed
/// in next(). When first_row_id is not available (e.g.,
/// pre-V3 tables), NULL is returned.
std::vector<TypePtr> adaptColumns(
const RowTypePtr& fileType,
const RowTypePtr& tableSchema) const override;
Expand All @@ -101,5 +112,23 @@ class IcebergSplitReader : public SplitReader {
std::list<std::unique_ptr<PositionalDeleteFileReader>>
positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;

// The child index of _last_updated_sequence_number in readerOutputType_.
// When set (along with dataSequenceNumber_), null values in this column
// are replaced with dataSequenceNumber_ during reads per the Iceberg spec.
std::optional<column_index_t> lastUpdatedSeqNumOutputIndex_;

// Data sequence number from the file's manifest entry, used to replace null
// values in _last_updated_sequence_number during reads.
std::optional<int64_t> dataSequenceNumber_;

// First row ID from the manifest entry, used to compute _row_id.
// When available (>= 0), _row_id = first_row_id + _pos for rows not in file.
std::optional<int64_t> firstRowId_;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot Is it equivalent to lastUpdatedSeqNumOutputIndex_.has_value() && dataSequenceNumber_.has_value()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, readLastUpdatedSeqNumFromFile_ was equivalent to lastUpdatedSeqNumOutputIndex_.has_value() && dataSequenceNumber_.has_value(). Removed the redundant boolean in 14b5729 — now using the optional checks directly.


// The child index of _row_id in readerOutputType_.
// When set (along with firstRowId_), _row_id is computed as
// first_row_id + _pos for null values during reads per the Iceberg spec.
std::optional<column_index_t> rowIdOutputIndex_;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot , Is it equivalent to rowIdOutputIndex_.has_value() && firstRowId_.has_value()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, computeRowId_ was equivalent to rowIdOutputIndex_.has_value() && firstRowId_.has_value(). Removed the redundant boolean in 14b5729 — now using the optional checks directly.

};
} // namespace facebook::velox::connector::hive::iceberg
Loading