Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,10 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowSerialize(
PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));

SerializeFunctor<ParquetType, ArrowType> functor;
RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
// The value buffer could be empty if all values are nulls.
if (array.null_count() != array.length()) {
RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
}
bool no_nulls =
this->descr()->schema_node()->is_required() || (array.null_count() == 0);
if (!maybe_parent_nulls && no_nulls) {
Expand Down
83 changes: 83 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/array.h"
#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -2380,5 +2381,87 @@ TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) {
}
}

// Test fixture that writes a single OPTIONAL INT32 column (annotated INT_8) to an
// in-memory sink through a TypedColumnWriter<Int32Type>, and reads the written
// bytes back through a TypedColumnReader<Int32Type>.
class TestArrowWriteSerialize : public ::testing::Test {
 public:
  // Creates the column descriptor, the output sink and the writer properties.
  // Marked `override` so that a signature mismatch with ::testing::Test::SetUp
  // is a compile error instead of a silently skipped initialisation.
  void SetUp() override {
    // Create a Parquet schema
    // Int8 maps to Int32 in Parquet with INT_8 converted type
    auto node = schema::PrimitiveNode::Make("int8_col", Repetition::OPTIONAL, Type::INT32,
                                            ConvertedType::INT_8);
    // The trailing (1, 0) are the descriptor's max definition / repetition levels
    // for an optional, non-repeated leaf column.
    schema_descriptor_ = std::make_shared<ColumnDescriptor>(node, 1, 0);
    sink_ = CreateOutputStream();
    writer_properties_ = default_writer_properties();
  }

  // Builds an uncompressed column writer that emits pages into `sink_`.
  //
  // The metadata builder is stored in `metadata_` rather than a local because the
  // page writer and the column writer are only handed raw pointers to it.
  std::shared_ptr<TypedColumnWriter<Int32Type>> BuildWriter() {
    metadata_ =
        ColumnChunkMetaDataBuilder::Make(writer_properties_, schema_descriptor_.get());
    std::unique_ptr<PageWriter> pager =
        PageWriter::Open(sink_, Compression::UNCOMPRESSED, metadata_.get());
    std::shared_ptr<ColumnWriter> writer =
        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
    // The column's physical type is INT32, so the concrete writer is the Int32 one.
    return std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer);
  }

  // Finishes `sink_` and builds an uncompressed column reader over the bytes that
  // were written to it. `num_rows` is forwarded to PageReader::Open.
  std::shared_ptr<TypedColumnReader<Int32Type>> BuildReader(int64_t num_rows) {
    EXPECT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
    ReaderProperties reader_properties;
    std::unique_ptr<PageReader> page_reader = PageReader::Open(
        std::move(source), num_rows, Compression::UNCOMPRESSED, reader_properties);
    std::shared_ptr<ColumnReader> reader =
        ColumnReader::Make(schema_descriptor_.get(), std::move(page_reader));
    return std::static_pointer_cast<TypedColumnReader<Int32Type>>(reader);
  }

 protected:
  // Descriptor of the single column under test.
  std::shared_ptr<ColumnDescriptor> schema_descriptor_;
  // In-memory destination of the written column chunk.
  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
  // Properties shared by the metadata builder and the column writer.
  std::shared_ptr<WriterProperties> writer_properties_;
  // Column chunk metadata builder referenced (by raw pointer) by the writer.
  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
};

// Writes an int8 Arrow array in which every slot is null and whose value buffer is
// absent (nullptr), then reads the column back and checks that only null entries
// (definition level 0) and no values were stored.
TEST_F(TestArrowWriteSerialize, AllNulls) {
  constexpr int kNumValues = 100;

  // Validity bitmap with every bit cleared: all slots are null.
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<::arrow::Buffer> validity,
                       ::arrow::AllocateBitmap(kNumValues));
  ::arrow::bit_util::SetBitsTo(validity->mutable_data(), 0, kNumValues, false);

  // Deliberately no value buffer at all; this is the case under test.
  std::shared_ptr<::arrow::Buffer> values_buffer = nullptr;

  // Length and null count are both kNumValues.
  auto data = ::arrow::ArrayData::Make(::arrow::int8(), kNumValues,
                                       {validity, values_buffer}, kNumValues);
  auto all_null_array = ::arrow::MakeArray(data);

  auto writer = this->BuildWriter();

  // Every entry is written with definition level 0 and repetition level 0.
  std::vector<int16_t> def_levels_in(kNumValues, 0);
  std::vector<int16_t> rep_levels_in(kNumValues, 0);

  auto arrow_properties = default_arrow_writer_properties();
  ArrowWriteContext write_ctx(::arrow::default_memory_pool(), arrow_properties.get());

  ASSERT_OK(writer->WriteArrow(def_levels_in.data(), rep_levels_in.data(), kNumValues,
                               *all_null_array, &write_ctx, true));

  writer->Close();

  // Read everything back in a single batch.
  auto reader = this->BuildReader(kNumValues);
  std::vector<int16_t> def_levels_out(kNumValues);
  std::vector<int16_t> rep_levels_out(kNumValues);
  std::vector<int32_t> values_out(kNumValues);
  int64_t values_read = 0;

  const int64_t levels_read =
      reader->ReadBatch(kNumValues, def_levels_out.data(), rep_levels_out.data(),
                        values_out.data(), &values_read);

  ASSERT_EQ(kNumValues, levels_read);
  ASSERT_EQ(0, values_read);

  // Every definition level read back must mark a null entry.
  for (const int16_t level : def_levels_out) {
    ASSERT_EQ(0, level);
  }
}

} // namespace test
} // namespace parquet
Loading