Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 182 additions & 21 deletions cpp/src/parquet/arrow/generate_fuzz_corpus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
#include <cstdlib>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "arrow/array.h"
Expand All @@ -49,9 +51,11 @@ using ::arrow::internal::CreateDir;
using ::arrow::internal::PlatformFilename;
using ::arrow::util::Float16;
using ::parquet::ArrowWriterProperties;
using ::parquet::Encoding;
using ::parquet::WriterProperties;

struct WriteConfig {
std::string name;
std::shared_ptr<WriterProperties> writer_properties;
std::shared_ptr<ArrowWriterProperties> arrow_writer_properties;
};
Expand All @@ -74,6 +78,13 @@ struct Column {
}
};

using EncodingVector = std::vector<Encoding::type>;

struct ColumnWithEncodings {
Column column;
EncodingVector encodings;
};

std::shared_ptr<Field> FieldForArray(const std::shared_ptr<Array>& array,
std::string name) {
return field(std::move(name), array->type(), /*nullable=*/array->null_count() != 0);
Expand Down Expand Up @@ -135,13 +146,13 @@ std::vector<WriteConfig> GetWriteConfigurations() {
// clang-format on

std::vector<WriteConfig> configs;
configs.push_back({w_uncompressed, a_default});
configs.push_back({w_brotli, a_default});
configs.push_back({w_gzip, a_default});
configs.push_back({w_lz4, a_default});
configs.push_back({w_snappy, a_default});
configs.push_back({w_zstd, a_default});
configs.push_back({w_pages_v1, a_default});
configs.push_back({"uncompressed", w_uncompressed, a_default});
configs.push_back({"brotli", w_brotli, a_default});
configs.push_back({"gzip", w_gzip, a_default});
configs.push_back({"lz4", w_lz4, a_default});
configs.push_back({"snappy", w_snappy, a_default});
configs.push_back({"zstd", w_zstd, a_default});
configs.push_back({"v1_data_page", w_pages_v1, a_default});
return configs;
}

Expand All @@ -158,12 +169,14 @@ std::vector<WriteConfig> GetEncryptedWriteConfigurations(const ::arrow::Schema&
return builder;
};

std::vector<std::shared_ptr<FileEncryptionProperties>> file_encryptions;
std::vector<std::tuple<std::string, std::shared_ptr<FileEncryptionProperties>>>
file_encryptions;

// Uniform encryption
file_encryptions.push_back(file_encryption_builder().build());
file_encryptions.push_back({"uniform", file_encryption_builder().build()});
// Uniform encryption with plaintext footer
file_encryptions.push_back(file_encryption_builder().set_plaintext_footer()->build());
file_encryptions.push_back({"uniform_plaintext_footer",
file_encryption_builder().set_plaintext_footer()->build()});
// Columns encrypted with individual keys
{
ColumnPathToEncryptionPropertiesMap column_map;
Expand All @@ -174,7 +187,8 @@ std::vector<WriteConfig> GetEncryptedWriteConfigurations(const ::arrow::Schema&
}
ARROW_DCHECK_NE(column_map.size(), 0);
file_encryptions.push_back(
file_encryption_builder().encrypted_columns(std::move(column_map))->build());
{"column_keys",
file_encryption_builder().encrypted_columns(std::move(column_map))->build()});
}
// Unencrypted columns
{
Expand All @@ -184,15 +198,16 @@ std::vector<WriteConfig> GetEncryptedWriteConfigurations(const ::arrow::Schema&
}
ARROW_DCHECK_NE(column_map.size(), 0);
file_encryptions.push_back(
file_encryption_builder().encrypted_columns(std::move(column_map))->build());
{"unencrypted_columns",
file_encryption_builder().encrypted_columns(std::move(column_map))->build()});
}

auto a_default = MakeArrowPropertiesBuilder().build();

std::vector<WriteConfig> configs;
for (const auto& file_encryption : file_encryptions) {
for (auto [name, file_encryption] : file_encryptions) {
auto writer_properties = MakePropertiesBuilder().encryption(file_encryption)->build();
configs.push_back({writer_properties, a_default});
configs.push_back({name, writer_properties, a_default});
}
return configs;
}
Expand Down Expand Up @@ -369,6 +384,110 @@ Result<std::vector<Column>> ExampleColumns(int32_t length, double null_probabili
return columns;
}

template <typename T>
constexpr auto kMin = std::numeric_limits<T>::lowest();
template <typename T>
constexpr auto kMax = std::numeric_limits<T>::max();

// Generate columns for physical types along with their supported encodings
Result<std::vector<ColumnWithEncodings>> AllColumnsWithEncodings(
int32_t length, double null_probability = 0.2) {
const EncodingVector kIntEncodings = {Encoding::PLAIN, Encoding::RLE_DICTIONARY,
Encoding::DELTA_BINARY_PACKED,
Encoding::BYTE_STREAM_SPLIT};
const EncodingVector kFloatEncodings = {Encoding::PLAIN, Encoding::RLE_DICTIONARY,
Encoding::BYTE_STREAM_SPLIT};
const EncodingVector kBooleanEncodings = {Encoding::PLAIN, Encoding::RLE};
const EncodingVector kByteArrayEncodings = {Encoding::PLAIN, Encoding::RLE_DICTIONARY,
Encoding::DELTA_LENGTH_BYTE_ARRAY,
Encoding::DELTA_BYTE_ARRAY};
const EncodingVector kFixedLenByteArrayEncodings = {
Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::DELTA_BYTE_ARRAY,
Encoding::BYTE_STREAM_SPLIT};
const EncodingVector kInt96Encodings = {Encoding::PLAIN};

std::vector<ColumnWithEncodings> columns;

random::RandomArrayGenerator gen(42);
auto name_gen = Column::NameGenerator();

for (const double true_probability : {0.0, 0.001, 0.01, 0.5, 0.999}) {
columns.push_back(
{{name_gen(), gen.Boolean(length, true_probability, null_probability)},
kBooleanEncodings});
}

// Generate integer columns with different ranges to trigger delta encoding modes
columns.push_back(
{{name_gen(), gen.Int32(length, -100, 100, null_probability)}, kIntEncodings});
columns.push_back(
{{name_gen(), gen.Int32(length, kMin<int32_t>, kMax<int32_t>, null_probability)},
kIntEncodings});
columns.push_back({{name_gen(), gen.Int64(length, -100'000, 100'000, null_probability)},
kIntEncodings});
columns.push_back(
{{name_gen(), gen.Int64(length, kMin<int64_t>, kMax<int64_t>, null_probability)},
kIntEncodings});

// This won't necessarily span all 96 bits of precision, but PLAIN encoding allows
// the fuzzer to do its thing on the values.
for (auto unit : {TimeUnit::SECOND, TimeUnit::MILLI, TimeUnit::MICRO, TimeUnit::NANO}) {
ARROW_ASSIGN_OR_RAISE(
auto array, gen.Int64(length, kMin<int64_t>, kMax<int64_t>, null_probability)
->View(timestamp(unit)));
columns.push_back({{name_gen(), array}, kInt96Encodings});
}

// NOTE: will need to vary NaNs if there are encodings for which that matters (ALP?)
columns.push_back(
{{name_gen(), gen.Float32(length, -1.0, 1.0, null_probability)}, kFloatEncodings});
columns.push_back(
{{name_gen(), gen.Float32(length, kMin<float>, kMax<float>, null_probability)},
kFloatEncodings});
columns.push_back(
{{name_gen(), gen.Float64(length, -1.0, 1.0, null_probability)}, kFloatEncodings});
columns.push_back(
{{name_gen(), gen.Float64(length, kMin<double>, kMax<double>, null_probability)},
kFloatEncodings});

// For FLBA
columns.push_back(
{{name_gen(), gen.Float16(length, Float16(-1.0), Float16(1.0), null_probability)},
kFixedLenByteArrayEncodings});

// For BYTE_ARRAY (vary lengths and repetitions to trigger encoding modes)
columns.push_back({{name_gen(), gen.String(length, /*min_length=*/0,
/*max_length=*/20, null_probability)},
kByteArrayEncodings});
columns.push_back({{name_gen(), gen.String(length, /*min_length=*/12,
/*max_length=*/14, null_probability)},
kByteArrayEncodings});
columns.push_back({{name_gen(), gen.String(length, /*min_length=*/100,
/*max_length=*/200, null_probability)},
kByteArrayEncodings});
columns.push_back({{name_gen(), gen.StringWithRepeats(
length, /*unique=*/length / 50, /*min_length=*/0,
/*max_length=*/20, null_probability)},
kByteArrayEncodings});
columns.push_back({{name_gen(), gen.StringWithRepeats(
length, /*unique=*/length / 100, /*min_length=*/12,
/*max_length=*/14, null_probability)},
kByteArrayEncodings});

return columns;
}

Result<std::vector<ColumnWithEncodings>> AllColumnsWithEncodings() {
std::vector<ColumnWithEncodings> all_columns;

for (const double null_probability : {0.0, 0.01, 0.5, 1.0}) {
ARROW_ASSIGN_OR_RAISE(auto columns,
AllColumnsWithEncodings(/*length=*/1'000, null_probability));
all_columns.insert(all_columns.end(), columns.begin(), columns.end());
}
return all_columns;
}

Result<std::shared_ptr<RecordBatch>> BatchFromColumns(const std::vector<Column>& columns,
int64_t num_rows) {
FieldVector fields;
Expand Down Expand Up @@ -425,13 +544,25 @@ Status DoMain(const std::string& out_dir) {
RETURN_NOT_OK(CreateDir(dir_fn));

int sample_num = 1;
auto sample_name = [&]() -> std::string {
return "pq-table-" + std::to_string(sample_num++);
auto sample_file_name = [&](const std::string& name = "") -> std::string {
std::stringstream ss;
if (!name.empty()) {
ss << name << "-";
}
ss << sample_num++ << ".pq";
return std::move(ss).str();
};

auto write_sample = [&](const std::shared_ptr<Table>& table,
const WriteConfig& config) -> Status {
ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_name()));
auto write_sample = [&](const std::shared_ptr<Table>& table, const WriteConfig& config,
std::string name = "") -> Status {
if (name.empty() && table->num_columns() == 1) {
name = table->schema()->field(0)->type()->name();
}
if (!name.empty()) {
name += "-";
}
name += config.name;
ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_file_name(name)));
std::cerr << sample_fn.ToString() << std::endl;
ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString()));
// Emit several row groups
Expand All @@ -443,7 +574,7 @@ Status DoMain(const std::string& out_dir) {
};

{
// 1. Unencrypted files
// 1. Unencrypted files for various write configurations
// Write a cardinal product of example batches x write configurations
ARROW_ASSIGN_OR_RAISE(auto batches, Batches());
auto write_configs = GetWriteConfigurations();
Expand All @@ -458,7 +589,37 @@ Status DoMain(const std::string& out_dir) {
}
}
{
// 2. Encrypted files
// 2. Unencrypted files for various column encodings
// Write one file per (column, encoding) pair.
ARROW_ASSIGN_OR_RAISE(auto columns, AllColumnsWithEncodings());

for (const auto& column : columns) {
RETURN_NOT_OK(column.column.array->ValidateFull());
ARROW_ASSIGN_OR_RAISE(
auto batch, BatchFromColumn(column.column, column.column.array->length()));
ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch}));

for (const auto encoding : column.encodings) {
auto w_props_builder = MakePropertiesBuilder();
if (encoding == Encoding::RLE_DICTIONARY) {
// RLE_DICTIONARY is enabled through enable_dictionary() rather than
// encoding(), also increase the dictionary page size limit as we
// generate data with a typically high cardinality.
w_props_builder.enable_dictionary()->dictionary_pagesize_limit(1'000'000);
} else {
w_props_builder.disable_dictionary()->encoding(encoding);
}
auto w_props = w_props_builder.build();
// Ensure that we generate INT96 Parquet data when given a timestamp column
auto a_props =
MakeArrowPropertiesBuilder().enable_deprecated_int96_timestamps()->build();
auto config_name = ::parquet::EncodingToString(encoding);
RETURN_NOT_OK(write_sample(table, WriteConfig{config_name, w_props, a_props}));
}
}
}
{
// 3. Encrypted files
// Use a single batch and write it using different configurations
ARROW_ASSIGN_OR_RAISE(auto batch, BatchForEncryption());
auto write_configs = GetEncryptedWriteConfigurations(*batch->schema());
Expand Down
Loading