Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ class AvroWriter::Impl {

bool Closed() const { return writer_ == nullptr; }

int64_t length() { return total_bytes_; }
Result<int64_t> length() {
if (Closed()) {
return total_bytes_;
}
// Return current flushed length when writer is still open
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto current_pos, arrow_output_stream_->Tell());
return current_pos;
}

private:
// The schema to write.
Expand All @@ -135,6 +142,7 @@ class AvroWriter::Impl {
std::unique_ptr<::avro::GenericDatum> datum_;
// Arrow schema to write data.
ArrowSchema arrow_schema_;
// Total length of the written Avro file.
int64_t total_bytes_ = 0;
};

Expand Down Expand Up @@ -162,12 +170,7 @@ Result<Metrics> AvroWriter::metrics() {
return Invalid("AvroWriter is not closed");
}

Result<int64_t> AvroWriter::length() {
if (impl_->Closed()) {
return impl_->length();
}
return Invalid("AvroWriter is not closed");
}
Result<int64_t> AvroWriter::length() { return impl_->length(); }

std::vector<int64_t> AvroWriter::split_offsets() { return {}; }

Expand Down
18 changes: 11 additions & 7 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,16 @@ class ParquetWriter::Impl {

bool Closed() const { return writer_ == nullptr; }

int64_t length() const { return total_bytes_; }
Result<int64_t> length() {
if (Closed()) {
return total_bytes_;
}
// Return current flushed length when writer is still open.
// It would be good if we could get the number of buffered bytes
// from the internal RowGroupWriter.
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto current_pos, output_stream_->Tell());
return current_pos;
}

std::vector<int64_t> split_offsets() const { return split_offsets_; }

Expand Down Expand Up @@ -144,12 +153,7 @@ Result<Metrics> ParquetWriter::metrics() {
return {};
}

Result<int64_t> ParquetWriter::length() {
if (!impl_->Closed()) {
return Invalid("ParquetWriter is not closed");
}
return impl_->length();
}
Result<int64_t> ParquetWriter::length() { return impl_->length(); }

std::vector<int64_t> ParquetWriter::split_offsets() {
if (!impl_->Closed()) {
Expand Down
Loading