Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 100 additions & 31 deletions lang/c++/impl/DataFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,105 @@ const size_t maxSyncInterval = 1u << 30;
// Recommended by https://www.zlib.net/zlib_how.html
const size_t zlibBufGrowSize = 128 * 1024;

std::string codecToString(Codec codec) {
switch (codec) {
case NULL_CODEC: return "null";
case DEFLATE_CODEC: return "deflate";
case SNAPPY_CODEC: return "snappy";
case ZSTD_CODEC: return "zstandard";
default: return "unknown";
}
}

void validateCodec(Codec codec, std::optional<int> level) {
if (!isCodecAvailable(codec)) {
throw Exception("Codec {} is not available.", codecToString(codec));
}

if (!level.has_value()) {
return;
}

int levelValue = level.value();
switch (codec) {
case NULL_CODEC:
case SNAPPY_CODEC:
// These codecs don't support compression levels, ignore
break;
case DEFLATE_CODEC:
if (levelValue < 0 || levelValue > 9) {
throw Exception("Invalid compression level {} for deflate codec. "
"Valid range is 0-9.",
levelValue);
}
break;
case ZSTD_CODEC:
if (levelValue < 1 || levelValue > 22) {
throw Exception("Invalid compression level {} for zstandard codec. "
"Valid range is 1-22.",
levelValue);
}
break;
default:
throw Exception("Unknown codec: {}", static_cast<int>(codec));
}
}

} // namespace

bool isCodecAvailable(Codec codec) {
switch (codec) {
case NULL_CODEC:
case DEFLATE_CODEC:
return true;
case SNAPPY_CODEC:
#ifdef SNAPPY_CODEC_AVAILABLE
return true;
#else
return false;
#endif
case ZSTD_CODEC:
#ifdef ZSTD_CODEC_AVAILABLE
return true;
#else
return false;
#endif
default:
return false;
}
}

DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
Codec codec, const Metadata &metadata) : filename_(filename),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
codec_(codec),
stream_(fileOutputStream(filename)),
buffer_(memoryOutputStream()),
sync_(makeSync()),
objectCount_(0),
metadata_(metadata),
lastSync_(0) {
Codec codec, const Metadata &metadata,
std::optional<int> compressionLevel) : filename_(filename),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
codec_(codec),
compressionLevel_(compressionLevel),
stream_(fileOutputStream(filename)),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because validateCodec() runs inside init() after stream_(fileOutputStream(filename)) has already executed, an unavailable codec or invalid compression level can still create/truncate the target file before throwing. This is a new side-effect path introduced by the compression-level validation.

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct! The output file is created/truncated and then the validation checks are executed. This is an old issue - it was the same until now if syncInterval was invalid. Prevents creating/truncating the output file if it won't be used due to validation failure.

buffer_(memoryOutputStream()),
sync_(makeSync()),
objectCount_(0),
metadata_(metadata),
lastSync_(0) {
init(schema, syncInterval, codec);
}

DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
const ValidSchema &schema, size_t syncInterval,
Codec codec, const Metadata &metadata) : filename_(),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
codec_(codec),
stream_(std::move(outputStream)),
buffer_(memoryOutputStream()),
sync_(makeSync()),
objectCount_(0),
metadata_(metadata),
lastSync_(0) {
DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
size_t syncInterval, Codec codec, const Metadata &metadata,
std::optional<int> compressionLevel) : filename_(),
schema_(schema),
encoderPtr_(binaryEncoder()),
syncInterval_(syncInterval),
codec_(codec),
compressionLevel_(compressionLevel),
stream_(std::move(outputStream)),
buffer_(memoryOutputStream()),
sync_(makeSync()),
objectCount_(0),
metadata_(metadata),
lastSync_(0) {
init(schema, syncInterval, codec);
}

Expand All @@ -103,20 +172,17 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
"Invalid sync interval: {}. Should be between {} and {}",
syncInterval, minSyncInterval, maxSyncInterval);
}
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);

validateCodec(codec_, compressionLevel_);
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
if (codec_ == NULL_CODEC) {
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
} else if (codec_ == DEFLATE_CODEC) {
setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This references AVRO_SNAPPY_CODEC (and similarly AVRO_ZSTD_CODEC below) unconditionally, but those metadata strings are still only defined under #ifdef SNAPPY_CODEC_AVAILABLE / #ifdef ZSTD_CODEC_AVAILABLE earlier in this file. As a result, builds without those optional codecs enabled will fail to compile here.

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct! The Pull Request author tried to simplify the code by removing the usage of the build arguments but introduced build errors in case some of the arguments are not defined.

#endif
#ifdef ZSTD_CODEC_AVAILABLE
} else if (codec_ == ZSTD_CODEC) {
setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
#endif
} else {
throw Exception("Unknown codec: {}", int(codec));
}
Expand Down Expand Up @@ -160,7 +226,10 @@ void DataFileWriterBase::sync() {
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
// Use Z_DEFAULT_COMPRESSION if no level specified
int effectiveLevel = compressionLevel_.value_or(Z_DEFAULT_COMPRESSION);

int ret = deflateInit2(&zs, effectiveLevel, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
if (ret != Z_OK) {
throw Exception("Failed to initialize deflate, error: {}", ret);
}
Expand Down Expand Up @@ -246,7 +315,7 @@ void DataFileWriterBase::sync() {
}

ZstdCompressWrapper zstdCompressWrapper;
std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed);
std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed, compressionLevel_);

std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
Expand Down
4 changes: 2 additions & 2 deletions lang/c++/impl/ZstdCompressWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace avro {

std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed) {
std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel) {
// Pre-allocate buffer for compressed data
size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
if (ZSTD_isError(max_compressed_size)) {
Expand All @@ -37,7 +37,7 @@ std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompr
size_t compressed_size = ZSTD_compress(
compressed.data(), max_compressed_size,
uncompressed.data(), uncompressed.size(),
ZSTD_CLEVEL_DEFAULT);
compressionLevel.value_or(ZSTD_CLEVEL_DEFAULT));

if (ZSTD_isError(compressed_size)) {
throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
Expand Down
3 changes: 2 additions & 1 deletion lang/c++/impl/ZstdCompressWrapper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#ifdef ZSTD_CODEC_AVAILABLE

#include <optional>
#include <vector>

#include <zstd.h>
Expand All @@ -32,7 +33,7 @@ public:
ZstdCompressWrapper();
~ZstdCompressWrapper();

std::vector<char> compress(const std::vector<char> &uncompressed);
std::vector<char> compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel = std::nullopt);

private:
ZSTD_CCtx *cctx_ = nullptr;
Expand Down
26 changes: 14 additions & 12 deletions lang/c++/include/avro/DataFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <array>
#include <map>
#include <optional>
#include <string>
#include <vector>

Expand All @@ -37,17 +38,15 @@ namespace avro {
enum Codec {
NULL_CODEC = 0,
DEFLATE_CODEC = 1,

#ifdef SNAPPY_CODEC_AVAILABLE
SNAPPY_CODEC = 2,
#endif

#ifdef ZSTD_CODEC_AVAILABLE
ZSTD_CODEC = 3,
#endif

ZSTD_CODEC = 3
};

/**
* Returns true if the specified codec is available at runtime.
*/
AVRO_DECL bool isCodecAvailable(Codec codec);

const int SyncSize = 16;
/**
* The sync value.
Expand Down Expand Up @@ -81,6 +80,7 @@ class AVRO_DECL DataFileWriterBase {
const EncoderPtr encoderPtr_;
const size_t syncInterval_;
Codec codec_;
std::optional<int> compressionLevel_;

std::unique_ptr<OutputStream> stream_;
std::unique_ptr<OutputStream> buffer_;
Expand Down Expand Up @@ -134,10 +134,10 @@ public:
*/
DataFileWriterBase(const char *filename, const ValidSchema &schema,
size_t syncInterval, Codec codec = NULL_CODEC,
const Metadata &metadata = {});
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
const ValidSchema &schema, size_t syncInterval, Codec codec,
const Metadata &metadata = {});
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);

DataFileWriterBase(const DataFileWriterBase &) = delete;
DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
Expand Down Expand Up @@ -173,11 +173,13 @@ public:
*/
DataFileWriter(const char *filename, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata)) {}
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
: base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata, compressionLevel)) {}

DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata)) {}
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
: base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata, compressionLevel)) {}

DataFileWriter(const DataFileWriter &) = delete;
DataFileWriter &operator=(const DataFileWriter &) = delete;
Expand Down
Loading