# Patch summary: optional compression level and runtime codec availability
# for the Avro C++ data file writer.
#
# Files touched: lang/c++/impl/DataFile.cc, lang/c++/impl/ZstdCompressWrapper.{cc,hh},
# lang/c++/include/avro/DataFile.hh, lang/c++/test/DataFileTests.cc.
#
# What the hunks below do:
#   * DataFile.cc adds codecToString() and validateCodec() inside the anonymous
#     namespace, and isCodecAvailable() after it. validateCodec() throws when the
#     codec is not available, returns early when no level is given, ignores the
#     level for NULL_CODEC and SNAPPY_CODEC, and throws for a deflate level outside
#     0-9 or a zstandard level outside 1-22.
#   * Both DataFileWriterBase constructors gain a trailing compressionLevel
#     parameter, stored in the new member compressionLevel_.
#   * init() calls validateCodec(codec_, compressionLevel_) after the sync-interval
#     check; the SNAPPY/ZSTD branches that set the codec metadata are no longer
#     wrapped in #ifdef.
#   * sync() passes compressionLevel_.value_or(Z_DEFAULT_COMPRESSION) to
#     deflateInit2, and passes compressionLevel_ to ZstdCompressWrapper::compress,
#     which uses value_or(ZSTD_CLEVEL_DEFAULT).
#   * DataFile.hh declares SNAPPY_CODEC = 2 and ZSTD_CODEC = 3 unconditionally and
#     declares isCodecAvailable(); the DataFileWriter constructors forward the new
#     compressionLevel argument (default std::nullopt).
#   * DataFileTests.cc adds level-validation tests, round-trip tests, an enum-value
#     test and an isCodecAvailable test, and registers them in the suite.
#
# NOTE(review): this copy appears to have lost its line breaks and its
#   template-argument lists (e.g. "std::optional level", "static_cast(codec)",
#   bare "#include"); the patch text below is reproduced unchanged.
# NOTE(review): in both constructors stream_ is initialised in the member-init list
#   before init() runs validateCodec(). For the filename overload this means
#   fileOutputStream(filename) is called before an invalid codec/level is
#   rejected; whether that creates or truncates the file is not visible here --
#   confirm.
# NOTE(review): validateCodec() checks isCodecAvailable() first, and
#   isCodecAvailable() returns false in its default case, so the
#   "Unknown codec" branch in validateCodec() and the final else in init() look
#   unreachable for unknown enum values -- confirm this is intended.
# NOTE(review): the new tests seed boost::mt19937 from time(nullptr), so the
#   sampled levels differ between runs; a failure may not be reproducible.
# NOTE(review): the 0-9 and 1-22 ranges are hard-coded; verify them against the
#   zlib and zstd versions the project supports.
#
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc index e605a6f4480..61caddac454 100644 --- a/lang/c++/impl/DataFile.cc +++ b/lang/c++/impl/DataFile.cc @@ -64,36 +64,105 @@ const size_t maxSyncInterval = 1u << 30; // Recommended by https://www.zlib.net/zlib_how.html const size_t zlibBufGrowSize = 128 * 1024; +std::string codecToString(Codec codec) { + switch (codec) { + case NULL_CODEC: return "null"; + case DEFLATE_CODEC: return "deflate"; + case SNAPPY_CODEC: return "snappy"; + case ZSTD_CODEC: return "zstandard"; + default: return "unknown"; + } +} + +void validateCodec(Codec codec, std::optional level) { + if (!isCodecAvailable(codec)) { + throw Exception("Codec {} is not available.", codecToString(codec)); + } + + if (!level.has_value()) { + return; + } + + int levelValue = level.value(); + switch (codec) { + case NULL_CODEC: + case SNAPPY_CODEC: + // These codecs don't support compression levels, ignore + break; + case DEFLATE_CODEC: + if (levelValue < 0 || levelValue > 9) { + throw Exception("Invalid compression level {} for deflate codec. " + "Valid range is 0-9.", + levelValue); + } + break; + case ZSTD_CODEC: + if (levelValue < 1 || levelValue > 22) { + throw Exception("Invalid compression level {} for zstandard codec. 
" + "Valid range is 1-22.", + levelValue); + } + break; + default: + throw Exception("Unknown codec: {}", static_cast(codec)); + } +} + } // namespace +bool isCodecAvailable(Codec codec) { + switch (codec) { + case NULL_CODEC: + case DEFLATE_CODEC: + return true; + case SNAPPY_CODEC: +#ifdef SNAPPY_CODEC_AVAILABLE + return true; +#else + return false; +#endif + case ZSTD_CODEC: +#ifdef ZSTD_CODEC_AVAILABLE + return true; +#else + return false; +#endif + default: + return false; + } +} + DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval, - Codec codec, const Metadata &metadata) : filename_(filename), - schema_(schema), - encoderPtr_(binaryEncoder()), - syncInterval_(syncInterval), - codec_(codec), - stream_(fileOutputStream(filename)), - buffer_(memoryOutputStream()), - sync_(makeSync()), - objectCount_(0), - metadata_(metadata), - lastSync_(0) { + Codec codec, const Metadata &metadata, + std::optional compressionLevel) : filename_(filename), + schema_(schema), + encoderPtr_(binaryEncoder()), + syncInterval_(syncInterval), + codec_(codec), + compressionLevel_(compressionLevel), + stream_(fileOutputStream(filename)), + buffer_(memoryOutputStream()), + sync_(makeSync()), + objectCount_(0), + metadata_(metadata), + lastSync_(0) { init(schema, syncInterval, codec); } -DataFileWriterBase::DataFileWriterBase(std::unique_ptr outputStream, - const ValidSchema &schema, size_t syncInterval, - Codec codec, const Metadata &metadata) : filename_(), - schema_(schema), - encoderPtr_(binaryEncoder()), - syncInterval_(syncInterval), - codec_(codec), - stream_(std::move(outputStream)), - buffer_(memoryOutputStream()), - sync_(makeSync()), - objectCount_(0), - metadata_(metadata), - lastSync_(0) { +DataFileWriterBase::DataFileWriterBase(std::unique_ptr outputStream, const ValidSchema &schema, + size_t syncInterval, Codec codec, const Metadata &metadata, + std::optional compressionLevel) : filename_(), + schema_(schema), + 
encoderPtr_(binaryEncoder()), + syncInterval_(syncInterval), + codec_(codec), + compressionLevel_(compressionLevel), + stream_(std::move(outputStream)), + buffer_(memoryOutputStream()), + sync_(makeSync()), + objectCount_(0), + metadata_(metadata), + lastSync_(0) { init(schema, syncInterval, codec); } @@ -103,20 +172,17 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co "Invalid sync interval: {}. Should be between {} and {}", syncInterval, minSyncInterval, maxSyncInterval); } - setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); + validateCodec(codec_, compressionLevel_); + setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); if (codec_ == NULL_CODEC) { setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); } else if (codec_ == DEFLATE_CODEC) { setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC); -#ifdef SNAPPY_CODEC_AVAILABLE } else if (codec_ == SNAPPY_CODEC) { setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC); -#endif -#ifdef ZSTD_CODEC_AVAILABLE } else if (codec_ == ZSTD_CODEC) { setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC); -#endif } else { throw Exception("Unknown codec: {}", int(codec)); } @@ -160,7 +226,10 @@ void DataFileWriterBase::sync() { zs.zfree = Z_NULL; zs.opaque = Z_NULL; - int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + // Use Z_DEFAULT_COMPRESSION if no level specified + int effectiveLevel = compressionLevel_.value_or(Z_DEFAULT_COMPRESSION); + + int ret = deflateInit2(&zs, effectiveLevel, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); if (ret != Z_OK) { throw Exception("Failed to initialize deflate, error: {}", ret); } @@ -246,7 +315,7 @@ void DataFileWriterBase::sync() { } ZstdCompressWrapper zstdCompressWrapper; - std::vector compressed = zstdCompressWrapper.compress(uncompressed); + std::vector compressed = zstdCompressWrapper.compress(uncompressed, compressionLevel_); std::unique_ptr in = memoryInputStream( reinterpret_cast(compressed.data()), compressed.size()); diff --git 
a/lang/c++/impl/ZstdCompressWrapper.cc b/lang/c++/impl/ZstdCompressWrapper.cc index b756e19d8ea..fecf335ef6f 100644 --- a/lang/c++/impl/ZstdCompressWrapper.cc +++ b/lang/c++/impl/ZstdCompressWrapper.cc @@ -25,7 +25,7 @@ namespace avro { -std::vector ZstdCompressWrapper::compress(const std::vector &uncompressed) { +std::vector ZstdCompressWrapper::compress(const std::vector &uncompressed, std::optional compressionLevel) { // Pre-allocate buffer for compressed data size_t max_compressed_size = ZSTD_compressBound(uncompressed.size()); if (ZSTD_isError(max_compressed_size)) { @@ -37,7 +37,7 @@ std::vector ZstdCompressWrapper::compress(const std::vector &uncompr size_t compressed_size = ZSTD_compress( compressed.data(), max_compressed_size, uncompressed.data(), uncompressed.size(), - ZSTD_CLEVEL_DEFAULT); + compressionLevel.value_or(ZSTD_CLEVEL_DEFAULT)); if (ZSTD_isError(compressed_size)) { throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size)); diff --git a/lang/c++/impl/ZstdCompressWrapper.hh b/lang/c++/impl/ZstdCompressWrapper.hh index 871dd711a66..419fb261770 100644 --- a/lang/c++/impl/ZstdCompressWrapper.hh +++ b/lang/c++/impl/ZstdCompressWrapper.hh @@ -21,6 +21,7 @@ #ifdef ZSTD_CODEC_AVAILABLE +#include #include #include @@ -32,7 +33,7 @@ public: ZstdCompressWrapper(); ~ZstdCompressWrapper(); - std::vector compress(const std::vector &uncompressed); + std::vector compress(const std::vector &uncompressed, std::optional compressionLevel = std::nullopt); private: ZSTD_CCtx *cctx_ = nullptr; diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh index 27f9c43913e..4a16a3bd5d3 100644 --- a/lang/c++/include/avro/DataFile.hh +++ b/lang/c++/include/avro/DataFile.hh @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -37,17 +38,15 @@ namespace avro { enum Codec { NULL_CODEC = 0, DEFLATE_CODEC = 1, - -#ifdef SNAPPY_CODEC_AVAILABLE SNAPPY_CODEC = 2, -#endif - -#ifdef ZSTD_CODEC_AVAILABLE - ZSTD_CODEC = 3, 
-#endif - + ZSTD_CODEC = 3 }; +/** + * Returns true if the specified codec is available at runtime. + */ +AVRO_DECL bool isCodecAvailable(Codec codec); + const int SyncSize = 16; /** * The sync value. @@ -81,6 +80,7 @@ class AVRO_DECL DataFileWriterBase { const EncoderPtr encoderPtr_; const size_t syncInterval_; Codec codec_; + std::optional compressionLevel_; std::unique_ptr stream_; std::unique_ptr buffer_; @@ -134,10 +134,10 @@ public: */ DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval, Codec codec = NULL_CODEC, - const Metadata &metadata = {}); + const Metadata &metadata = {}, std::optional compressionLevel = std::nullopt); DataFileWriterBase(std::unique_ptr outputStream, const ValidSchema &schema, size_t syncInterval, Codec codec, - const Metadata &metadata = {}); + const Metadata &metadata = {}, std::optional compressionLevel = std::nullopt); DataFileWriterBase(const DataFileWriterBase &) = delete; DataFileWriterBase &operator=(const DataFileWriterBase &) = delete; @@ -173,11 +173,13 @@ public: */ DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC, - const Metadata &metadata = {}) : base_(std::make_unique(filename, schema, syncInterval, codec, metadata)) {} + const Metadata &metadata = {}, std::optional compressionLevel = std::nullopt) + : base_(std::make_unique(filename, schema, syncInterval, codec, metadata, compressionLevel)) {} DataFileWriter(std::unique_ptr outputStream, const ValidSchema &schema, size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC, - const Metadata &metadata = {}) : base_(std::make_unique(std::move(outputStream), schema, syncInterval, codec, metadata)) {} + const Metadata &metadata = {}, std::optional compressionLevel = std::nullopt) + : base_(std::make_unique(std::move(outputStream), schema, syncInterval, codec, metadata, compressionLevel)) {} DataFileWriter(const DataFileWriter &) = delete; DataFileWriter 
&operator=(const DataFileWriter &) = delete; diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc index 3a44970c553..ed4e93c0663 100644 --- a/lang/c++/test/DataFileTests.cc +++ b/lang/c++/test/DataFileTests.cc @@ -1284,6 +1284,219 @@ void testMetadataWithZstdCodec() { } #endif +void testDeflateCompressionLevelValidation() { + BOOST_TEST_CHECKPOINT(__func__); + + avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch); + const char *filename = "test_deflate_level.df"; + + boost::mt19937 rng(static_cast(time(nullptr))); + boost::random::uniform_int_distribution<> dist(-100, 100); + + for (int i = 0; i < 100; ++i) { + int level = dist(rng); + bool isValidLevel = (level >= 0 && level <= 9); + + if (isValidLevel) { + // Valid levels should succeed + BOOST_CHECK_NO_THROW({ + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level); + writer.close(); + }); + } else { + // Invalid levels should throw + BOOST_CHECK_THROW({ avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level); }, avro::Exception); + } + } + + BOOST_CHECK_NO_THROW({ + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, std::nullopt); + writer.close(); + }); + + std::filesystem::remove(filename); +} + +#ifdef ZSTD_CODEC_AVAILABLE +void testZstdCompressionLevelValidation() { + BOOST_TEST_CHECKPOINT(__func__); + + avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch); + const char *filename = "test_zstd_level.df"; + + boost::mt19937 rng(static_cast(time(nullptr))); + boost::random::uniform_int_distribution<> dist(-100, 100); + + for (int i = 0; i < 100; ++i) { + int level = dist(rng); + bool isValidLevel = (level >= 1 && level <= 22); + + if (isValidLevel) { + // Valid levels should succeed + BOOST_CHECK_NO_THROW({ + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level); + writer.close(); + }); + } else { + // Invalid levels 
should throw + BOOST_CHECK_THROW({ avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level); }, avro::Exception); + } + } + + BOOST_CHECK_NO_THROW({ + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, std::nullopt); + writer.close(); + }); + + std::filesystem::remove(filename); +} +#endif + +void testDeflateCompressionRoundTrip() { + BOOST_TEST_CHECKPOINT(__func__); + + avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch); + const char *filename = "test_deflate_roundtrip.df"; + + boost::mt19937 rng(static_cast(time(nullptr))); + boost::random::uniform_int_distribution<> levelDist(0, 10); // 0-9 valid, 10 = nullopt + boost::random::uniform_int_distribution<> dataDist(1, 1000); + + for (int i = 0; i < 100; ++i) { + int rawLevel = levelDist(rng); + std::optional level = (rawLevel == 10) ? std::nullopt : std::optional(rawLevel); + int numRecords = dataDist(rng) % 100 + 1; + + std::vector originalData; + int64_t re = rng(); + int64_t im = rng(); + for (int j = 0; j < numRecords; ++j) { + originalData.emplace_back(re, im); + re = re * 31 + im; + im = im * 17 + re; + } + + // Write with compression level + { + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level); + for (const auto &record : originalData) { + writer.write(record); + } + writer.close(); + } + + // Read back and verify + { + avro::DataFileReader reader(filename, schema); + std::vector readData; + ComplexInteger record; + while (reader.read(record)) { + readData.push_back(record); + } + + BOOST_CHECK_EQUAL(readData.size(), originalData.size()); + for (size_t j = 0; j < originalData.size() && j < readData.size(); ++j) { + BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re); + BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im); + } + } + } + + std::filesystem::remove(filename); +} + +#ifdef ZSTD_CODEC_AVAILABLE +void testZstdCompressionRoundTrip() { + BOOST_TEST_CHECKPOINT(__func__); + + 
avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch); + const char *filename = "test_zstd_roundtrip.df"; + + boost::mt19937 rng(static_cast(time(nullptr))); + // Valid ZSTD levels: 1-22 + boost::random::uniform_int_distribution<> levelDist(0, 22); // 0 = nullopt, 1-22 = valid levels + boost::random::uniform_int_distribution<> dataDist(1, 1000); + + for (int i = 0; i < 100; ++i) { + int rawLevel = levelDist(rng); + std::optional level = (rawLevel == 0) ? std::nullopt : std::optional(rawLevel); + int numRecords = dataDist(rng) % 100 + 1; + + std::vector originalData; + int64_t re = rng(); + int64_t im = rng(); + for (int j = 0; j < numRecords; ++j) { + originalData.emplace_back(re, im); + re = re * 31 + im; + im = im * 17 + re; + } + + // Write with compression level + { + avro::DataFileWriter writer( + filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level); + for (const auto &record : originalData) { + writer.write(record); + } + writer.close(); + } + + // Read back and verify + { + avro::DataFileReader reader(filename, schema); + std::vector readData; + ComplexInteger record; + while (reader.read(record)) { + readData.push_back(record); + } + + BOOST_CHECK_EQUAL(readData.size(), originalData.size()); + for (size_t j = 0; j < originalData.size() && j < readData.size(); ++j) { + BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re); + BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im); + } + } + } + + std::filesystem::remove(filename); +} +#endif + +void testCodecEnumValues() { + BOOST_TEST_CHECKPOINT(__func__); + + BOOST_CHECK_EQUAL(static_cast(avro::NULL_CODEC), 0); + BOOST_CHECK_EQUAL(static_cast(avro::DEFLATE_CODEC), 1); + BOOST_CHECK_EQUAL(static_cast(avro::SNAPPY_CODEC), 2); + BOOST_CHECK_EQUAL(static_cast(avro::ZSTD_CODEC), 3); +} + +void testIsCodecAvailable() { + BOOST_TEST_CHECKPOINT(__func__); + + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::NULL_CODEC), true); + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::DEFLATE_CODEC), true); + 
+#ifdef SNAPPY_CODEC_AVAILABLE + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), true); +#else + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), false); +#endif + +#ifdef ZSTD_CODEC_AVAILABLE + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), true); +#else + BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), false); +#endif +} + test_suite * init_unit_test_suite(int, char *[]) { { @@ -1487,5 +1700,21 @@ init_unit_test_suite(int, char *[]) { boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithZstdCodec)); #endif + // Codec enum and isCodecAvailable tests + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCodecEnumValues)); + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testIsCodecAvailable)); + + // Compression level validation property tests + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionLevelValidation)); +#ifdef ZSTD_CODEC_AVAILABLE + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionLevelValidation)); +#endif + + // Compression round-trip property tests + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionRoundTrip)); +#ifdef ZSTD_CODEC_AVAILABLE + boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionRoundTrip)); +#endif + return nullptr; }