-
Notifications
You must be signed in to change notification settings - Fork 4
Feat/partition by evaluator add only if triggered #165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/partition by evaluator add only if triggered #165
Conversation
|
Note Reviews pausedUse the following commands to manage reviews:
📝 WalkthroughWalkthroughAdds evaluator state queries and unconditional out-of-order detection to Evaluator; reworks DynamicEvaluator storage to LRU-style weak/shared pointer management with on-demand creation and cleanup; implements partition-aware query evaluation with per-partition evaluator mapping and cached attribute-index lookups; removes multiple CI Artifactory steps. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant PartitionByQuery
participant DynamicEvaluator
participant Evaluator
Client->>PartitionByQuery: process_event(event)
rect rgba(200,220,255,0.18)
Note over PartitionByQuery: resolve tuple indexes (cached)\nand build partition key
PartitionByQuery->>PartitionByQuery: find_tuple_indexes / build key
PartitionByQuery->>PartitionByQuery: find_or_create_evaluator_index(key)
end
PartitionByQuery->>DynamicEvaluator: process_event(event, evaluator_idx)
rect rgba(220,255,220,0.18)
Note over DynamicEvaluator: LRU get-or-create & cleanup
DynamicEvaluator->>DynamicEvaluator: cleanup_evaluators_if_necessary(now)
alt missing or expired evaluator
DynamicEvaluator->>DynamicEvaluator: create_evaluator_*()
end
DynamicEvaluator->>Evaluator: next(event)
end
rect rgba(255,240,200,0.18)
Note over Evaluator: evaluation & ordering checks
Evaluator->>Evaluator: check out-of-order (unconditional)
alt in-order
Evaluator->>Evaluator: update last_tuple_time
Evaluator->>Evaluator: evaluate event
else out-of-order
Evaluator-->>DynamicEvaluator: return empty / log critical
end
end
Evaluator-->>DynamicEvaluator: result
DynamicEvaluator-->>PartitionByQuery: result
PartitionByQuery-->>Client: result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes
Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/core_server/internal/evaluation/evaluator.hpp (1)
71-74:last_tuple_timeused in non-debug code but only declared underCORE_DEBUG
is_time_window_empty(Lines 111-118) andnextuselast_tuple_time, but the member is declared only inside#ifdef CORE_DEBUG(Lines 71-74). In non-CORE_DEBUGbuilds this will not compile, andDynamicEvaluator::cleanup_out_of_time_window_evaluatorsrelies onis_time_window_emptyin all builds.Given
last_tuple_timenow participates in functional logic (time-window cleanup), it should exist unconditionally, not just in debug.Suggested fix (make
last_tuple_timealways available and update the comment):- // Only in debug, check tuples are being sent in ascending order. -#ifdef CORE_DEBUG - uint64_t last_tuple_time = 0; -#endif + // Tracks timestamp of the last processed tuple; used for ordering checks and + // time-window based cleanup. + uint64_t last_tuple_time = 0;Also applies to: 107-118
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
32-45:EvaluatorArgs::limitis never initialized but used when creating evaluatorsIn
EvaluatorArgs(Lines 32-45), the constructor takes aCEQL::Limit limitparameter but does not initialize thelimitmember. Later,create_evaluator(Lines 202-213) passesevaluator_args.limitinto theEvaluation::Evaluatorconstructor.This means per-partition evaluators will see a default-constructed
CEQL::Limitinstead of the query’s configured limit, changing result limiting behavior.Suggested fix (initialize
limit):EvaluatorArgs(Evaluation::PredicateEvaluator&& tuple_evaluator, std::atomic<uint64_t>& event_time_of_expiration, CEQL::ConsumeBy::ConsumptionPolicy consumption_policy, CEQL::Limit limit) - : tuple_evaluator(std::move(tuple_evaluator)), - event_time_of_expiration(event_time_of_expiration), - consumption_policy(consumption_policy) {} + : tuple_evaluator(std::move(tuple_evaluator)), + event_time_of_expiration(event_time_of_expiration), + consumption_policy(consumption_policy), + limit(limit) {}(Optional) If
PredicateEvaluatoris expensive to copy and is effectively stateless, you may also consider holding it bystd::shared_ptr<const Evaluation::PredicateEvaluator>and passing shared ownership to eachEvaluatorto avoid repeated copies.Also applies to: 202-213
🧹 Nitpick comments (2)
src/core_server/internal/evaluation/evaluator.hpp (1)
136-147: Out-of-order handling now active in release with 500ms sleepThe out-of-order check in
next(Lines 136-147) is now compiled in all builds and still includes:
- A blocking
std::this_thread::sleep_for(std::chrono::nanoseconds(500000000));(500ms).- An
assert(false && ...)that disappears in release builds, leaving only log + sleep before proceeding.If misordered events occur in production, this will stall the query thread for 500ms per event, which is a significant throughput and latency risk.
Consider keeping strong diagnostics but avoiding long sleeps in release:
- if (current_time < last_tuple_time) [[unlikely]] { + if (current_time < last_tuple_time) [[unlikely]] { std::string attributes = event.get_event_reference().to_string(); LOG_CRITICAL(logger, "Received tuple with timestamp {} in Evaluator::next, " "but the last tuple time was {}. Attributes: {}", current_time, last_tuple_time, attributes); - std::this_thread::sleep_for(std::chrono::nanoseconds(500000000)); - assert(false && "Received tuple out of order in Evaluator::next"); + #ifdef CORE_DEBUG + std::this_thread::sleep_for(std::chrono::nanoseconds(500000000)); + assert(false && "Received tuple out of order in Evaluator::next"); + #endif }This preserves strict behavior in debug while avoiding long sleeps in optimized builds.
src/core_server/internal/interface/modules/query/query_types/partition_by_query.hpp (1)
48-78: Partition key hashing and evaluator index mapping look correct; consider long‑term map growthThe
TupleValuesKey+ValueVectorHashimplementation (Lines 48-78) and the mapping in
find_or_create_evaluator_index_from_tuple_indexes(Lines 199-229) correctly:
- Compare full attribute value vectors (using
Types::Value::operator==).- Hash both type and textual representation to distinguish different value types and contents.
- Produce dense evaluator indices (0, 1, 2, …) used by
DynamicEvaluator.The overall logic and caching are sound.
One trade‑off is that
partition_by_attrs_to_evaluator_idx(Lines 88-89) only ever grows: even when a partition’s evaluator is later garbage‑collected byDynamicEvaluator, the key → index entry remains. For workloads with extremely high partition cardinality and churn, this map could become a noticeable memory sink.If that becomes an issue in practice, you might later:
- Track last‑access time per partition index and periodically evict “cold” keys whose evaluators are gone and haven’t been referenced for a long time, or
- Store a more compact representation of the partition key (e.g., pre-hashed string or lightweight value variant) instead of full
Valueclones.For now, the current design is reasonable and consistent with the rest of the pipeline.
Also applies to: 88-92, 199-229
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/core_server/internal/evaluation/evaluator.hpp(2 hunks)src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp(3 hunks)src/core_server/internal/interface/modules/query/query_types/partition_by_query.hpp(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/core_server/internal/interface/modules/query/query_types/partition_by_query.hpp (4)
src/shared/datatypes/value.hpp (1)
val(145-145)src/core_server/internal/interface/modules/query/query_types/simple_query.hpp (3)
query(43-70)query(43-43)event(72-74)src/core_server/internal/evaluation/predicate_evaluator.hpp (1)
PredicateEvaluator(21-27)src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (11)
it(128-139)event(71-102)event(72-72)evaluator_idx(157-164)evaluator_idx(157-157)evaluator_idx(169-180)evaluator_idx(169-169)evaluator_idx(185-197)evaluator_idx(185-185)evaluator_idx(202-213)evaluator_idx(202-202)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (2)
src/core_server/internal/interface/modules/query/query_types/partition_by_query.hpp (6)
event(144-164)event(144-144)event(169-197)event(169-169)event(202-229)event(203-204)src/core_server/internal/evaluation/evaluator.hpp (7)
event(120-225)event(121-121)event(257-316)event(257-261)ZoneScopedN(319-346)current_time(111-118)current_time(111-111)
src/core_server/internal/evaluation/evaluator.hpp (1)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (4)
current_time(104-123)current_time(104-104)current_time(144-155)current_time(144-144)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: upload-conan (x86_64-linux-gcc-libstdc)
- GitHub Check: upload-conan (x86_64-linux-clang-libstdc)
- GitHub Check: run-benchmarks-release
- GitHub Check: run-docker-build-check
- GitHub Check: run-tests-release (x86_64-linux-gcc-libstdc)
- GitHub Check: run-thread-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-address-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-basic-tests-debug (x86_64-linux-gcc-libstdc)
- GitHub Check: run-basic-tests-debug (x86_64-linux-clang-libstdc)
- GitHub Check: run-sanitizer-tests-debug (x86_64-linux-clang-libstdc)
- GitHub Check: check-clang-tidy
- GitHub Check: check-ceql-query-grammar-up-to-date
- GitHub Check: check-code-format
- GitHub Check: run-valgrind-debug (x86_64-linux-gcc-libstdc)
- GitHub Check: run-valgrind-debug (x86_64-linux-clang-libstdc)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp
Outdated
Show resolved
Hide resolved
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
32-45:EvaluatorArgs::limitparameter is ignored and left at its default valueIn
EvaluatorArgs, the constructor takes aCEQL::Limit limitparameter but does not initialize the corresponding member:struct EvaluatorArgs { ... CEQL::Limit limit; EvaluatorArgs(..., CEQL::Limit limit) : tuple_evaluator(std::move(tuple_evaluator)), event_time_of_expiration(event_time_of_expiration), consumption_policy(consumption_policy) {} };As written,
evaluator_args.limitis default-constructed and the passed-inlimitis never used, so all createdEvaluation::Evaluatorinstances effectively ignore the configured limit.Recommend wiring this up:
EvaluatorArgs(Evaluation::PredicateEvaluator&& tuple_evaluator, std::atomic<uint64_t>& event_time_of_expiration, CEQL::ConsumeBy::ConsumptionPolicy consumption_policy, CEQL::Limit limit) : tuple_evaluator(std::move(tuple_evaluator)), event_time_of_expiration(event_time_of_expiration), - consumption_policy(consumption_policy) {} + consumption_policy(consumption_policy), + limit(std::move(limit)) {}(or copy
limitif move semantics are not desired).
♻️ Duplicate comments (1)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
119-141: Evaluator cleanup invariant and max_evaluators growth logic still fragileThe logic here still has the issue from the prior review:
- You compute
current_size, optionally run cleanup, then recomputecurrent_size.- If
current_size > max_evaluators, you log and doublemax_evaluatorsonce.- You then assert
evaluators_storage.size() <= max_evaluators.If cleanup removes little or nothing, you can end up with
evaluators_storage.size()still far above the (once-doubled)max_evaluators, tripping the assert as before (e.g., 5000 evaluators vs.max_evaluatorsgrown from 1000 → 2000).Consider the tighter version previously suggested:
void cleanup_evaluators_if_necessary(uint64_t current_time) { ZoneScopedN("Interface::DynamicEvaluator::cleanup_evaluators_if_necessary"); std::size_t current_size = evaluators_storage.size(); if (current_size > max_evaluators) { cleanup_empty_evaluators(); cleanup_out_of_time_window_evaluators(current_time); current_size = evaluators_storage.size(); } - if (current_size > max_evaluators) { + while (current_size > max_evaluators) { LOG_WARNING(this->logger, "Number of evaluators ({}) exceeded maximum allowed ({}). " "Doubling current size to {}", current_size, max_evaluators, max_evaluators * 2); max_evaluators *= 2; + // optionally recompute current_size here if you expect concurrent changes } - assert(evaluators_storage.size() <= max_evaluators + assert(current_size <= max_evaluators && "Number of evaluators should be within the maximum allowed after cleanup"); }This guarantees the invariant in debug builds even when cleanup cannot shrink far enough.
🧹 Nitpick comments (4)
src/core_server/internal/evaluation/evaluator.hpp (1)
71-72: Clarify and enforce caller contract for time-window and emptiness checks
is_empty()andis_time_window_empty()look reasonable, but they implicitly assume:
current_timeis monotonic per evaluator (assert(current_time >= last_tuple_time)).- An evaluator that has ever processed an event will likely never report
is_empty() == true, becausehistoric_union_list_mapis never cleared inreset().If this is intentional (i.e., “empty” really only means “never used” and cleanup is expected to rely mainly on
is_time_window_empty), consider documenting these expectations in comments, or relaxing the assert inis_time_window_emptyto fall back to “not empty” instead of aborting when the precondition is violated. Otherwise, any upstream bug that feeds an older timestamp into cleanup will assert before the out-of-order path innext()can run.Also applies to: 105-116
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (3)
47-69: Logger initialization pattern is fine, but the null-check is redundant
loggeris a member initialized tonullptrand then immediately checked in the constructor before being set fromquill::Frontend::get_logger("root"). Since nothing else can assignloggerbefore this constructor body runs, theif (logger == nullptr)is effectively always true.This is harmless, but you could simplify to a direct assignment in the constructor body or member initializer if you want to reduce noise.
146-157: Cleanup helpers assume single-threaded access and rely on strong invariantsBoth
cleanup_empty_evaluatorsandcleanup_out_of_time_window_evaluators:
- Assert
it->get() != nullptr.- Assert
it->use_count() == 1(no other shared owners).- Rely on
Evaluator::is_empty()andEvaluator::is_time_window_empty(current_time).This is fine if
DynamicEvaluatoris strictly single-threaded and the only extrashared_ptrcopies are the short-lived ones inprocess_event. If there is any possibility of:
- Concurrent calls to
process_event/cleanup on the same instance, or- External code holding
shared_ptrs to evaluators,these asserts will become fragile and may start firing in debug. If multi-threaded use is in scope, you’ll need either synchronization around
evaluators_storageor to relax theuse_count()assertions.Also, note that
cleanup_out_of_time_window_evaluatorswill hit theassert(current_time >= last_tuple_time)insideEvaluator::is_time_window_emptyif a non-monotonic timestamp is ever passed in for that evaluator, so the same monotonicity contract applies here.Also applies to: 162-173
178-201: Evaluator creation and ownership wiring are sound; minor style tweaks possibleThe creation helpers:
- Distinguish between expired existing indices and genuinely new indices (
create_evaluator_expired_or_new).- Use
create_evaluator(evaluator_idx)to build astd::unique_ptr<Evaluation::Evaluator>.- Move that into
evaluators_storage(which holdsstd::shared_ptr<...>), relying on the converting constructor fromunique_ptrtoshared_ptr.- Update
evaluator_idx_to_evaluatorwith aweak_ptrview of the list’sshared_ptr.Ownership and mapping invariants look correct, and the assertions on index bounds help. Two minor suggestions:
- You could use
evaluators_storage.emplace_back(std::move(evaluator));to make the unique→shared conversion more explicit.- Consider adding a brief comment explaining that
evaluator_idx_to_evaluatorcan hold expired weak_ptrs and will be refreshed lazily on demand, just to clarify why there’s no explicit cleanup of the index vector.Functionally, this part looks solid.
Also applies to: 206-218, 223-234
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
.github/workflows/benchmark.yaml(0 hunks).github/workflows/checks.yaml(0 hunks).github/workflows/test-checks.yaml(0 hunks).github/workflows/upload-conan.yaml(0 hunks).github/workflows/valgrind-checks.yaml(0 hunks)src/core_server/internal/evaluation/evaluator.hpp(3 hunks)src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp(3 hunks)
💤 Files with no reviewable changes (5)
- .github/workflows/valgrind-checks.yaml
- .github/workflows/checks.yaml
- .github/workflows/test-checks.yaml
- .github/workflows/upload-conan.yaml
- .github/workflows/benchmark.yaml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: run-valgrind-debug (x86_64-linux-gcc-libstdc)
- GitHub Check: run-valgrind-debug (x86_64-linux-clang-libstdc)
- GitHub Check: upload-conan (x86_64-linux-gcc-libstdc)
- GitHub Check: upload-conan (x86_64-linux-clang-libstdc)
- GitHub Check: check-ceql-query-grammar-up-to-date
- GitHub Check: check-clang-tidy
- GitHub Check: run-sanitizer-tests-debug (x86_64-linux-clang-libstdc)
- GitHub Check: run-address-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-tests-release (x86_64-linux-gcc-libstdc)
- GitHub Check: run-basic-tests-debug (x86_64-linux-clang-libstdc)
- GitHub Check: run-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-thread-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-basic-tests-debug (x86_64-linux-gcc-libstdc)
- GitHub Check: run-benchmarks-release
- GitHub Check: run-docker-build-check
🔇 Additional comments (2)
src/core_server/internal/evaluation/evaluator.hpp (1)
134-148: Out-of-order handling now unconditionally drops events in release buildsThe new out-of-order check (
if (current_time < last_tuple_time) [[unlikely]]) with a CRITICAL log and an empty return makes behavior in non-debug builds explicit: out-of-order tuples are always ignored, and only inCORE_DEBUGdo you sleep and assert.If this is the intended contract (best-effort processing with strict ordering requirement), this change looks good and is much clearer than relying on debug-only checks. Just make sure callers and tests are updated to expect that out-of-order events are silently dropped (aside from logging) in release builds.
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
83-114: Process flow and ANY-consumption reset look consistent with new storage model
process_event:
- Runs
cleanup_evaluators_if_necessary(time)first.- Lazily creates or recreates the evaluator at
evaluator_idxvia the weak_ptr mapping.- Locks a shared_ptr, asserts non-null, calls
evaluator->next(...).- Under
ANYpolicy, setsshould_reseton all evaluators inevaluators_storagewhen a match is produced.This sequencing is coherent with the new lifetime model and matches the intended “per-index evaluator” semantics. Just ensure that:
event_time(event)is monotonic per evaluator index.DynamicEvaluatoris not used concurrently from multiple threads, since none of the containers are synchronized and later cleanup relies onuse_count() == 1.Otherwise this looks good.
✅ Actions performedReviews paused. |
|
Stocks:
|
|
Unordered Stocks:
|
|
Smart Homes:
|
|
Taxis:
|
|
Ordered Bluesky:
|
|
Unordered Bluesky:
|
667aeb1 to
22acee4
Compare
22acee4 to
9b8903b
Compare
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
186-203:clean_or_expand_evaluator_storagecan leaveevaluators_storage.size() > max_evaluatorsafter partial cleanupThe LRU traversal and erase logic look correct, but there’s a logic hole around
max_evaluatorsenforcement:size_t initial_size = evaluators_storage.size(); // ... // potentially erase some evaluators // ... if (initial_size != evaluators_storage.size()) { // log cleanup } else { // no cleanup; double max_evaluators max_evaluators *= 2; } assert(evaluators_storage.size() <= max_evaluators);If you start above the limit and remove some evaluators but not enough, you get:
initial_size != evaluators_storage.size()→ skip doubling,evaluators_storage.size()may still be> max_evaluators,- the final assert can fail.
Example:
max_evaluators = 1000,initial_size = 1500, cleanup removes 100 →evaluators_storage.size() = 1400(> 1000), butinitial_size != current_size, somax_evaluatorsstays 1000 and the assert trips.This is essentially the same issue that existed in the earlier
cleanup_evaluators_if_necessaryvariant noted in the past review; the shape has changed but the underlying invariant problem remains.One way to fix it while keeping your new function:
void clean_or_expand_evaluator_storage(uint64_t current_time) { ZoneScopedN("Interface::DynamicEvaluator::clean_or_expand_evaluator_storage"); const auto before_cleanup = evaluators_storage.size(); // Try to clean from LRU end for (auto it = evaluators_storage.rbegin(); it != evaluators_storage.rend();) { assert(it->evaluator != nullptr && "Evaluator pointer should not be null"); assert(it->evaluator.use_count() == 1 && "Shared pointer use count should be 1."); if (it->evaluator->is_time_window_empty(current_time)) { value_to_evaluator_list_pos.erase(it->evaluator_idx); auto to_erase = std::next(it).base(); it = std::make_reverse_iterator(evaluators_storage.erase(to_erase)); } else { ++it; } } auto current_size = evaluators_storage.size(); if (before_cleanup != current_size) { LOG_TRACE_L1(this->logger, "Cleaned up {} evaluators that were out of time window.", before_cleanup - current_size); } while (current_size > max_evaluators) { LOG_WARNING(this->logger, "Number of evaluators ({}) exceeded maximum allowed ({}). " "Doubling current size to {}", current_size, max_evaluators, max_evaluators * 2); max_evaluators *= 2; } assert(current_size <= max_evaluators && "Number of evaluators should be within the maximum allowed after cleanup"); }This preserves your logging but ensures the invariant
size <= max_evaluatorsis always restored, even when cleanup only partially succeeds.Also applies to: 207-243
🧹 Nitpick comments (5)
src/core_server/internal/evaluation/evaluator.hpp (2)
111-118: Simplifyis_time_window_emptyand validate the monotonic-time assumptionThe logic is correct and guards against underflow, but you can make it more compact and slightly clearer:
bool is_time_window_empty(uint64_t current_time) const { assert(current_time >= last_tuple_time); const uint64_t expiration_time = current_time < time_window ? 0 : current_time - time_window; return last_tuple_time < expiration_time; }Also,
assert(current_time >= last_tuple_time)assumes the caller always passes a globally non-decreasingcurrent_time. That matches howDynamicEvaluator::clean_or_expand_evaluator_storagecurrently uses it, but it’s worth keeping in mind that if upstream ever relaxes global ordering this assert will start to trip in debug builds.
136-150: Out-of-order handling innext()is now production-visible; behavior looks intentionalThe unconditional check
if (current_time < last_tuple_time) [[unlikely]] { ... return {}; }with a CRITICAL log and early return, plus keeping
last_tuple_timeunchanged, is a sensible way to reject late events without corrupting internal state. The debug-only sleep/assert underCORE_DEBUGalso keeps the production path lean.If you need more observability later, consider adding a dedicated metric/counter for dropped out-of-order events, but the current behavior is coherent as-is.
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (3)
32-58: MakeEvaluatorStorageWrapperconstructorsexplicitand simplify initializationThe wrapper design and
get_evaluator()logic look fine, but you can tighten the API and address the static analysis feedback:
- Mark the single-argument constructors as
explicitto avoid unintended implicit conversions.- Drop the redundant
evaluator()initializer in the iterator constructor; the member will be default-initialized anyway.For example:
struct EvaluatorStorageWrapper { std::optional<EvaluatorIndexWrapper> evaluator; std::optional<std::list<EvaluatorIndexWrapper>::iterator> list_position; explicit EvaluatorStorageWrapper(EvaluatorIndexWrapper&& evaluator) : evaluator(std::move(evaluator)), list_position(std::nullopt) {} explicit EvaluatorStorageWrapper(std::list<EvaluatorIndexWrapper>::iterator list_it) : list_position(list_it) {} EvaluatorIndexWrapper& get_evaluator() { if (evaluator.has_value()) { return *evaluator; } assert(list_position.has_value() && "Either evaluator or list position must be set"); return *list_position.value(); } };
96-110:process_event: LRU usage is correct; fix returnstd::moveand consider aligning comment with behaviorThe overall flow looks good:
- You obtain or create the evaluator, call
next, then only insert new evaluators into the LRU viasave_evaluator, which enforcesmax_evaluators.- For
ANYconsumption policy you correctly propagateshould_reset = trueto all stored evaluators after a successful match.Two small points:
- The comment
// Only if not empty and new, we save itdoesn’t match the code: you only check “new” (
evaluator_wrapper.evaluator.has_value()), not “not empty” (nois_empty()/enumerator.has_value()check). Either update the comment or add the missing condition, depending on your intent.
return std::move(enumerator);at the end prevents copy elision and triggers the Sonar warning. Prefer just:return enumerator;which will move under the usual return semantics without forcing it.
Also applies to: 124-152
157-180: Remove unusedcurrent_timeparameter fromget_or_create_evaluator(or use it)
get_or_create_evaluatortakesuint64_t current_timebut doesn’t use it anywhere. With common warning flags this will show up as an unused-parameter warning.If there’s no near-term plan to use
current_timehere (e.g., for eager cleanup decisions), simplify the API:EvaluatorStorageWrapper get_or_create_evaluator(size_t evaluator_idx) { ZoneScopedN("Interface::DynamicEvaluator::get_or_create_evaluator"); auto it = value_to_evaluator_list_pos.find(evaluator_idx); if (it != value_to_evaluator_list_pos.end()) { auto list_it = it->second; evaluators_storage.splice(evaluators_storage.begin(), evaluators_storage, list_it); it->second = evaluators_storage.begin(); return EvaluatorStorageWrapper(evaluators_storage.begin()); } auto evaluator = std::make_shared<Evaluation::Evaluator>( this->cea, evaluator_args.tuple_evaluator, time_window.duration, evaluator_args.event_time_of_expiration, evaluator_args.consumption_policy, evaluator_args.limit); return EvaluatorStorageWrapper(EvaluatorIndexWrapper{evaluator_idx, std::move(evaluator)}); }and adjust the call in
process_event.If you do intend to use
current_timelater, consider adding a[[maybe_unused]]attribute for now to keep builds clean.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/core_server/internal/evaluation/evaluator.hpp(3 hunks)src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/core_server/internal/evaluation/evaluator.hpp (1)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (4)
current_time(185-202)current_time(186-186)current_time(207-243)current_time(207-207)
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp (1)
src/core_server/internal/evaluation/evaluator.hpp (8)
assert(105-109)event(120-228)event(121-121)event(260-319)event(260-264)ZoneScopedN(322-349)current_time(111-118)current_time(111-111)
🪛 GitHub Check: SonarCloud Code Analysis
src/core_server/internal/evaluation/evaluator.hpp
[warning] 114-114: Use the init-statement to declare "expiration_time" inside the if statement.
src/core_server/internal/interface/modules/query/evaluators/dynamic_evaluator.hpp
[failure] 44-44: Add the "explicit" keyword to this constructor.
[failure] 47-47: Add the "explicit" keyword to this constructor.
[warning] 48-48: Remove this use of the constructor's initializer list for data member "evaluator". It is redundant with default initialization behavior.
[warning] 151-151: moving a local object in a return statement prevents copy elision
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: check-clang-tidy
- GitHub Check: run-address-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-thread-sanitizer-tests-release (x86_64-linux-clang-libstdc)
- GitHub Check: run-benchmarks-release
🔇 Additional comments (1)
src/core_server/internal/evaluation/evaluator.hpp (1)
105-109:is_empty()assertion and semantics look soundThe consistency assert between
historic_ordered_keysandhistoric_union_list_mapplus usinghistoric_ordered_keys.empty()as the emptiness check is a reasonable invariant for this type. No issues from my side here.
|



Summary by CodeRabbit
New Features
Bug Fixes
Improvements
Chores
✏️ Tip: You can customize this high-level summary in your review settings.