Skip to content

Commit 66436bf

Browse files
dryman and copybara-github
authored and committed
Improve compression ratio by disabling the chunk padding by default.
Also set group_size default to 1 which is the most common ArrayRecord use case. PiperOrigin-RevId: 842234733
1 parent 6662d1d commit 66436bf

File tree

8 files changed

+79
-27
lines changed

8 files changed

+79
-27
lines changed

MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
# TODO(fchern): automate version string alignment with setup.py
16-
VERSION = "0.8.3"
16+
VERSION = "0.8.4"
1717

1818
module(
1919
name = "array_record",

cpp/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ cc_test(
276276

277277
cc_test(
278278
name = "array_record_reader_test",
279+
size = "large",
279280
srcs = ["array_record_reader_test.cc"],
280281
shard_count = 4,
281282
deps = [

cpp/array_record_reader_test.cc

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,52 +203,55 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
203203
std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize);
204204
EXPECT_EQ(reader.RecordGroupSize(), group_size);
205205

206-
std::vector<bool> read_all_records(kDatasetSize, false);
206+
// vector bool is casted as bit string, which is not thread safe to write.
207+
std::vector<int> read_all_records(kDatasetSize, 0);
207208
ASSERT_TRUE(reader
208209
.ParallelReadRecords(
209210
[&](uint64_t record_index,
210211
absl::string_view result_view) -> absl::Status {
211212
EXPECT_EQ(result_view, records[record_index]);
212213
EXPECT_FALSE(read_all_records[record_index]);
213-
read_all_records[record_index] = true;
214+
read_all_records[record_index] = 1;
214215
return absl::OkStatus();
215216
})
216217
.ok());
217-
for (bool record_was_read : read_all_records) {
218+
for (auto record_was_read : read_all_records) {
218219
EXPECT_TRUE(record_was_read);
219220
}
220221

221222
std::vector<uint64_t> indices = {0, 3, 5, 7, 101, 2000};
222-
std::vector<bool> read_indexed_records(indices.size(), false);
223+
// vector bool is casted as bit string, which is not thread safe to write.
224+
std::vector<int> read_indexed_records(indices.size(), 0);
223225
ASSERT_TRUE(reader
224226
.ParallelReadRecordsWithIndices(
225227
indices,
226228
[&](uint64_t indices_idx,
227229
absl::string_view result_view) -> absl::Status {
228230
EXPECT_EQ(result_view, records[indices[indices_idx]]);
229231
EXPECT_FALSE(read_indexed_records[indices_idx]);
230-
read_indexed_records[indices_idx] = true;
232+
read_indexed_records[indices_idx] = 1;
231233
return absl::OkStatus();
232234
})
233235
.ok());
234-
for (bool record_was_read : read_indexed_records) {
236+
for (auto record_was_read : read_indexed_records) {
235237
EXPECT_TRUE(record_was_read);
236238
}
237239

238240
uint64_t begin = 10, end = 101;
239-
std::vector<bool> read_range_records(end - begin, false);
241+
// vector bool is casted as bit string, which is not thread safe to write.
242+
std::vector<int> read_range_records(end - begin, 0);
240243
ASSERT_TRUE(reader
241244
.ParallelReadRecordsInRange(
242245
begin, end,
243246
[&](uint64_t record_index,
244247
absl::string_view result_view) -> absl::Status {
245248
EXPECT_EQ(result_view, records[record_index]);
246249
EXPECT_FALSE(read_range_records[record_index - begin]);
247-
read_range_records[record_index - begin] = true;
250+
read_range_records[record_index - begin] = 1;
248251
return absl::OkStatus();
249252
})
250253
.ok());
251-
for (bool record_was_read : read_range_records) {
254+
for (auto record_was_read : read_range_records) {
252255
EXPECT_TRUE(record_was_read);
253256
}
254257

cpp/array_record_writer.cc

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,7 @@ template <typename T, typename H = std::string>
105105
absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,
106106
absl::Span<const T> span,
107107
std::optional<H> header = std::nullopt) {
108-
riegeli::SimpleEncoder encoder(
109-
compression_options,
110-
riegeli::SimpleEncoder::TuningOptions().set_size_hint(
111-
span.size() * sizeof(typename decltype(span)::value_type)));
108+
riegeli::SimpleEncoder encoder(compression_options);
112109
if (header.has_value()) {
113110
encoder.AddRecord(header.value());
114111
}
@@ -166,7 +163,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
166163
// Padding
167164
options_parser.AddOption(
168165
"pad_to_block_boundary",
169-
ValueParser::Enum({{"", true}, {"true", true}, {"false", false}},
166+
ValueParser::Enum({{"", false}, {"true", true}, {"false", false}},
170167
&options.pad_to_block_boundary_));
171168
if (!options_parser.FromString(text)) {
172169
return options_parser.status();
@@ -400,10 +397,8 @@ std::unique_ptr<riegeli::ChunkEncoder> ArrayRecordWriterBase::CreateEncoder() {
400397
riegeli::TransposeEncoder::TuningOptions().set_bucket_size(
401398
options_.transpose_bucket_size()));
402399
} else {
403-
encoder = std::make_unique<riegeli::SimpleEncoder>(
404-
options_.compressor_options(),
405-
riegeli::SimpleEncoder::TuningOptions().set_size_hint(
406-
submit_chunk_callback_->get_last_decoded_data_size()));
400+
encoder =
401+
std::make_unique<riegeli::SimpleEncoder>(options_.compressor_options());
407402
}
408403
if (pool_) {
409404
return std::make_unique<riegeli::DeferredEncoder>(std::move(encoder));

cpp/array_record_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
108108
// "window_log" : window_log |
109109
// "pad_to_block_boundary" (":" ("true" | "false"))?
110110
// group_size ::= positive integer which specifies number of records to be
111-
// grouped into a chunk before compression. (default 65536)
111+
// grouped into a chunk before compression. (default 1)
112112
// saturation_delay_ms ::= positive integer which specifies a delay in
113113
// milliseconds when the parallel writing queue is saturated.
114114
// max_parallelism ::= `auto` or positive integers which specifies
@@ -126,7 +126,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
126126
//
127127
// The larger the value, the denser the file, at the cost of more expansive
128128
// random accessing.
129-
static constexpr uint32_t kDefaultGroupSize = 65536;
129+
static constexpr uint32_t kDefaultGroupSize = 1;
130130
Options& set_group_size(uint32_t group_size) {
131131
group_size_ = group_size;
132132
return *this;

cpp/array_record_writer_test.cc

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,59 @@ TEST_P(ArrayRecordWriterTest, RandomDatasetTest) {
220220
ASSERT_TRUE(reader.Close());
221221
}
222222

223+
TEST_P(ArrayRecordWriterTest, CompressionRatioTest) {
224+
float expected_ratio = 1.0;
225+
auto options = GetOptions();
226+
if (options.pad_to_block_boundary()) {
227+
GTEST_SKIP()
228+
<< "Padded boundaries are known to have bad compression ratio.";
229+
}
230+
// We use uniform int distribution for values that are within a byte.
231+
// Therefore regular compression algorithm should easily compress it
232+
// for at least 2x and near 4x. However, the snappy compression doesn't
233+
// work that well so we set a higher threshold.
234+
switch (options.compression_type()) {
235+
case riegeli::CompressionType::kNone:
236+
GTEST_SKIP() << "No need to verify compression ratio for uncompressed.";
237+
case riegeli::CompressionType::kBrotli:
238+
expected_ratio = 0.4;
239+
break;
240+
case riegeli::CompressionType::kZstd:
241+
expected_ratio = 0.4;
242+
break;
243+
case riegeli::CompressionType::kSnappy:
244+
expected_ratio = 0.6;
245+
break;
246+
}
247+
options.set_group_size(128);
248+
std::mt19937 bitgen;
249+
std::uniform_int_distribution<uint32_t> dist(0, 128);
250+
constexpr uint32_t num_records = 32768;
251+
constexpr uint32_t dim = 256;
252+
std::vector<uint32_t> records(num_records * dim);
253+
254+
for (auto i : Seq(num_records * dim)) {
255+
records[i] = dist(bitgen);
256+
}
257+
std::string encoded;
258+
259+
ARThreadPool* pool = nullptr;
260+
if (std::get<3>(GetParam())) {
261+
pool = ArrayRecordGlobalPool();
262+
}
263+
264+
auto writer = ArrayRecordWriter(
265+
riegeli::Maker<riegeli::StringWriter>(&encoded), options, pool);
266+
267+
for (auto i : Seq(num_records)) {
268+
EXPECT_TRUE(
269+
writer.WriteRecord(records.data() + dim * i, dim * sizeof(uint32_t)));
270+
}
271+
ASSERT_TRUE(writer.Close());
272+
EXPECT_LE(encoded.size(),
273+
num_records * dim * sizeof(uint32_t) * expected_ratio);
274+
}
275+
223276
INSTANTIATE_TEST_SUITE_P(
224277
ParamTest, ArrayRecordWriterTest,
225278
testing::Combine(testing::Values(CompressionType::kUncompressed,
@@ -253,7 +306,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
253306
EXPECT_FALSE(option.pad_to_block_boundary());
254307

255308
EXPECT_EQ(option.ToString(),
256-
"group_size:65536,"
309+
"group_size:1,"
257310
"transpose:false,"
258311
"pad_to_block_boundary:false,"
259312
"zstd:3,"
@@ -274,7 +327,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
274327
EXPECT_FALSE(option.pad_to_block_boundary());
275328

276329
EXPECT_EQ(option.ToString(),
277-
"group_size:65536,"
330+
"group_size:1,"
278331
"transpose:false,"
279332
"pad_to_block_boundary:false,"
280333
"zstd:3,"
@@ -362,7 +415,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
362415
EXPECT_TRUE(option.pad_to_block_boundary());
363416

364417
EXPECT_EQ(option.ToString(),
365-
"group_size:65536,"
418+
"group_size:1,"
366419
"transpose:false,"
367420
"pad_to_block_boundary:true,"
368421
"uncompressed");
@@ -382,7 +435,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
382435
EXPECT_TRUE(option.pad_to_block_boundary());
383436

384437
EXPECT_EQ(option.ToString(),
385-
"group_size:65536,"
438+
"group_size:1,"
386439
"transpose:false,"
387440
"pad_to_block_boundary:true,"
388441
"snappy");

python/array_record_data_source_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def setUp(self):
4747

4848
def test_check_default_group_size(self):
4949
filename = os.path.join(FLAGS.test_tmpdir, "test.array_record")
50-
writer = array_record_module.ArrayRecordWriter(filename)
50+
writer = array_record_module.ArrayRecordWriter(filename, "group_size:65536")
5151
writer.write(b"foobar")
5252
writer.close()
5353
reader = array_record_module.ArrayRecordReader(filename)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def has_ext_modules(self):
3131

3232
setup(
3333
name='array_record',
34-
version='0.8.3',
34+
version='0.8.4',
3535
description='A file format that achieves a new frontier of IO efficiency',
3636
author='ArrayRecord team',
3737
author_email='no-reply@google.com',

0 commit comments

Comments (0)