diff --git a/db/blob/blob_counting_iterator_test.cc b/db/blob/blob_counting_iterator_test.cc index c7bbc8f587dd..45d13a4ef5f4 100644 --- a/db/blob/blob_counting_iterator_test.cc +++ b/db/blob/blob_counting_iterator_test.cc @@ -53,7 +53,7 @@ TEST(BlobCountingIteratorTest, CountBlobs) { std::string first_blob_index; BlobIndex::EncodeBlob(&first_blob_index, first_blob_file_number, first_offset, - first_size, kNoCompression); + first_size, kNoCompression, 0); constexpr uint64_t second_blob_file_number = 6; constexpr uint64_t second_offset = 2000; @@ -61,7 +61,7 @@ TEST(BlobCountingIteratorTest, CountBlobs) { std::string second_blob_index; BlobIndex::EncodeBlob(&second_blob_index, second_blob_file_number, - second_offset, second_size, kNoCompression); + second_offset, second_size, kNoCompression, 0); const std::vector values{first_blob_index, second_blob_index, "raw_value"}; diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 35269fdb509d..e4434dd7d04f 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -104,6 +104,11 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value, return Status::OK(); } + // expire time is the suffix 8 bytes of value, see format in pika project + Slice expire_time_slice = Slice(value.data() + value.size() - sizeof(uint64_t), sizeof(uint64_t)); + uint64_t expire_time = 0; + GetFixed64(&expire_time_slice, &expire_time); + { const Status s = OpenBlobFileIfNeeded(); if (!s.ok()) { @@ -150,7 +155,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value, } BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(), - blob_compression_type_); + blob_compression_type_, expire_time); return Status::OK(); } diff --git a/db/blob/blob_garbage_meter_test.cc b/db/blob/blob_garbage_meter_test.cc index ba53f06f1a37..a6b9e89a52e4 100644 --- a/db/blob/blob_garbage_meter_test.cc +++ b/db/blob/blob_garbage_meter_test.cc @@ -53,7 +53,7 @@ TEST(BlobGarbageMeterTest, MeasureGarbage) { std::string value; BlobIndex::EncodeBlob(&value, blob.blob_file_number, blob.offset, blob.size, - blob.compression_type); + blob.compression_type, 0); const Slice value_slice(value); if (blob.has_in_flow) { diff --git a/db/blob/blob_index.h b/db/blob/blob_index.h index e9944d78448b..f0cf4fe78183 100644 --- a/db/blob/blob_index.h +++ b/db/blob/blob_index.h @@ -26,11 +26,11 @@ namespace ROCKSDB_NAMESPACE { // +------+------------+---------------+ // // kBlob: -// +------+-------------+----------+----------+-------------+ -// | type | file number | offset | size | compression | -// +------+-------------+----------+----------+-------------+ -// | char | varint64 | varint64 | varint64 | char | -// +------+-------------+----------+----------+-------------+ +// +------+-------------+----------+----------+-------------+-------------+ +// | type | file number | offset | size | expire_time | compression | +// +------+-------------+----------+----------+-------------+-------------+ +// | char | varint64 | varint64 | varint64 | varint64 | char | +// +------+-------------+----------+----------+-------------+-------------+ // // kBlobTTL: // +------+------------+-------------+----------+----------+-------------+ @@ -76,6 +76,11 @@ class BlobIndex { return file_number_; } + uint64_t expire_time() const { + assert(!IsInlined()); + return expire_time_; + } + uint64_t offset() const { assert(!IsInlined()); return offset_; @@ -110,7 +115,7 @@ class BlobIndex { value_ = slice; } else { if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) && - GetVarint64(&slice, &size_) && slice.size() == 1) { + GetVarint64(&slice, &size_) && GetVarint64(&slice, &expire_time_) && slice.size() == 1) { compression_ = static_cast(*slice.data()); } else { return Status::Corruption(kErrorMessage, "Corrupted blob offset"); @@ -149,14 +154,15 @@ class BlobIndex { static void EncodeBlob(std::string* dst, uint64_t file_number, uint64_t offset, uint64_t size, - CompressionType compression) { + CompressionType compression, uint64_t expire_time) { assert(dst != nullptr); dst->clear(); - dst->reserve(kMaxVarint64Length * 3 + 2); + dst->reserve(kMaxVarint64Length * 4 + 2); dst->push_back(static_cast(Type::kBlob)); PutVarint64(dst, file_number); PutVarint64(dst, offset); PutVarint64(dst, size); + PutVarint64(dst, expire_time); dst->push_back(static_cast(compression)); } @@ -182,6 +188,7 @@ class BlobIndex { uint64_t offset_ = 0; uint64_t size_ = 0; CompressionType compression_ = kNoCompression; + uint64_t expire_time_ = 0; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 1c0caba93d95..1eaf5e44b151 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -1051,7 +1051,7 @@ TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) { constexpr uint64_t size = 5678; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, - kNoCompression); + kNoCompression, 0); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index 14a3155e251b..cd8742ef01e7 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -8,6 +8,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" +#include "rocksdb/system_clock.h" namespace ROCKSDB_NAMESPACE { @@ -40,8 +41,13 @@ class FilterByKeyLength : public CompactionFilter { return "rocksdb.compaction.filter.by.key.length"; } CompactionFilter::Decision FilterBlobByKey( - int /*level*/, const Slice& key, std::string* /*new_value*/, + int /*level*/, const Slice& key, uint64_t expire_time, std::string* /*new_value*/, std::string* /*skip_until*/) const override { + const auto clock = SystemClock::Default().get(); + uint64_t now = clock->NowMicros() / 1000 / 1000; + if (expire_time != 0 && expire_time < now) { + return CompactionFilter::Decision::kRemove; + } if (key.size() < length_threshold_) { return CompactionFilter::Decision::kRemove; } @@ -82,8 +88,13 @@ class BadBlobCompactionFilter : public CompactionFilter { filter_v2_(filter_v2) {} const char* Name() const override { return "rocksdb.compaction.filter.bad"; } CompactionFilter::Decision FilterBlobByKey( - int /*level*/, const Slice& key, std::string* /*new_value*/, + int /*level*/, const Slice& key, uint64_t expire_time, std::string* /*new_value*/, std::string* /*skip_until*/) const override { + const auto clock = SystemClock::Default().get(); + uint64_t now = clock->NowMicros() / 1000 / 1000; + if (expire_time != 0 && expire_time < now) { + return CompactionFilter::Decision::kRemove; + } if (key.size() >= prefix_.size() && 0 == strncmp(prefix_.data(), key.data(), prefix_.size())) { return CompactionFilter::Decision::kUndetermined; @@ -111,7 +122,7 @@ class ValueBlindWriteFilter : public CompactionFilter { return "rocksdb.compaction.filter.blind.write"; } CompactionFilter::Decision FilterBlobByKey( - int level, const Slice& key, std::string* new_value, + int level, const Slice& key, uint64_t expire_time, std::string* new_value, std::string* skip_until) const override; private: @@ -119,9 +130,14 @@ class ValueBlindWriteFilter : public CompactionFilter { }; CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey( - int /*level*/, const Slice& /*key*/, std::string* new_value, + int /*level*/, const Slice& /*key*/, uint64_t expire_time, std::string* new_value, std::string* /*skip_until*/) const { assert(new_value); + const auto clock = SystemClock::Default().get(); + uint64_t now = clock->NowMicros() / 1000 / 1000; + if (expire_time != 0 && expire_time < now) { + return CompactionFilter::Decision::kRemove; + } new_value->assign(new_value_); return CompactionFilter::Decision::kChangeValue; } diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 85d1c039bd30..e3140a5b171e 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -259,8 +259,18 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, StopWatchNano timer(clock_, report_detailed_time_); if (ikey_.type == kTypeBlobIndex) { + // For integrated BlobDB impl, CompactionIterator reads blob value. + // For Stacked BlobDB impl, the corresponding CompactionFilter's + // FilterV2 method should read the blob value. + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value_); + if (!s.ok()) { + status_ = s; + validity_info_.Invalidate(); + return false; + } decision = compaction_filter_->FilterBlobByKey( - level_, filter_key, &compaction_filter_value_, + level_, filter_key, blob_index.expire_time(), &compaction_filter_value_, compaction_filter_skip_until_.rep()); if (decision == CompactionFilter::Decision::kUndetermined && !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { @@ -275,17 +285,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex", &value_); - // For integrated BlobDB impl, CompactionIterator reads blob value. - // For Stacked BlobDB impl, the corresponding CompactionFilter's - // FilterV2 method should read the blob value. - BlobIndex blob_index; - Status s = blob_index.DecodeFrom(value_); - if (!s.ok()) { - status_ = s; - validity_info_.Invalidate(); - return false; - } - FilePrefetchBuffer* prefetch_buffer = prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( blob_index.file_number()) diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index a16891110020..68493c0a3275 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -269,7 +269,7 @@ class CompactionJobTestBase : public testing::Test { uint64_t size) { std::string blob_index; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, - kNoCompression); + kNoCompression, 0); return blob_index; } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 2d71231173b3..a6fc54512437 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -8793,7 +8793,7 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) { constexpr uint64_t size = 5678; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, - kNoCompression); + kNoCompression, 0); WriteBatch batch; ASSERT_OK( diff --git a/db/db_test.cc b/db/db_test.cc index 99a03b1509a2..75f390bb7717 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1352,7 +1352,7 @@ TEST_F(DBTest, MetaDataTest) { // Add a single blob reference to each file std::string blob_index; BlobIndex::EncodeBlob(&blob_index, /* blob_file_number */ i + 1000, - /* offset */ 1234, /* size */ 5678, kNoCompression); + /* offset */ 1234, /* size */ 5678, kNoCompression, 0); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, Key(key_index), diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 21d1571a05e1..b0190f21c881 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -236,7 +236,7 @@ TEST_F(FlushJobTest, NonEmpty) { } else { BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i], /* offset */ i << 10, /* size */ i << 20, - kNoCompression); + kNoCompression, 0); } const SequenceNumber seq(i + 10001); diff --git a/db/listener_test.cc b/db/listener_test.cc index 41577b92c179..bc0cf4b6eae1 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -44,7 +44,7 @@ class EventListenerTest : public DBTestBase { uint64_t size) { std::string blob_index; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, - kNoCompression); + kNoCompression, 0); return blob_index; } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index c47389901739..024a8bfd923d 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -705,7 +705,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) { std::string blob_index; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, - kNoCompression); + kNoCompression, 0); constexpr SequenceNumber seq = 201; @@ -721,7 +721,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) { std::string blob_index; BlobIndex::EncodeBlob(&blob_index, expected_oldest_blob_file_number, offset, - size, kNoCompression); + size, kNoCompression, 0); constexpr SequenceNumber seq = 202; @@ -782,7 +782,7 @@ TEST(FileMetaDataTest, UpdateBoundariesBlobIndex) { std::string blob_index; BlobIndex::EncodeBlob(&blob_index, kInvalidBlobFileNumber, offset, size, - kNoCompression); + kNoCompression, 0); constexpr SequenceNumber seq = 206; diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 1784f2329ac6..55f95c13dd92 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -332,6 +332,7 @@ class CompactionFilter : public Customizable { // and kRemoveAndSkipUntil respectively, and have the same semantics as // the corresponding parameters of FilterV2/V3. virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/, + uint64_t /*expire_time*/, std::string* /*new_value*/, std::string* /*skip_until*/) const { return Decision::kUndetermined; diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc index ddaa98c7d32a..e13b81be3e2b 100644 --- a/utilities/blob_db/blob_compaction_filter.cc +++ b/utilities/blob_db/blob_compaction_filter.cc @@ -141,7 +141,7 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange( } BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, new_blob_value.size(), - blob_db_impl->bdb_options_.compression); + blob_db_impl->bdb_options_.compression, 0); return Decision::kChangeBlobIndex; } @@ -404,7 +404,7 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput( } BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, - blob.size(), compression_type); + blob.size(), compression_type, 0); gc_stats_.AddRelocatedBlob(blob_index.size()); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 2fa7ae898f56..ca02947018fa 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1380,7 +1380,7 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, if (expiration == kNoExpiration) { BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset, - value.size(), bdb_options_.compression); + value.size(), bdb_options_.compression, 0); } else { BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(), blob_offset, value.size(), diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 07f0cc89e4c0..b26d09a35143 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -1946,7 +1946,7 @@ TEST_F(BlobDBTest, GarbageCollectionFailure) { // blob file. std::string blob_index; BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234, - /* size */ 5678, kNoCompression); + /* size */ 5678, kNoCompression, 0); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(