Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/blob/blob_counting_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ TEST(BlobCountingIteratorTest, CountBlobs) {

std::string first_blob_index;
BlobIndex::EncodeBlob(&first_blob_index, first_blob_file_number, first_offset,
first_size, kNoCompression);
first_size, kNoCompression, 0);

constexpr uint64_t second_blob_file_number = 6;
constexpr uint64_t second_offset = 2000;
constexpr uint64_t second_size = 4000;

std::string second_blob_index;
BlobIndex::EncodeBlob(&second_blob_index, second_blob_file_number,
second_offset, second_size, kNoCompression);
second_offset, second_size, kNoCompression, 0);

const std::vector<std::string> values{first_blob_index, second_blob_index,
"raw_value"};
Expand Down
7 changes: 6 additions & 1 deletion db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
return Status::OK();
}

// expire time is the suffix 8 bytes of value, see format in pika project
Slice expire_time_slice = Slice(value.data() + value.size() - sizeof(uint64_t), sizeof(uint64_t));
uint64_t expire_time = 0;
GetFixed64(&expire_time_slice, &expire_time);

{
const Status s = OpenBlobFileIfNeeded();
if (!s.ok()) {
Expand Down Expand Up @@ -150,7 +155,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compression_type_, expire_time);

return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_garbage_meter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST(BlobGarbageMeterTest, MeasureGarbage) {

std::string value;
BlobIndex::EncodeBlob(&value, blob.blob_file_number, blob.offset, blob.size,
blob.compression_type);
blob.compression_type, 0);
const Slice value_slice(value);

if (blob.has_in_flow) {
Expand Down
23 changes: 15 additions & 8 deletions db/blob/blob_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ namespace ROCKSDB_NAMESPACE {
// +------+------------+---------------+
//
// kBlob:
// +------+-------------+----------+----------+-------------+
// | type | file number | offset | size | compression |
// +------+-------------+----------+----------+-------------+
// | char | varint64 | varint64 | varint64 | char |
// +------+-------------+----------+----------+-------------+
// +------+-------------+----------+----------+-------------+-------------+
// | type | file number | offset | size | expire_time | compression |
// +------+-------------+----------+----------+-------------+-------------+
// | char | varint64 | varint64 | varint64 | varint64 | char |
// +------+-------------+----------+----------+-------------+-------------+
//
// kBlobTTL:
// +------+------------+-------------+----------+----------+-------------+
Expand Down Expand Up @@ -76,6 +76,11 @@ class BlobIndex {
return file_number_;
}

uint64_t expire_time() const {
assert(!IsInlined());
return expire_time_;
}

uint64_t offset() const {
assert(!IsInlined());
return offset_;
Expand Down Expand Up @@ -110,7 +115,7 @@ class BlobIndex {
value_ = slice;
} else {
if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) &&
GetVarint64(&slice, &size_) && slice.size() == 1) {
GetVarint64(&slice, &size_) && GetVarint64(&slice, &expire_time_) && slice.size() == 1) {
compression_ = static_cast<CompressionType>(*slice.data());
} else {
return Status::Corruption(kErrorMessage, "Corrupted blob offset");
Expand Down Expand Up @@ -149,14 +154,15 @@ class BlobIndex {

static void EncodeBlob(std::string* dst, uint64_t file_number,
uint64_t offset, uint64_t size,
CompressionType compression) {
CompressionType compression, uint64_t expire_time) {
assert(dst != nullptr);
dst->clear();
dst->reserve(kMaxVarint64Length * 3 + 2);
dst->reserve(kMaxVarint64Length * 4 + 2);
dst->push_back(static_cast<char>(Type::kBlob));
PutVarint64(dst, file_number);
PutVarint64(dst, offset);
PutVarint64(dst, size);
PutVarint64(dst, expire_time);
dst->push_back(static_cast<char>(compression));
}

Expand All @@ -182,6 +188,7 @@ class BlobIndex {
uint64_t offset_ = 0;
uint64_t size_ = 0;
CompressionType compression_ = kNoCompression;
uint64_t expire_time_ = 0;
};

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion db/blob/db_blob_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
constexpr uint64_t size = 5678;

BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
kNoCompression, 0);

WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
Expand Down
24 changes: 20 additions & 4 deletions db/blob/db_blob_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "rocksdb/system_clock.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -40,8 +41,13 @@ class FilterByKeyLength : public CompactionFilter {
return "rocksdb.compaction.filter.by.key.length";
}
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
int /*level*/, const Slice& key, uint64_t expire_time, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
const auto clock = SystemClock::Default().get();
uint64_t now = clock->NowMicros() / 1000 / 1000;
if (expire_time != 0 && expire_time < now) {
return CompactionFilter::Decision::kRemove;
}
if (key.size() < length_threshold_) {
return CompactionFilter::Decision::kRemove;
}
Expand Down Expand Up @@ -82,8 +88,13 @@ class BadBlobCompactionFilter : public CompactionFilter {
filter_v2_(filter_v2) {}
const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
int /*level*/, const Slice& key, uint64_t expire_time, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
const auto clock = SystemClock::Default().get();
uint64_t now = clock->NowMicros() / 1000 / 1000;
if (expire_time != 0 && expire_time < now) {
return CompactionFilter::Decision::kRemove;
}
if (key.size() >= prefix_.size() &&
0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
return CompactionFilter::Decision::kUndetermined;
Expand Down Expand Up @@ -111,17 +122,22 @@ class ValueBlindWriteFilter : public CompactionFilter {
return "rocksdb.compaction.filter.blind.write";
}
CompactionFilter::Decision FilterBlobByKey(
int level, const Slice& key, std::string* new_value,
int level, const Slice& key, uint64_t expire_time, std::string* new_value,
std::string* skip_until) const override;

private:
const std::string new_value_;
};

CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
int /*level*/, const Slice& /*key*/, std::string* new_value,
int /*level*/, const Slice& /*key*/, uint64_t expire_time, std::string* new_value,
std::string* /*skip_until*/) const {
assert(new_value);
const auto clock = SystemClock::Default().get();
uint64_t now = clock->NowMicros() / 1000 / 1000;
if (expire_time != 0 && expire_time < now) {
return CompactionFilter::Decision::kRemove;
}
new_value->assign(new_value_);
return CompactionFilter::Decision::kChangeValue;
}
Expand Down
23 changes: 11 additions & 12 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,18 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
StopWatchNano timer(clock_, report_detailed_time_);

if (ikey_.type == kTypeBlobIndex) {
// For integrated BlobDB impl, CompactionIterator reads blob value.
// For Stacked BlobDB impl, the corresponding CompactionFilter's
// FilterV2 method should read the blob value.
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
status_ = s;
validity_info_.Invalidate();
return false;
}
decision = compaction_filter_->FilterBlobByKey(
level_, filter_key, &compaction_filter_value_,
level_, filter_key, blob_index.expire_time(), &compaction_filter_value_,
compaction_filter_skip_until_.rep());
if (decision == CompactionFilter::Decision::kUndetermined &&
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
Expand All @@ -275,17 +285,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
"CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
&value_);

// For integrated BlobDB impl, CompactionIterator reads blob value.
// For Stacked BlobDB impl, the corresponding CompactionFilter's
// FilterV2 method should read the blob value.
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
status_ = s;
validity_info_.Invalidate();
return false;
}

FilePrefetchBuffer* prefetch_buffer =
prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
blob_index.file_number())
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class CompactionJobTestBase : public testing::Test {
uint64_t size) {
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
kNoCompression, 0);
return blob_index;
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8793,7 +8793,7 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) {
constexpr uint64_t size = 5678;

BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
kNoCompression, 0);

WriteBatch batch;
ASSERT_OK(
Expand Down
2 changes: 1 addition & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ TEST_F(DBTest, MetaDataTest) {
// Add a single blob reference to each file
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, /* blob_file_number */ i + 1000,
/* offset */ 1234, /* size */ 5678, kNoCompression);
/* offset */ 1234, /* size */ 5678, kNoCompression, 0);

WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, Key(key_index),
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ TEST_F(FlushJobTest, NonEmpty) {
} else {
BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i],
/* offset */ i << 10, /* size */ i << 20,
kNoCompression);
kNoCompression, 0);
}

const SequenceNumber seq(i + 10001);
Expand Down
2 changes: 1 addition & 1 deletion db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class EventListenerTest : public DBTestBase {
uint64_t size) {
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
kNoCompression, 0);
return blob_index;
}

Expand Down
6 changes: 3 additions & 3 deletions db/version_edit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) {

std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
kNoCompression);
kNoCompression, 0);

constexpr SequenceNumber seq = 201;

Expand All @@ -721,7 +721,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) {

std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, expected_oldest_blob_file_number, offset,
size, kNoCompression);
size, kNoCompression, 0);

constexpr SequenceNumber seq = 202;

Expand Down Expand Up @@ -782,7 +782,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) {

std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, kInvalidBlobFileNumber, offset, size,
kNoCompression);
kNoCompression, 0);

constexpr SequenceNumber seq = 206;

Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ class CompactionFilter : public Customizable {
// and kRemoveAndSkipUntil respectively, and have the same semantics as
// the corresponding parameters of FilterV2/V3.
virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the args expire_time seems do nothing, and in db/blob/db_blob_compaction_test.cc, the FilterBlobByKey function of derived class is not be modified.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

derived compactionfilter will implement FilterBlobByKey to determine if drop this kv according to expire_time.
since pika store ttl in value,this can skip blobfile random read during compaction.

uint64_t /*expire_time*/,
std::string* /*new_value*/,
std::string* /*skip_until*/) const {
return Decision::kUndetermined;
Expand Down
4 changes: 2 additions & 2 deletions utilities/blob_db/blob_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange(
}
BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
new_blob_value.size(),
blob_db_impl->bdb_options_.compression);
blob_db_impl->bdb_options_.compression, 0);
return Decision::kChangeBlobIndex;
}

Expand Down Expand Up @@ -404,7 +404,7 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
}

BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
blob.size(), compression_type);
blob.size(), compression_type, 0);

gc_stats_.AddRelocatedBlob(blob_index.size());

Expand Down
2 changes: 1 addition & 1 deletion utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,

if (expiration == kNoExpiration) {
BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
value.size(), bdb_options_.compression);
value.size(), bdb_options_.compression, 0);
} else {
BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
blob_offset, value.size(),
Expand Down
2 changes: 1 addition & 1 deletion utilities/blob_db/blob_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,7 @@ TEST_F(BlobDBTest, GarbageCollectionFailure) {
// blob file.
std::string blob_index;
BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
/* size */ 5678, kNoCompression);
/* size */ 5678, kNoCompression, 0);

WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(
Expand Down